@@ -118,7 +118,7 @@ public static <T> ParallelFlowable<T> from(Publisher<? extends T> source,
118118 ObjectHelper .verifyPositive (parallelism , "parallelism" );
119119 ObjectHelper .verifyPositive (prefetch , "prefetch" );
120120
121- return new ParallelFromPublisher <T >(source , parallelism , prefetch );
121+ return RxJavaPlugins . onAssembly ( new ParallelFromPublisher <T >(source , parallelism , prefetch ) );
122122 }
123123
124124 /**
@@ -132,7 +132,7 @@ public static <T> ParallelFlowable<T> from(Publisher<? extends T> source,
132132 @ CheckReturnValue
133133 public final <R > ParallelFlowable <R > map (Function <? super T , ? extends R > mapper ) {
134134 ObjectHelper .requireNonNull (mapper , "mapper" );
135- return new ParallelMap <T , R >(this , mapper );
135+ return RxJavaPlugins . onAssembly ( new ParallelMap <T , R >(this , mapper ) );
136136 }
137137
138138 /**
@@ -145,7 +145,7 @@ public final <R> ParallelFlowable<R> map(Function<? super T, ? extends R> mapper
145145 @ CheckReturnValue
146146 public final ParallelFlowable <T > filter (Predicate <? super T > predicate ) {
147147 ObjectHelper .requireNonNull (predicate , "predicate" );
148- return new ParallelFilter <T >(this , predicate );
148+ return RxJavaPlugins . onAssembly ( new ParallelFilter <T >(this , predicate ) );
149149 }
150150
151151 /**
@@ -197,7 +197,7 @@ public final ParallelFlowable<T> runOn(Scheduler scheduler) {
197197 public final ParallelFlowable <T > runOn (Scheduler scheduler , int prefetch ) {
198198 ObjectHelper .requireNonNull (scheduler , "scheduler" );
199199 ObjectHelper .verifyPositive (prefetch , "prefetch" );
200- return new ParallelRunOn <T >(this , scheduler , prefetch );
200+ return RxJavaPlugins . onAssembly ( new ParallelRunOn <T >(this , scheduler , prefetch ) );
201201 }
202202
203203 /**
@@ -229,7 +229,7 @@ public final Flowable<T> reduce(BiFunction<T, T, T> reducer) {
229229 public final <R > ParallelFlowable <R > reduce (Callable <R > initialSupplier , BiFunction <R , ? super T , R > reducer ) {
230230 ObjectHelper .requireNonNull (initialSupplier , "initialSupplier" );
231231 ObjectHelper .requireNonNull (reducer , "reducer" );
232- return new ParallelReduce <T , R >(this , initialSupplier , reducer );
232+ return RxJavaPlugins . onAssembly ( new ParallelReduce <T , R >(this , initialSupplier , reducer ) );
233233 }
234234
235235 /**
@@ -304,6 +304,8 @@ public final Flowable<T> sorted(Comparator<? super T> comparator) {
304304 */
305305 @ CheckReturnValue
306306 public final Flowable <T > sorted (Comparator <? super T > comparator , int capacityHint ) {
307+ ObjectHelper .requireNonNull (comparator , "comparator is null" );
308+ ObjectHelper .verifyPositive (capacityHint , "capacityHint" );
307309 int ch = capacityHint / parallelism () + 1 ;
308310 ParallelFlowable <List <T >> railReduced = reduce (Functions .<T >createArrayList (ch ), ListAddBiConsumer .<T >instance ());
309311 ParallelFlowable <List <T >> railSorted = railReduced .map (new SorterFunction <T >(comparator ));
@@ -334,6 +336,9 @@ public final Flowable<List<T>> toSortedList(Comparator<? super T> comparator) {
334336 */
335337 @ CheckReturnValue
336338 public final Flowable <List <T >> toSortedList (Comparator <? super T > comparator , int capacityHint ) {
339+ ObjectHelper .requireNonNull (comparator , "comparator is null" );
340+ ObjectHelper .verifyPositive (capacityHint , "capacityHint" );
341+
337342 int ch = capacityHint / parallelism () + 1 ;
338343 ParallelFlowable <List <T >> railReduced = reduce (Functions .<T >createArrayList (ch ), ListAddBiConsumer .<T >instance ());
339344 ParallelFlowable <List <T >> railSorted = railReduced .map (new SorterFunction <T >(comparator ));
@@ -351,7 +356,8 @@ public final Flowable<List<T>> toSortedList(Comparator<? super T> comparator, in
351356 */
352357 @ CheckReturnValue
353358 public final ParallelFlowable <T > doOnNext (Consumer <? super T > onNext ) {
354- return new ParallelPeek <T >(this ,
359+ ObjectHelper .requireNonNull (onNext , "onNext is null" );
360+ return RxJavaPlugins .onAssembly (new ParallelPeek <T >(this ,
355361 onNext ,
356362 Functions .emptyConsumer (),
357363 Functions .emptyConsumer (),
@@ -360,7 +366,7 @@ public final ParallelFlowable<T> doOnNext(Consumer<? super T> onNext) {
360366 Functions .emptyConsumer (),
361367 Functions .EMPTY_LONG_CONSUMER ,
362368 Functions .EMPTY_ACTION
363- );
369+ )) ;
364370 }
365371
366372 /**
@@ -372,7 +378,8 @@ public final ParallelFlowable<T> doOnNext(Consumer<? super T> onNext) {
372378 */
373379 @ CheckReturnValue
374380 public final ParallelFlowable <T > doAfterNext (Consumer <? super T > onAfterNext ) {
375- return new ParallelPeek <T >(this ,
381+ ObjectHelper .requireNonNull (onAfterNext , "onAfterNext is null" );
382+ return RxJavaPlugins .onAssembly (new ParallelPeek <T >(this ,
376383 Functions .emptyConsumer (),
377384 onAfterNext ,
378385 Functions .emptyConsumer (),
@@ -381,7 +388,7 @@ public final ParallelFlowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
381388 Functions .emptyConsumer (),
382389 Functions .EMPTY_LONG_CONSUMER ,
383390 Functions .EMPTY_ACTION
384- );
391+ )) ;
385392 }
386393
387394 /**
@@ -392,7 +399,8 @@ public final ParallelFlowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
392399 */
393400 @ CheckReturnValue
394401 public final ParallelFlowable <T > doOnError (Consumer <Throwable > onError ) {
395- return new ParallelPeek <T >(this ,
402+ ObjectHelper .requireNonNull (onError , "onError is null" );
403+ return RxJavaPlugins .onAssembly (new ParallelPeek <T >(this ,
396404 Functions .emptyConsumer (),
397405 Functions .emptyConsumer (),
398406 onError ,
@@ -401,7 +409,7 @@ public final ParallelFlowable<T> doOnError(Consumer<Throwable> onError) {
401409 Functions .emptyConsumer (),
402410 Functions .EMPTY_LONG_CONSUMER ,
403411 Functions .EMPTY_ACTION
404- );
412+ )) ;
405413 }
406414
407415 /**
@@ -412,7 +420,8 @@ public final ParallelFlowable<T> doOnError(Consumer<Throwable> onError) {
412420 */
413421 @ CheckReturnValue
414422 public final ParallelFlowable <T > doOnComplete (Action onComplete ) {
415- return new ParallelPeek <T >(this ,
423+ ObjectHelper .requireNonNull (onComplete , "onComplete is null" );
424+ return RxJavaPlugins .onAssembly (new ParallelPeek <T >(this ,
416425 Functions .emptyConsumer (),
417426 Functions .emptyConsumer (),
418427 Functions .emptyConsumer (),
@@ -421,7 +430,7 @@ public final ParallelFlowable<T> doOnComplete(Action onComplete) {
421430 Functions .emptyConsumer (),
422431 Functions .EMPTY_LONG_CONSUMER ,
423432 Functions .EMPTY_ACTION
424- );
433+ )) ;
425434 }
426435
427436 /**
@@ -432,7 +441,8 @@ public final ParallelFlowable<T> doOnComplete(Action onComplete) {
432441 */
433442 @ CheckReturnValue
434443 public final ParallelFlowable <T > doAfterTerminated (Action onAfterTerminate ) {
435- return new ParallelPeek <T >(this ,
444+ ObjectHelper .requireNonNull (onAfterTerminate , "onAfterTerminate is null" );
445+ return RxJavaPlugins .onAssembly (new ParallelPeek <T >(this ,
436446 Functions .emptyConsumer (),
437447 Functions .emptyConsumer (),
438448 Functions .emptyConsumer (),
@@ -441,7 +451,7 @@ public final ParallelFlowable<T> doAfterTerminated(Action onAfterTerminate) {
441451 Functions .emptyConsumer (),
442452 Functions .EMPTY_LONG_CONSUMER ,
443453 Functions .EMPTY_ACTION
444- );
454+ )) ;
445455 }
446456
447457 /**
@@ -452,7 +462,8 @@ public final ParallelFlowable<T> doAfterTerminated(Action onAfterTerminate) {
452462 */
453463 @ CheckReturnValue
454464 public final ParallelFlowable <T > doOnSubscribe (Consumer <? super Subscription > onSubscribe ) {
455- return new ParallelPeek <T >(this ,
465+ ObjectHelper .requireNonNull (onSubscribe , "onSubscribe is null" );
466+ return RxJavaPlugins .onAssembly (new ParallelPeek <T >(this ,
456467 Functions .emptyConsumer (),
457468 Functions .emptyConsumer (),
458469 Functions .emptyConsumer (),
@@ -461,7 +472,7 @@ public final ParallelFlowable<T> doOnSubscribe(Consumer<? super Subscription> on
461472 onSubscribe ,
462473 Functions .EMPTY_LONG_CONSUMER ,
463474 Functions .EMPTY_ACTION
464- );
475+ )) ;
465476 }
466477
467478 /**
@@ -472,7 +483,8 @@ public final ParallelFlowable<T> doOnSubscribe(Consumer<? super Subscription> on
472483 */
473484 @ CheckReturnValue
474485 public final ParallelFlowable <T > doOnRequest (LongConsumer onRequest ) {
475- return new ParallelPeek <T >(this ,
486+ ObjectHelper .requireNonNull (onRequest , "onRequest is null" );
487+ return RxJavaPlugins .onAssembly (new ParallelPeek <T >(this ,
476488 Functions .emptyConsumer (),
477489 Functions .emptyConsumer (),
478490 Functions .emptyConsumer (),
@@ -481,7 +493,7 @@ public final ParallelFlowable<T> doOnRequest(LongConsumer onRequest) {
481493 Functions .emptyConsumer (),
482494 onRequest ,
483495 Functions .EMPTY_ACTION
484- );
496+ )) ;
485497 }
486498
487499 /**
@@ -492,7 +504,8 @@ public final ParallelFlowable<T> doOnRequest(LongConsumer onRequest) {
492504 */
493505 @ CheckReturnValue
494506 public final ParallelFlowable <T > doOnCancel (Action onCancel ) {
495- return new ParallelPeek <T >(this ,
507+ ObjectHelper .requireNonNull (onCancel , "onCancel is null" );
508+ return RxJavaPlugins .onAssembly (new ParallelPeek <T >(this ,
496509 Functions .emptyConsumer (),
497510 Functions .emptyConsumer (),
498511 Functions .emptyConsumer (),
@@ -501,7 +514,7 @@ public final ParallelFlowable<T> doOnCancel(Action onCancel) {
501514 Functions .emptyConsumer (),
502515 Functions .EMPTY_LONG_CONSUMER ,
503516 onCancel
504- );
517+ )) ;
505518 }
506519
507520 /**
@@ -515,7 +528,9 @@ public final ParallelFlowable<T> doOnCancel(Action onCancel) {
515528 */
516529 @ CheckReturnValue
517530 public final <C > ParallelFlowable <C > collect (Callable <? extends C > collectionSupplier , BiConsumer <? super C , ? super T > collector ) {
518- return new ParallelCollect <T , C >(this , collectionSupplier , collector );
531+ ObjectHelper .requireNonNull (collectionSupplier , "collectionSupplier is null" );
532+ ObjectHelper .requireNonNull (collector , "collector is null" );
533+ return RxJavaPlugins .onAssembly (new ParallelCollect <T , C >(this , collectionSupplier , collector ));
519534 }
520535
521536 /**
@@ -531,7 +546,7 @@ public static <T> ParallelFlowable<T> fromArray(Publisher<T>... publishers) {
531546 if (publishers .length == 0 ) {
532547 throw new IllegalArgumentException ("Zero publishers not supported" );
533548 }
534- return new ParallelFromArray <T >(publishers );
549+ return RxJavaPlugins . onAssembly ( new ParallelFromArray <T >(publishers ) );
535550 }
536551
537552 /**
@@ -562,7 +577,7 @@ public final <U> U to(Function<? super ParallelFlowable<T>, U> converter) {
562577 */
563578 @ CheckReturnValue
564579 public final <U > ParallelFlowable <U > compose (Function <? super ParallelFlowable <T >, ParallelFlowable <U >> composer ) {
565- return to (composer );
580+ return RxJavaPlugins . onAssembly ( to (composer ) );
566581 }
567582
568583 /**
@@ -629,7 +644,10 @@ public final <R> ParallelFlowable<R> flatMap(
629644 public final <R > ParallelFlowable <R > flatMap (
630645 Function <? super T , ? extends Publisher <? extends R >> mapper ,
631646 boolean delayError , int maxConcurrency , int prefetch ) {
632- return new ParallelFlatMap <T , R >(this , mapper , delayError , maxConcurrency , prefetch );
647+ ObjectHelper .requireNonNull (mapper , "mapper is null" );
648+ ObjectHelper .verifyPositive (maxConcurrency , "maxConcurrency" );
649+ ObjectHelper .verifyPositive (prefetch , "prefetch" );
650+ return RxJavaPlugins .onAssembly (new ParallelFlatMap <T , R >(this , mapper , delayError , maxConcurrency , prefetch ));
633651 }
634652
635653 /**
@@ -661,7 +679,9 @@ public final <R> ParallelFlowable<R> concatMap(
661679 public final <R > ParallelFlowable <R > concatMap (
662680 Function <? super T , ? extends Publisher <? extends R >> mapper ,
663681 int prefetch ) {
664- return new ParallelConcatMap <T , R >(this , mapper , prefetch , ErrorMode .IMMEDIATE );
682+ ObjectHelper .requireNonNull (mapper , "mapper is null" );
683+ ObjectHelper .verifyPositive (prefetch , "prefetch" );
684+ return RxJavaPlugins .onAssembly (new ParallelConcatMap <T , R >(this , mapper , prefetch , ErrorMode .IMMEDIATE ));
665685 }
666686
667687 /**
@@ -697,6 +717,9 @@ public final <R> ParallelFlowable<R> concatMapDelayError(
697717 public final <R > ParallelFlowable <R > concatMapDelayError (
698718 Function <? super T , ? extends Publisher <? extends R >> mapper ,
699719 int prefetch , boolean tillTheEnd ) {
700- return new ParallelConcatMap <T , R >(this , mapper , prefetch , tillTheEnd ? ErrorMode .END : ErrorMode .BOUNDARY );
720+ ObjectHelper .requireNonNull (mapper , "mapper is null" );
721+ ObjectHelper .verifyPositive (prefetch , "prefetch" );
722+ return RxJavaPlugins .onAssembly (new ParallelConcatMap <T , R >(
723+ this , mapper , prefetch , tillTheEnd ? ErrorMode .END : ErrorMode .BOUNDARY ));
701724 }
702725}
0 commit comments