combineLatest back-pressure bug #1717

Closed
flatmap13 opened this issue Oct 3, 2014 · 25 comments

@flatmap13
Contributor

The following program stops printing numbers at around 511, whereas the expected behaviour is that it keeps printing forever.

import java.util.concurrent.TimeUnit

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler

import scala.concurrent.duration.Duration

object CombineLatestTest {
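  def main(args: Array[String]): Unit = {

    // Emits an increasing counter every millisecond, starting immediately.
    val left = Observable.timer(Duration(0, TimeUnit.MILLISECONDS), Duration(1, TimeUnit.MILLISECONDS))

    // The right-hand side never emits, so combineLatest itself never emits;
    // only the println side effect in doOnEach is expected to be visible.
    left.observeOn(NewThreadScheduler()).doOnEach(println(_)).combineLatest(Observable.never).subscribe()

    // Keep the JVM alive while the timer runs.
    readLine()
  }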
}

Without the observeOn it works as expected.

@headinthebox added this to the 1.0 milestone Oct 3, 2014
@benjchristensen
Member

Thanks for the report. I'll take a look. It may actually be related to combineLatest. ObserveOn may just be the thing that exposes it.

@headinthebox
Contributor

We ran into this in an Rx debugger demo where the UI suddenly froze, so I guess the Android folks may run into it as well at some point; the code above was the smallest repro we could find.

@benjchristensen
Member

Replicated with this unit test:

    @Test
    public void testWithCombineLatestIssue1717() {
        Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.newThread())
                .doOnEach(new Action1<Notification<? super Long>>() {

                    @Override
                    public void call(Notification<? super Long> n) {
                        System.out.println(n);
                    }

                });

        TestSubscriber<Long> ts = new TestSubscriber<Long>();

        Observable.combineLatest(timer, Observable.<Void> never(), new Func2<Long, Void, Long>() {

            @Override
            public Long call(Long t1, Void t2) {
                System.out.println("t1: " + t1);
                return t1;
            }

        }).take(RxRingBuffer.SIZE*2).subscribe(ts);

        ts.awaitTerminalEvent();
        ts.assertNoErrors();
    }

It looks like combineLatest is not requesting more from upstream.

@benjchristensen changed the title from "observeOn back-pressure bug" to "combineLatest back-pressure bug" Oct 6, 2014
@benjchristensen
Member

Actually ... this test should never pass.

The test combines the timer with never(), which never emits, so combineLatest never emits anything either.

It queues up 512 items from the timer and then correctly waits for never() to emit something, which it never does.

Modifying the test so it emits at least one value before hanging then allows it to push data through:

    @Test
    public void testWithCombineLatestIssue1717() {
        Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.newThread())
                .doOnEach(new Action1<Notification<? super Long>>() {

                    @Override
                    public void call(Notification<? super Long> n) {
                        System.out.println(n);
                    }

                });

        TestSubscriber<Long> ts = new TestSubscriber<Long>();

        Observable.combineLatest(timer, Observable.just(1).concatWith(Observable.<Integer>never()), new Func2<Long, Integer, Long>() {

            @Override
            public Long call(Long t1, Integer t2) {
                return t1;
            }

        }).take(RxRingBuffer.SIZE*2).subscribe(ts);

        ts.awaitTerminalEvent();
        ts.assertNoErrors();
    }

Note the Observable.just(1).concatWith(Observable.<Integer>never()) code, which emits 1 and then never calls onCompleted.

@GeorgiKhomeriki @headinthebox Do you expect combineLatest to emit even when one of the Observables never emits anything as never() does?

@benjchristensen
Member

If combineLatest is supposed to emit even when one of the sources has not yet emitted a value, is that missing value supposed to be null?

@benjchristensen
Member

In 0.19.x before backpressure, this code just runs forever emitting values:

    @Test
    public void testWithCombineLatestIssue1717() {
        Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.newThread())
                .doOnEach(new Action1<Notification<? super Long>>() {

                    @Override
                    public void call(Notification<? super Long> n) {
                        System.out.println(n);
                    }

                });

        TestSubscriber<Long> ts = new TestSubscriber<Long>();

        Observable.combineLatest(timer, Observable.<Void> never(), new Func2<Long, Void, Long>() {

            @Override
            public Long call(Long t1, Void t2) {
                System.out.println("t1: " + t1);
                return t1;
            }

        }).take(2000).subscribe(ts);

        ts.awaitTerminalEvent();
        ts.assertNoErrors();
    }

This code also never prints anything in 0.19 (before backpressure):

    @Test
    public void testWithCombineLatestIssue1717() {
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        Observable.combineLatest(Observable.just(1), Observable.<Void> never(), new Func2<Integer, Void, Integer>() {

            @Override
            public Integer call(Integer t1, Void t2) {
                System.out.println("t1: " + t1 + " t2: " + t2);
                return t1;
            }

        }).subscribe(ts);
        ts.awaitTerminalEvent();
    }

Should it emit anything when Observable.never() won't emit?

@benjchristensen
Member

The introtorx.com site states:

Once both sequences have produced at least one value, the latest output from each sequence is passed to the resultSelector function every time either sequence produces a value.

http://www.introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#CombiningMultipleSequences
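
To make the quoted rule concrete, here is a minimal single-threaded sketch in plain Java. It is not RxJava code: the class CombineLatestModel and its methods are hypothetical names invented for illustration, and it ignores threading, completion and backpressure entirely. It only shows that nothing is emitted until both sides have produced a value, and that values from one side are overwritten while waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Toy model of the quoted combineLatest rule (hypothetical, not the RxJava operator). */
class CombineLatestModel<A, B, R> {
    private final BiFunction<A, B, R> resultSelector;
    private final List<R> emitted = new ArrayList<>();
    private A latestLeft;
    private B latestRight;
    private boolean hasLeft;
    private boolean hasRight;

    CombineLatestModel(BiFunction<A, B, R> resultSelector) {
        this.resultSelector = resultSelector;
    }

    /** A new value from the left source replaces the previous one. */
    void onLeft(A value) {
        latestLeft = value;
        hasLeft = true;
        emitIfReady();
    }

    /** A new value from the right source replaces the previous one. */
    void onRight(B value) {
        latestRight = value;
        hasRight = true;
        emitIfReady();
    }

    /** Emits only once both sides have produced at least one value. */
    private void emitIfReady() {
        if (hasLeft && hasRight) {
            emitted.add(resultSelector.apply(latestLeft, latestRight));
        }
    }

    List<R> emitted() {
        return emitted;
    }
}
```

In this model, feeding any number of left values without a right value leaves emitted() empty, which matches the never() case discussed here.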

@flatmap13
Contributor Author

@benjchristensen There is a subtle difference here, because we have a side-effect in the doOnEach operator, before the combineLatest. I think this side-effect should execute every time the left observable (timer) emits a value. After that, the combineLatest can block the event from propagating further down the line since it never receives a value from the other Observable. So to answer your question, combineLatest should indeed not emit any values in this case, but the doOnEach should keep on printing forever.

@headinthebox
Contributor

The example code is pretty nasty.

I am still puzzled about what exactly is going on, but as @GeorgiKhomeriki says, the trick is that there is a side-effect in the first input to a combineLatest that itself never produces a value (in the real use case, the other input is a stream that only receives very few values, and in our case few enough that the whole program froze).

Before back pressure, and without any observeOn, feeding stream xs into a combineLatest where the other input never produces was not an issue: combineLatest keeps the last value of xs, and the side effects happily execute as new values arrive.

@benjchristensen
Member

I found what's happening, and it's odd. combineLatest behaves differently before it gets values from all Observables than it does after.

Before a value has been received from each Observable, it does NOT obey backpressure and just acts like a BehaviorSubject, dropping data and retaining only the last value.

After a value has been received from each Observable, however, it expects to never drop data and to emit all combinations. That requires backpressure; otherwise unbounded buffers are needed, as in 0.19 and earlier (https://github.com/ReactiveX/RxJava/blob/0.19.x/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java#L131).

The reason why observeOn affects it the way it does is that observeOn expects to receive a request for more data and combineLatest never does that. It is a bug in combineLatest that it doesn't request more after dropping data. That's bug 1.
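
As a rough illustration of bug 1, the following plain-Java sketch models a source that emits only while it has outstanding requests. All class names here are hypothetical and none of this is the RxJava implementation; the initial demand of 512 is taken from the figure mentioned earlier in this thread. A consumer that drops values without requesting more stalls the source once the initial demand is used up, while one that requests a replacement after each drop keeps it running.

```java
/** Toy source that emits only while demand is outstanding (hypothetical model). */
class RequestingSource {
    private long requested;
    private long produced;

    void request(long n) {
        requested += n;
    }

    /** Tries to emit the next value; returns false when demand is exhausted. */
    boolean tryEmit(Downstream downstream) {
        if (requested == 0) {
            return false;
        }
        requested--;
        downstream.onNext(produced++, this);
        return true;
    }

    long produced() {
        return produced;
    }
}

interface Downstream {
    void onNext(long value, RequestingSource upstream);
}

/** Keeps only the latest value and never asks for more: the stalling behaviour. */
class DropWithoutRequest implements Downstream {
    long latest = -1;

    public void onNext(long value, RequestingSource upstream) {
        latest = value;
    }
}

/** Keeps only the latest value and replaces the demand it just consumed. */
class DropAndRequest implements Downstream {
    long latest = -1;

    public void onNext(long value, RequestingSource upstream) {
        latest = value;
        upstream.request(1);
    }
}

class BackpressureStallDemo {
    /** Returns how many values the source managed to emit in the given number of attempts. */
    static long run(Downstream downstream, long initialRequest, int attempts) {
        RequestingSource source = new RequestingSource();
        source.request(initialRequest);
        for (int i = 0; i < attempts; i++) {
            if (!source.tryEmit(downstream)) {
                break;
            }
        }
        return source.produced();
    }
}
```

Under these assumptions, BackpressureStallDemo.run(new DropWithoutRequest(), 512, 2000) stops after 512 values, whereas the DropAndRequest variant produces all 2000.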

Bug 2 is that the timer/observeOn relationship should result in a MissingBackpressureException being thrown because timer doesn't obey backpressure (by design). If observeOn exerts backpressure (as it does) and timer doesn't obey (as it won't) then an exception should be thrown.
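
The fail-fast behaviour described for bug 2 can be sketched in plain Java as a bounded buffer that throws when a producer pushes past its capacity. This is only a model under stated assumptions: BoundedBuffer and OverflowException are hypothetical names standing in for the real buffer and for MissingBackpressureException, and the sketch is single-threaded.

```java
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical stand-in for the exception discussed above; not the RxJava class. */
class OverflowException extends RuntimeException {
    OverflowException(String message) {
        super(message);
    }
}

/** Toy bounded buffer that fails fast instead of growing or silently stalling. */
class BoundedBuffer<T> {
    private final ArrayBlockingQueue<T> queue;

    BoundedBuffer(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Accepts a value, or throws if the producer has outrun the consumer. */
    void onNext(T value) {
        // offer() returns false when the queue is already full.
        if (!queue.offer(value)) {
            throw new OverflowException("buffer full at " + queue.size() + " items");
        }
    }

    /** Removes and returns the oldest buffered value, or null if empty. */
    T poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

A producer that ignores demand, as timer does by design, would hit the exception as soon as the consumer falls a full buffer behind, which surfaces the problem instead of hiding it.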

@headinthebox
Contributor

Juicy!

@benjchristensen
Member

@headinthebox You said this:

since it only "remembers" the latest values in both input streams that is much too conservative

This is understood to only happen before it starts emitting.

http://www.introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#CombineLatest

Once both sequences have produced at least one value, the latest output from each sequence is passed to the resultSelector function every time either sequence produces a value.

Is the behavior I described in my previous comment correct? In other words, does it drop data while waiting for all sources to have a "latest", and then stop dropping once all of them have one?

@headinthebox
Contributor

Yup, that's absolutely correct; there is a distinct phase change.

My earlier description was confusing: I was describing the test case where one of the sides did not receive a value. I have edited the comment to reflect that.

@benjchristensen
Member

Thanks for the confirmation. I'll work on fixing the 2 items I identified.

@headinthebox
Contributor

Like!

@benjchristensen
Member

I have merged the fixes.

@flatmap13
Contributor Author

I tried to build the latest 1.x branch but the build failed on rx.internal.operators.OnSubscribeCombineLatestTest.

org.mockito.exceptions.verification.NeverWantedButInvoked: 
observer.onNext(<any>);
Never wanted here:
-> at rx.internal.operators.OnSubscribeCombineLatestTest.testSecondNeverProduces(OnSubscribeCombineLatestTest.java:463)
But invoked here:
-> at rx.Observable$40.onNext(Observable.java:7542)

    at rx.internal.operators.OnSubscribeCombineLatestTest.testSecondNeverProduces(OnSubscribeCombineLatestTest.java:463)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
    at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

@flatmap13
Contributor Author

Running the build a second time (with the --debug flag) failed on the following two tests:

rx.BackpressureTests (testSubscribeOnScheduling):

java.lang.RuntimeException: Unexpected onError events: 1
    at rx.observers.TestSubscriber.assertNoErrors(TestSubscriber.java:170)
    at rx.BackpressureTests.testSubscribeOnScheduling(BackpressureTests.java:251)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
    at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:223)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
    at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
    at rx.BackpressureTests$10$1.request(BackpressureTests.java:480)
    at rx.internal.operators.OperatorSubscribeOn$1$1$1$1$1.call(OperatorSubscribeOn.java:94)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:43)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    ... 3 more

rx.CombineLatestTests (testCovarianceOfCombineLatest):

java.lang.ClassCastException: java.lang.Integer cannot be cast to rx.CovarianceTest$Result
    at rx.CombineLatestTests$2.call(CombineLatestTests.java:59)
    at rx.observables.BlockingObservable$1.onNext(BlockingObservable.java:119)
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:105)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
    at rx.internal.util.RxRingBuffer.accept(RxRingBuffer.java:316)
    at rx.internal.operators.OnSubscribeCombineLatest$MultiSourceProducer.tick(OnSubscribeCombineLatest.java:152)
    at rx.internal.operators.OnSubscribeCombineLatest$MultiSourceProducer.onNext(OnSubscribeCombineLatest.java:209)
    at rx.internal.operators.OnSubscribeCombineLatest$MultiSourceRequestableSubscriber.onNext(OnSubscribeCombineLatest.java:252)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:7569)
    at rx.internal.operators.OnSubscribeCombineLatest$MultiSourceProducer.request(OnSubscribeCombineLatest.java:131)
    at rx.Subscriber.setProducer(Subscriber.java:141)
    at rx.Subscriber.setProducer(Subscriber.java:137)
    at rx.internal.operators.OnSubscribeCombineLatest.call(OnSubscribeCombineLatest.java:68)
    at rx.internal.operators.OnSubscribeCombineLatest.call(OnSubscribeCombineLatest.java:45)
    at rx.Observable.subscribe(Observable.java:7658)
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:97)
    at rx.CombineLatestTests.testCovarianceOfCombineLatest(CombineLatestTests.java:43)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
    at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

@benjchristensen
Member

java.lang.ClassCastException: java.lang.Integer cannot be cast to rx.CovarianceTest$Result

I have no idea what that's about. Never seen it.

java.lang.RuntimeException: Unexpected onError events: 1

Looks like a non-deterministic test of some kind. It passes on my machine and Travis which means we're just getting lucky on scheduling.

@flatmap13
Contributor Author

Not sure if this helps but I'm running the build on a MacBook Air (Mid 2013) with OS X 10.8.5.

@benjchristensen
Member

I have run testSubscribeOnScheduling several hundred thousand times in loops and can't replicate it. Can you see anything that would cause the failure?

@flatmap13
Contributor Author

Something really strange is happening. When I ran the build a third time, it succeeded. Then after running ./gradlew clean and rebuilding, it failed on 6 tests:

RefCountTests.testConnectDisconnectConnectAndSubjectState
ZipTests.testCovarianceOfZip
ZipTests.testZipObservableOfObservables
OnSubscribeCacheTest.testWithAsyncSubjectAndRepeat
OnSubscribeCacheTest.testWithReplaySubjectAndRepeat
OnSubscribeCombineLatestTest.combineSimple

Is there some relation between these tests and the latest changes? Would you like me to send you the test reports?

@benjchristensen
Member

Those tests are not related.

@flatmap13
Contributor Author

Today neither @headinthebox nor I was able to reproduce this failing build. I'm sure I wasn't hallucinating yesterday, and I still have the test reports saved if you're interested :-)

@benjchristensen
Member

I have no idea what was going on for you, but I'm not going to spend time chasing those failures.

Please re-open this issue if the 1.0.0-rc.5 release does not fix the combineLatest issues for you.
