-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
combineLatest back-pressure bug #1717
Comments
Thanks for the report. I'll take a look. It may actually be related to combineLatest. ObserveOn may just be the thing that exposes it. |
We ran into this in an Rx debugger demo where the UI suddenly froze, so I guess the Android folks may run into it as well at some point; the code above was the smallest retro we could find. |
Replicated with this unit test: @Test
public void testWithCombineLatestIssue1717() {
Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.doOnEach(new Action1<Notification<? super Long>>() {
@Override
public void call(Notification<? super Long> n) {
System.out.println(n);
}
});
TestSubscriber<Long> ts = new TestSubscriber<Long>();
Observable.combineLatest(timer, Observable.<Void> never(), new Func2<Long, Void, Long>() {
@Override
public Long call(Long t1, Void t2) {
System.out.println("t1: " + t1);
return t1;
}
}).take(RxRingBuffer.SIZE*2).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
} It looks like |
Actually ... this test should never pass. Since the test is combining with It queues up 512 items from the timer, and then correctly waits for Modifying the test so it emits at least one value before hanging then allows it to push data through: @Test
public void testWithCombineLatestIssue1717() {
Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.doOnEach(new Action1<Notification<? super Long>>() {
@Override
public void call(Notification<? super Long> n) {
System.out.println(n);
}
});
TestSubscriber<Long> ts = new TestSubscriber<Long>();
Observable.combineLatest(timer, Observable.just(1).concatWith(Observable.<Integer>never()), new Func2<Long, Integer, Long>() {
@Override
public Long call(Long t1, Integer t2) {
return t1;
}
}).take(RxRingBuffer.SIZE*2).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
} Note the @GeorgiKhomeriki @headinthebox Do you expect |
If |
In 0.19.x before backpressure, this code just runs forever emitting values: @Test
public void testWithCombineLatestIssue1717() {
Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.doOnEach(new Action1<Notification<? super Long>>() {
@Override
public void call(Notification<? super Long> n) {
System.out.println(n);
}
});
TestSubscriber<Long> ts = new TestSubscriber<Long>();
Observable.combineLatest(timer, Observable.<Void> never(), new Func2<Long, Void, Long>() {
@Override
public Long call(Long t1, Void t2) {
System.out.println("t1: " + t1);
return t1;
}
}).take(2000).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
} This code never prints anything in 0.19 as well (before backpressure): @Test
public void testWithCombineLatestIssue1717() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.combineLatest(Observable.just(1), Observable.<Void> never(), new Func2<Integer, Void, Integer>() {
@Override
public Integer call(Integer t1, Void t2) {
System.out.println("t1: " + t1 + " t2: " + t2);
return t1;
}
}).subscribe(ts);
ts.awaitTerminalEvent();
} Should it emit anything when |
The introtorx.com site states:
http://www.introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#CombiningMultipleSequences |
@benjchristensen There is a subtle difference here, because we have a side-effect in the |
The example code is pretty nasty. I am still puzzled what is exactly going on but as @GeorgiKhomeriki says, the trick is that there is a side-effect in the first input to a Before back pressure, and without any |
I found what's happening, it's odd. Before a value has been received from each After a value has been received from each The reason why Bug 2 is that the |
Juicy! |
@headinthebox You said this:
This is understood to only happen before it starts emitting. http://www.introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#CombineLatest
Is the behavior I described above in my previous comment correct? In other words that it drops data while waiting for all to have a "latest" but once all have a "latest" that it will no longer drop? |
Yup, that's absolutely correct; there is a distinct phase change. I was confusingly describing the test case where one of the sides did not receive a value, I edited the comment to reflect that. |
Thanks for the confirmation. I'll work on fixing the 2 items I identified. |
Like! |
I have merged the fixes. |
I tried to build the latest
|
Running the build a second time (with the --debug flag) failed on the following two tests:
|
I have no idea what that's about. Never seen it.
Looks like a non-deterministic test of some kind. It passes on my machine and Travis which means we're just getting lucky on scheduling. |
Not sure if this helps but I'm running the build on a MacBook Air (Mid 2013) with OS X 10.8.5. |
I have run |
Something really strange is happening. When I ran the build a third time, it succeeded. Then after running
Is there some relation between these tests and the latest changes? Would you like me to send you the test reports? |
Those tests are not related. |
Today neither me nor @headinthebox were able to reproduce this failing build. I'm sure I wasn't hallucinating yesterday, and I still have the test reports saved if you're interested :-) |
I have no idea what was going on for you, but I'm not going to spend time chasing after them. Please re-open this issue if the 1.0.0-rc.5 release does not fix the combineLatest issues for you. |
The following program stops printing numbers around 511 where the expected behaviour is that it runs forever.
Without the
observeOn
it works as expected.The text was updated successfully, but these errors were encountered: