Bounded replay() request coordination doesn't work properly with latecomers #3452

Closed
akarnokd opened this issue Oct 16, 2015 · 1 comment

@akarnokd
Member

replay() works by requesting as many items from upstream as its most demanding subscriber has requested. So if one subscriber calls request(2) and another calls request(5), replay() will request 5 items from the upstream in total. To be precise, each subscriber's requests are accumulated, and the operator requests the difference between the last known max and the latest known max. This way, if the first subscriber above issues an additional request(10), its total becomes 12 and replay() requests only 7 additional elements (12 - 5).
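The bookkeeping can be modelled in a few lines. This is only a sketch of the differencing idea, not the operator's actual code: the class name MaxRequestTracker, the string subscriber ids, and the single-threaded design are all assumptions made for illustration, and overflow handling is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical single-threaded model of the max-based request differencing. */
final class MaxRequestTracker {
    // Accumulated total requested by each subscriber (ids are illustrative).
    private final Map<String, Long> totals = new HashMap<>();
    // Largest accumulated total seen so far, i.e. what upstream was already asked for.
    private long lastMax;

    /** Records request(n) from a subscriber and returns the amount to request from upstream. */
    long request(String subscriberId, long n) {
        long total = totals.merge(subscriberId, n, Long::sum); // overflow ignored in this sketch
        if (total <= lastMax) {
            return 0L; // an earlier, larger request already covers this one
        }
        long diff = total - lastMax;
        lastMax = total;
        return diff;
    }
}
```

In this model, request("a", 2) returns 2 and a following request("b", 5) returns 3, so upstream has been asked for 5 items overall. Any later request is forwarded only to the extent that it lifts a subscriber's accumulated total above that max.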

This works for the unbounded replay() because all subscribers start from the very beginning. However, if the buffer is bounded, this differencing doesn't work anymore. The following unit test fails with both 1.x and 2.x implementations.

@Test
public void testSubscribersComeAndGoAtRequestBoundaries() {
    ConnectableObservable<Integer> source = Observable.range(1, 10).replay(1);
    source.connect();

    TestSubscriber<Integer> ts1 = TestSubscriber.create(2);

    source.subscribe(ts1);

    ts1.assertValues(1, 2);
    ts1.assertNoErrors();
    ts1.unsubscribe();

    TestSubscriber<Integer> ts2 = TestSubscriber.create(2);

    source.subscribe(ts2);

    ts2.assertValues(2, 3);
    ts2.assertNoErrors();
    ts2.unsubscribe();

    TestSubscriber<Integer> ts3 = TestSubscriber.create();

    source.subscribe(ts3);

    ts3.assertNoErrors();
    ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10);
    ts3.assertCompleted();
}

What happens here is that even though ts2 subscribes after 2 elements were requested from the source, it only receives the very last buffered element (2), and replay() doesn't request 1 more from upstream, so ts2 never receives 3.

The idea for fixing this is that the total requested amount of a late subscriber would start from a "current index", i.e., the number of items received by replay() so far, instead of from zero.
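A small model shows how such a baseline changes the arithmetic. It is a sketch under stated assumptions, not the eventual fix: IndexedRequestTracker and its methods are hypothetical names, the model is single-threaded, and the baseline used in the usage note below (the number of items preceding the oldest item still buffered) is one assumed reading of "current index", chosen so that the numbers match the failing test.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: per-subscriber totals that start from a baseline index. */
final class IndexedRequestTracker {
    private final Map<String, Long> totals = new HashMap<>();
    private long lastMax; // what upstream has been asked for so far

    /** Registers a subscriber; early subscribers use 0, late ones a baseline index. */
    void subscribe(String id, long baselineIndex) {
        totals.put(id, baselineIndex);
    }

    void unsubscribe(String id) {
        totals.remove(id);
    }

    /** Records request(n) and returns the amount to request from upstream. */
    long request(String id, long n) {
        // Saturating add, so an unbounded request(Long.MAX_VALUE) cannot overflow.
        long total = totals.merge(id, n, (a, b) -> {
            long sum = a + b;
            return sum < 0 ? Long.MAX_VALUE : sum;
        });
        if (total <= lastMax) {
            return 0L;
        }
        long diff = total - lastMax;
        lastMax = total;
        return diff;
    }
}
```

Replaying the test scenario in this model: ts1 subscribes with baseline 0 and request(2) forwards 2. If ts2 then also starts from 0, its request(2) forwards nothing, which is the reported bug. If ts2 starts from baseline 1 (item 1 is gone, item 2 is still buffered), its total becomes 3 and exactly 1 more item is requested.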

This approach would work in this synchronous test but may not work with asynchronous subscribers. The problem is that the start node and the index have to be updated atomically together so that a subscriber "pins" both at the same time; only then are continuous delivery and the proper total requested amount both guaranteed. I'll investigate how to make this happen.
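One possible way to get that atomicity, sketched here purely as an assumption and not as a description of what the fix does, is to store the index inside each immutable buffer node, so that a single read of the head reference yields both the start node and its index. IndexedBuffer, Node and pin() are hypothetical names, and the sketch assumes a single thread calls add() while any thread may call pin().

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical bounded buffer whose nodes carry their own index. */
final class IndexedBuffer<T> {
    static final class Node<T> {
        final T value;          // null for the initial sentinel
        final long index;       // number of items received up to and including this node
        volatile Node<T> next;  // set once by the producer thread

        Node(T value, long index) {
            this.value = value;
            this.index = index;
        }
    }

    private final int limit;
    // Replay starts AFTER the head node; head.index is the subscriber's baseline.
    private final AtomicReference<Node<T>> head;
    private Node<T> tail; // touched only by the producer thread
    private int size;     // touched only by the producer thread

    IndexedBuffer(int limit) {
        this.limit = limit;
        Node<T> sentinel = new Node<>(null, 0L);
        this.head = new AtomicReference<>(sentinel);
        this.tail = sentinel;
    }

    /** Appends an item and evicts the oldest one once the limit is reached. */
    void add(T value) {
        Node<T> node = new Node<>(value, tail.index + 1);
        tail.next = node;
        tail = node;
        if (size == limit) {
            head.set(head.get().next); // the evicted item's node becomes the new head
        } else {
            size++;
        }
    }

    /** A single atomic read pins both the start node and its index. */
    Node<T> pin() {
        return head.get();
    }
}
```

Because node and index travel together, a late subscriber cannot observe a start node from one moment and an index from another. With limit 1 and items 1 and 2 added, pin() returns a node with index 1 whose successor holds the value 2.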

Let me emphasize again that the unbounded replay() works properly and the v2 ReplaySubject isn't affected because it is unbounded-in.

@akarnokd
Member Author

Fix in #3454.

@akarnokd akarnokd closed this as completed Feb 9, 2016