1616package rx .internal .operators ;
1717
1818import java .util .Iterator ;
19+ import java .util .NoSuchElementException ;
1920
2021import rx .Observable ;
2122import rx .Subscriber ;
@@ -41,54 +42,24 @@ public final class BlockingOperatorMostRecent {
4142 * {@code initialValue} if {@code source} has not yet emitted any items
4243 */
4344 public static <T > Iterable <T > mostRecent (final Observable <? extends T > source , final T initialValue ) {
44-
4545 return new Iterable <T >() {
4646 @ Override
4747 public Iterator <T > iterator () {
4848 MostRecentObserver <T > mostRecentObserver = new MostRecentObserver <T >(initialValue );
49- final MostRecentIterator <T > nextIterator = new MostRecentIterator <T >(mostRecentObserver );
5049
5150 /**
5251 * Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
5352 * since it is for BlockingObservable.
5453 */
5554 source .subscribe (mostRecentObserver );
5655
57- return nextIterator ;
56+ return mostRecentObserver . getIterable () ;
5857 }
5958 };
60-
61- }
62-
63- private static class MostRecentIterator <T > implements Iterator <T > {
64-
65- private final MostRecentObserver <T > observer ;
66-
67- private MostRecentIterator (MostRecentObserver <T > observer ) {
68- this .observer = observer ;
69- }
70-
71- @ Override
72- public boolean hasNext () {
73- return !observer .isCompleted ();
74- }
75-
76- @ Override
77- public T next () {
78- if (observer .getThrowable () != null ) {
79- throw Exceptions .propagate (observer .getThrowable ());
80- }
81- return observer .getRecentValue ();
82- }
83-
84- @ Override
85- public void remove () {
86- throw new UnsupportedOperationException ("Read only iterator" );
87- }
8859 }
8960
9061 private static class MostRecentObserver <T > extends Subscriber <T > {
91- static final NotificationLite <Object > nl = NotificationLite .instance ();
62+ final NotificationLite <T > nl = NotificationLite .instance ();
9263 volatile Object value ;
9364
9465 private MostRecentObserver (T value ) {
@@ -110,19 +81,47 @@ public void onNext(T args) {
11081 value = nl .next (args );
11182 }
11283
113- private boolean isCompleted () {
114- return nl .isCompleted (value );
115- }
116-
117- private Throwable getThrowable () {
118- Object v = value ;
119- return nl .isError (v ) ? nl .getError (v ) : null ;
120- }
121-
122- @ SuppressWarnings ("unchecked" )
123- private T getRecentValue () {
124- return (T )value ;
84+ /**
85+ * The {@link Iterator} return is not thread safe. In other words don't call {@link Iterator#hasNext()} in one
86+ * thread expect {@link Iterator#next()} called from a different thread to work.
87+ * @return
88+ */
89+ public Iterator <T > getIterable () {
90+ return new Iterator <T >() {
91+ /**
92+ * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
93+ */
94+ private Object buf = null ;
95+
96+ @ Override
97+ public boolean hasNext () {
98+ buf = value ;
99+ return !nl .isCompleted (buf );
100+ }
101+
102+ @ Override
103+ public T next () {
104+ try {
105+ // if hasNext wasn't called before calling next.
106+ if (buf == null )
107+ buf = value ;
108+ if (nl .isCompleted (buf ))
109+ throw new NoSuchElementException ();
110+ if (nl .isError (buf )) {
111+ throw Exceptions .propagate (nl .getError (buf ));
112+ }
113+ return nl .getValue (buf );
114+ }
115+ finally {
116+ buf = null ;
117+ }
118+ }
119+
120+ @ Override
121+ public void remove () {
122+ throw new UnsupportedOperationException ("Read only iterator" );
123+ }
124+ };
125125 }
126-
127126 }
128127}
0 commit comments