1919import java .util .concurrent .ConcurrentLinkedQueue ;
2020import java .util .concurrent .atomic .AtomicLong ;
2121
22+ import rx .*;
2223import rx .Observable .Operator ;
23- import rx .Producer ;
24- import rx .Subscriber ;
25- import rx .Subscription ;
26- import rx .exceptions .MissingBackpressureException ;
27- import rx .exceptions .OnErrorThrowable ;
28- import rx .functions .Func0 ;
29- import rx .functions .Func1 ;
30- import rx .internal .util .unsafe .SpscArrayQueue ;
31- import rx .internal .util .unsafe .UnsafeAccess ;
24+ import rx .exceptions .*;
25+ import rx .functions .*;
26+ import rx .internal .producers .ProducerArbiter ;
27+ import rx .internal .util .unsafe .*;
3228
3329/**
3430 * Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
@@ -50,44 +46,60 @@ public OperatorMapNotification(Func1<? super T, ? extends R> onNext, Func1<? sup
5046
5147 @ Override
5248 public Subscriber <? super T > call (final Subscriber <? super R > o ) {
53- Subscriber <T > subscriber = new Subscriber <T >() {
54- SingleEmitter <R > emitter ;
55- @ Override
56- public void setProducer (Producer producer ) {
57- emitter = new SingleEmitter <R >(o , producer , this );
58- o .setProducer (emitter );
59- }
60-
61- @ Override
62- public void onCompleted () {
63- try {
64- emitter .offerAndComplete (onCompleted .call ());
65- } catch (Throwable e ) {
66- o .onError (e );
67- }
68- }
49+ final ProducerArbiter pa = new ProducerArbiter ();
50+
51+ MapNotificationSubscriber subscriber = new MapNotificationSubscriber (pa , o );
52+ o .add (subscriber );
53+ subscriber .init ();
54+ return subscriber ;
55+ }
56+
57+ final class MapNotificationSubscriber extends Subscriber <T > {
58+ private final Subscriber <? super R > o ;
59+ private final ProducerArbiter pa ;
60+ final SingleEmitter <R > emitter ;
61+
62+ private MapNotificationSubscriber (ProducerArbiter pa , Subscriber <? super R > o ) {
63+ this .pa = pa ;
64+ this .o = o ;
65+ this .emitter = new SingleEmitter <R >(o , pa , this );
66+ }
67+
68+ void init () {
69+ o .setProducer (emitter );
70+ }
6971
70- @ Override
71- public void onError (Throwable e ) {
72- try {
73- emitter .offerAndComplete (onError .call (e ));
74- } catch (Throwable e2 ) {
75- o .onError (e );
76- }
72+ @ Override
73+ public void setProducer (Producer producer ) {
74+ pa .setProducer (producer );
75+ }
76+
77+ @ Override
78+ public void onCompleted () {
79+ try {
80+ emitter .offerAndComplete (onCompleted .call ());
81+ } catch (Throwable e ) {
82+ o .onError (e );
7783 }
84+ }
7885
79- @ Override
80- public void onNext (T t ) {
81- try {
82- emitter .offer (onNext .call (t ));
83- } catch (Throwable e ) {
84- o .onError (OnErrorThrowable .addValueAsLastCause (e , t ));
85- }
86+ @ Override
87+ public void onError (Throwable e ) {
88+ try {
89+ emitter .offerAndComplete (onError .call (e ));
90+ } catch (Throwable e2 ) {
91+ o .onError (e );
8692 }
93+ }
8794
88- };
89- o .add (subscriber );
90- return subscriber ;
95+ @ Override
96+ public void onNext (T t ) {
97+ try {
98+ emitter .offer (onNext .call (t ));
99+ } catch (Throwable e ) {
100+ o .onError (OnErrorThrowable .addValueAsLastCause (e , t ));
101+ }
102+ }
91103 }
92104 static final class SingleEmitter <T > extends AtomicLong implements Producer , Subscription {
93105 /** */
0 commit comments