@@ -186,8 +186,9 @@ private void handleNewSource(Observable<? extends T> t) {
186186 InnerSubscriber <T > i = new InnerSubscriber <T >(this , producerIfNeeded );
187187 i .sindex = childrenSubscribers .add (i );
188188 t .unsafeSubscribe (i );
189- if (!isUnsubscribed ())
189+ if (!isUnsubscribed ()) {
190190 request (1 );
191+ }
191192 }
192193
193194 private void handleScalarSynchronousObservable (ScalarSynchronousObservable <? extends T > t ) {
@@ -382,19 +383,8 @@ private int drainScalarValueQueue() {
382383 public Boolean call (InnerSubscriber <T > s ) {
383384 if (s .q != null ) {
384385 long r = mergeProducer .requested ;
385- int emitted = 0 ;
386- emitted += s .drainQueue ();
386+ int emitted = s .drainQueue ();
387387 if (emitted > 0 ) {
388- /*
389- * `s.emitted` is not volatile (because of performance impact of making it so shown by JMH tests)
390- * but `emitted` can ONLY be touched by the thread holding the `emitLock` which we're currently inside.
391- *
392- * Entering and leaving the emitLock flushes all values so this is visible to us.
393- */
394- emitted += s .emitted ;
395- // TODO we may want to store this in s.emitted and only request if above batch
396- // reset this since we have requested them all
397- s .emitted = 0 ;
398388 s .requestMore (emitted );
399389 }
400390 if (emitted == r ) {
@@ -542,9 +532,6 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
542532 static final AtomicIntegerFieldUpdater <InnerSubscriber > ONCE_TERMINATED = AtomicIntegerFieldUpdater .newUpdater (InnerSubscriber .class , "terminated" );
543533
544534 private final RxRingBuffer q = RxRingBuffer .getSpmcInstance ();
545- /* protected by emitLock */
546- int emitted = 0 ;
547- final int THRESHOLD = (int ) (q .capacity () * 0.7 );
548535
549536 public InnerSubscriber (MergeSubscriber <T > parent , MergeProducer <T > producer ) {
550537 this .parentSubscriber = parent ;
@@ -618,6 +605,7 @@ private void emit(T t, boolean complete) {
618605 * putting in the queue, it attempts to get the lock. We are optimizing for the non-contended case.
619606 */
620607 if (parentSubscriber .getEmitLock ()) {
608+ long emitted = 0 ;
621609 enqueue = false ;
622610 try {
623611 // drain the queue if there is anything in it before emitting the current value
@@ -660,30 +648,9 @@ private void emit(T t, boolean complete) {
660648 } finally {
661649 drain = parentSubscriber .releaseEmitLock ();
662650 }
663- if (emitted > THRESHOLD ) {
664- // this is for batching requests when we're in a use case that isn't queueing, always fast-pathing the onNext
665- /**
666- * <pre> {@code
667- * Without this batching:
668- *
669- * Benchmark (size) Mode Samples Score Score error Units
670- * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5060743.715 100445.513 ops/s
671- * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 36606.582 1610.582 ops/s
672- * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 38.476 0.973 ops/s
673- *
674- * With this batching:
675- *
676- * Benchmark (size) Mode Samples Score Score error Units
677- * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5367945.738 262740.137 ops/s
678- * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 62703.930 8496.036 ops/s
679- * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 72.711 3.746 ops/s
680- *} </pre>
681- */
651+ // request upstream what we just emitted
652+ if (emitted > 0 ) {
682653 request (emitted );
683- // we are modifying this outside of the emit lock ... but this can be considered a "lazySet"
684- // and it will be flushed before anything else touches it because the emitLock will be obtained
685- // before any other usage of it
686- emitted = 0 ;
687654 }
688655 }
689656 if (enqueue ) {
0 commit comments