Skip to content

1.x - AsyncOnSubscribe may never complete due to premature unsubscribe from requested observables when onCompleted is emitted from the callback. #5429

@yurgisbaykshtis

Description

@yurgisbaykshtis

The problem can be reproduced using a variety of downstream operators such as zip, zipWith, mergeWith, concatEager, but only if the requested observable is delayed (simulating an async source). Zip and mergeWith use cases require at least 90 items to be emitted (likely related to a buffer size and request-more threshold)

Below is the list of test case with delay on/off, and different item counts.

 private Observable<Integer> createAsyncOnSubscribe(int count, boolean delay, Scheduler scheduler) {
    return Observable.create(AsyncOnSubscribe.<Integer,Integer>createStateful(()->0,(state, requested, emitter)->{
      if (state == 0) {
        Observable<Integer> items = Observable.range(0, count);
        emitter.onNext(delay? items.delay(1, TimeUnit.SECONDS, scheduler) : items);
      } else {
        emitter.onCompleted();
      }
      return state + 1;
    }))
    .timeout(1, TimeUnit.MINUTES, scheduler);
  }
  
  @Test
  public void testConcatEagerSingleNoDelay() {
    System.out.println("\nconcatEager single no delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.concatEager(Observable.just(createAsyncOnSubscribe(1, false, sched))).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testConcatEagerSingleDelay() {
    System.out.println("\nconcatEager single with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.concatEager(Observable.just(createAsyncOnSubscribe(1, true, sched))).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testConcatEager100Delay() {
    System.out.println("\nconcatEager 100 with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.concatEager(Observable.just(createAsyncOnSubscribe(100, true, sched))).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testConcatEager50Delay() {
    System.out.println("\nconcatEager 50 with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.concatEager(Observable.just(createAsyncOnSubscribe(50, true, sched))).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testConcatEagerEmptyDelay() {
    System.out.println("\nconcatEager empty with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.concatEager(Observable.just(createAsyncOnSubscribe(0, true, sched))).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testConcatSingleDelay() {
    System.out.println("\nconcat single with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.concat(Observable.just(createAsyncOnSubscribe(1, true, sched))).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testZip100NoDelay() {
    System.out.println("\nzip 100 no delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.zip(Observable.just(createAsyncOnSubscribe(100, false, sched)), items->items[0]).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testZip100Delay() {
    System.out.println("\nzip 100 with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.zip(Observable.just(createAsyncOnSubscribe(100, true, sched)), items->items[0]).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testZip50Delay() {
    System.out.println("\nzip 50 with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    Observable.zip(Observable.just(createAsyncOnSubscribe(50, true, sched)), items->items[0]).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testZipWith100Delay() {
    System.out.println("\nzipWith 100 with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    createAsyncOnSubscribe(100, true, sched).zipWith(Observable.range(0,Integer.MAX_VALUE), (x,y)->x).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testZipWith100NoDelay() {
    System.out.println("\nzipWith 100 no delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    createAsyncOnSubscribe(100, false, sched).zipWith(Observable.range(0,Integer.MAX_VALUE), (x,y)->x).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testZipWith50Delay() {
    System.out.println("\nzipWith 50 no delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    createAsyncOnSubscribe(50, true, sched).zipWith(Observable.just(0), (x,y)->x).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testMergeWith100Delay() {
    System.out.println("\nmergeWith 100 with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    createAsyncOnSubscribe(100, true, sched).mergeWith(Observable.just(0)).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }


  @Test
  public void testMergeWith100NoDelay() {
    System.out.println("\nmergeWith 100 no delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    createAsyncOnSubscribe(100, false, sched).mergeWith(Observable.just(0)).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

  @Test
  public void testMergeWithRange50Delay() {
    System.out.println("\nmergeWith 50 with delay:");
    TestScheduler sched = Schedulers.test();
    TestSubscriber<Object> ts = new TestSubscriber<>(); 
    createAsyncOnSubscribe(50, true, sched).mergeWith(Observable.just(0)).subscribe(ts);
    sched.advanceTimeBy(1, TimeUnit.HOURS);
    System.out.println("errors:" + ts.getOnErrorEvents());
    ts.assertCompleted();
  }

This is the test output:

zipWith 100 with delay:
errors:[java.util.concurrent.TimeoutException]

mergeWith 100 with delay:
errors:[java.util.concurrent.TimeoutException]

concatEager single no delay:
errors:[]

concatEager 50 with delay:
errors:[java.util.concurrent.TimeoutException]

zip 50 with delay:
errors:[]

concatEager single with delay:
errors:[java.util.concurrent.TimeoutException]

mergeWith 50 with delay:
errors:[]

concatEager empty with delay:
errors:[]

zip 100 with delay:
errors:[java.util.concurrent.TimeoutException]

zipWith 50 no delay:
errors:[]

zipWith 100 no delay:
errors:[]

concat single with delay:
errors:[]

concatEager 100 with delay:
errors:[java.util.concurrent.TimeoutException]

zip 100 no delay:
errors:[]

mergeWith 100 no delay:
errors:[]

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions