7777import rx .util .OnErrorNotImplementedException ;
7878import rx .util .Range ;
7979import rx .util .Timestamped ;
80+ import rx .util .functions .Action ;
8081import rx .util .functions .Action0 ;
8182import rx .util .functions .Action1 ;
8283import rx .util .functions .Func0 ;
@@ -249,56 +250,6 @@ private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
249250 return subscription .wrap (subscribe (new SafeObserver <T >(subscription , o )));
250251 }
251252
252- @ SuppressWarnings ({ "rawtypes" , "unchecked" })
253- public Subscription subscribe (final Map <String , Object > callbacks ) {
254- if (callbacks == null ) {
255- throw new RuntimeException ("callbacks map can not be null" );
256- }
257- Object _onNext = callbacks .get ("onNext" );
258- if (_onNext == null ) {
259- throw new RuntimeException ("'onNext' key must contain an implementation" );
260- }
261- // lookup and memoize onNext
262- final FuncN onNext = Functions .from (_onNext );
263-
264- /**
265- * Wrapping since raw functions provided by the user are being invoked.
266- *
267- * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
268- */
269- return protectivelyWrapAndSubscribe (new Observer () {
270-
271- @ Override
272- public void onCompleted () {
273- Object onComplete = callbacks .get ("onCompleted" );
274- if (onComplete != null ) {
275- Functions .from (onComplete ).call ();
276- }
277- }
278-
279- @ Override
280- public void onError (Throwable e ) {
281- handleError (e );
282- Object onError = callbacks .get ("onError" );
283- if (onError != null ) {
284- Functions .from (onError ).call (e );
285- } else {
286- throw new OnErrorNotImplementedException (e );
287- }
288- }
289-
290- @ Override
291- public void onNext (Object args ) {
292- onNext .call (args );
293- }
294-
295- });
296- }
297-
298- public Subscription subscribe (final Map <String , Object > callbacks , Scheduler scheduler ) {
299- return subscribeOn (scheduler ).subscribe (callbacks );
300- }
301-
302253 public Subscription subscribe (final Action1 <T > onNext ) {
303254 if (onNext == null ) {
304255 throw new IllegalArgumentException ("onNext can not be null" );
@@ -1086,13 +1037,13 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
10861037 * each time an event is received from one of the source observables, where the aggregation is defined by the given function.
10871038 * <p>
10881039 * <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
1089- *
1040+ *
10901041 * @param w0
1091- * The first source observable.
1042+ * The first source observable.
10921043 * @param w1
1093- * The second source observable.
1044+ * The second source observable.
10941045 * @param combineFunction
1095- * The aggregation function used to combine the source observable values.
1046+ * The aggregation function used to combine the source observable values.
10961047 * @return An Observable that combines the source Observables with the given combine function
10971048 */
10981049 public static <R , T0 , T1 > Observable <R > combineLatest (Observable <T0 > w0 , Observable <T1 > w1 , Func2 <T0 , T1 , R > combineFunction ) {
@@ -1112,7 +1063,7 @@ public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Obs
11121063 public static <R , T0 , T1 , T2 , T3 > Observable <R > combineLatest (Observable <T0 > w0 , Observable <T1 > w1 , Observable <T2 > w2 , Observable <T3 > w3 , Func4 <T0 , T1 , T2 , T3 , R > combineFunction ) {
11131064 return create (OperationCombineLatest .combineLatest (w0 , w1 , w2 , w3 , combineFunction ));
11141065 }
1115-
1066+
11161067 /**
11171068 * Creates an Observable which produces buffers of collected values.
11181069 *
0 commit comments