Possible issue with switchOnNext and combineLatest #129

@drteeth

Hi there,

I'm working on an Android app that has a map; when the user moves the map, the app should submit an HTTP request to an API for results. There are also additional search params that need to be mixed in alongside the map coordinates. I'm not super well versed in Rx yet, but I think I've hit a bug.

I'm combining the map moves with a BehaviorSubject that holds the search params, then mapping those into API calls and using switchOnNext to only return the latest one. I'm currently cancelling requests that take too long: I cancel the OkHttp call, catch the InterruptedIOException, and call .onCompleted() on the subscriber.
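
To make the "only return the latest one" intent concrete, here is a minimal plain-Java sketch of latest-only delivery. It is not how switchOnNext is implemented (the operator unsubscribes from the previous inner Observable rather than filtering results), and the class `LatestOnlySketch` and its methods `start` and `deliver` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical illustration of "latest request wins"; not RxJava code. */
final class LatestOnlySketch {
    // Token of the most recently started request.
    private final AtomicLong latest = new AtomicLong();
    // Results that were accepted because they belonged to the latest request.
    final List<String> delivered = new ArrayList<>();

    /** Starts a new request and returns its token; all older tokens become stale. */
    long start() {
        return latest.incrementAndGet();
    }

    /** Accepts a result only if it belongs to the most recent request. */
    boolean deliver(long token, String result) {
        if (token != latest.get()) {
            return false; // stale request: drop its result
        }
        delivered.add(result);
        return true;
    }

    public static void main(String[] args) {
        LatestOnlySketch s = new LatestOnlySketch();
        long first = s.start();
        long second = s.start();                 // supersedes the first request
        s.deliver(first, "results for move 1");  // dropped as stale
        s.deliver(second, "results for move 2"); // accepted
        System.out.println(s.delivered);
    }
}
```

In the real pipeline a request that times out simply completes without a result, which would correspond to never calling `deliver` for that token.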

This works for a time, but after exactly 16 of those cancellations, no more requests are submitted, although the combineLatest step continues to execute.

I went digging through the rx implementation as best I could and discovered that the RxRingBuffer has size 16 on Android. Perhaps there is something to that but I wasn't able to pin it down. I should mention that both the combineLatest and the switchOnNext work without the other in this case. I only see the issue when they are both in play.
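
As a way to picture why a buffer size of 16 might line up with a stall after exactly 16 items, here is a toy model of credit-based backpressure in plain Java. It is only a sketch of the general idea, under the assumption that a consumer is granted an initial credit equal to the buffer size and then fails to request more; it is not RxJava's implementation, and `CreditStallSketch`, `Producer`, and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of credit-based backpressure; NOT RxJava's actual implementation. */
final class CreditStallSketch {

    /** Hypothetical producer that may only emit while it holds credit. */
    static final class Producer {
        private long credit;
        private int next;
        private final int total;
        final List<Integer> delivered = new ArrayList<>();

        Producer(int total, long initialCredit) {
            this.total = total;
            this.credit = initialCredit;
        }

        /** The consumer grants n more emissions. */
        void request(long n) {
            credit += n;
        }

        /** Emits one item if credit and items remain; returns whether it emitted. */
        boolean tryEmit() {
            if (credit == 0 || next == total) {
                return false;
            }
            credit--;
            delivered.add(next++);
            return true;
        }
    }

    /**
     * Makes `ticks` emission attempts with an initial credit of `bufferSize`.
     * The consumer re-requests one item per delivery only when `replenish` is true.
     * Returns the number of items actually delivered.
     */
    static int run(int ticks, int bufferSize, boolean replenish) {
        Producer p = new Producer(ticks, bufferSize);
        for (int i = 0; i < ticks; i++) {
            if (p.tryEmit() && replenish) {
                p.request(1);
            }
        }
        return p.delivered.size();
    }

    public static void main(String[] args) {
        System.out.println(run(20, 16, false)); // prints 16: credit runs out and is never renewed
        System.out.println(run(20, 16, true));  // prints 20: credit is renewed after each item
        System.out.println(run(15, 16, false)); // prints 15: finishes before credit runs out
    }
}
```

In this model a run of 15 items finishes normally while a run of 20 stops at 16, which is the same shape as the 15-iteration and 20-iteration tests below. Whether the real operators behave this way is exactly what I couldn't pin down.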

Is this intended behavior? Is there a more appropriate pattern or operator I should use instead?

My crude test case that illustrates the problem is included below, please let me know if it needs clarification.

Thanks for the great work

Edit: I'm using RxAndroid 0.24
Edit: 16 -> 20 iterations to more clearly show the output

package ca.meow.lasers;

import junit.framework.TestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Observer;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

public class SwitchOnNextCombineLatestTest extends TestCase {

    public void test15Iterations() throws InterruptedException {
        runWithIterations(15);
    }

    public void test20Iterations() throws InterruptedException {
        runWithIterations(20);
    }

    private void runWithIterations(int iterations) throws InterruptedException {
        final BlockingQueue<String> output = new ArrayBlockingQueue<String>(100);

        // fake map moves
        Observable<Long> fakeMapMoves = Observable.interval(100, TimeUnit.MILLISECONDS).take(iterations);

        // some (fake) property that can change over time that will affect the search params
        Observable<String> someObservableProperty = Observable.just("Oh, hi there ");

        // combine map moves and the latest state for the search property
        Observable<String> searchRequests = Observable.combineLatest(fakeMapMoves, someObservableProperty,
            new Func2<Long, String, String>() {
                @Override public String call(Long tick, String txt) {
                    output.add("cmb");
                    return txt.concat(Long.toString(tick));
                }
            });

        // map all requests onto api calls
        Observable<Observable<Object>> inFlightSearches = searchRequests
            .map(new Func1<String, Observable<Object>>() {
                @Override public Observable<Object> call(String searchParams) {
                    output.add("api");
                    return simulateApiTimeout(searchParams);
                }
            });

        // switch on those api calls.
        Observable.switchOnNext(inFlightSearches)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Object>() {
                @Override public void onCompleted() {
                    output.add("complete");
                }

                @Override public void onError(Throwable e) {
                    output.add("error");
                }

                @Override public void onNext(Object obj) {
                    output.add("next");
                }
            });

        // crude fixed wait to give the asynchronous pipeline time to finish
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // build up a list of cmb, api pairs for each iteration
        List<String> expectedTokens = new ArrayList<String>();
        for (int i = 0; i < iterations; i++) {
            expectedTokens.add("cmb");
            expectedTokens.add("api");
        }

        String expected = join(expectedTokens).concat(", complete");
        String actual = join(output);
        assertEquals(expected, actual);
    }

    private static Observable<Object> simulateApiTimeout(String searchParams) {
        // Pretend the API call timed out and called onCompleted()
        return Observable.empty().subscribeOn(Schedulers.io());
    }

    private String join(Iterable<?> tokens) {
        StringBuilder sb = new StringBuilder();
        boolean firstTime = true;
        for (Object token : tokens) {
            if (firstTime) {
                firstTime = false;
            } else {
                sb.append(", ");
            }
            sb.append(token);
        }
        return sb.toString();
    }

}
