Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix retry() race conditions #2997

Merged
merged 2 commits into from
Jun 17, 2015
Merged

Fix retry() race conditions #2997

merged 2 commits into from
Jun 17, 2015

Conversation

davidmoten
Copy link
Collaborator

This is the continuation of #2930 with a rebased PR.

There are sporadic testing failures with this PR so not ready for merge. I'll note some failures with this one soon.

@davidmoten
Copy link
Collaborator Author

To induce failure I run this in a loop:

./gradlew -i -Dtest.single=OperatorRetry cleanTest test

On Ubuntu 14.04 64-bit i7 i7 CPU 920 @ 2.67GHz × 8, home desktop, java 1.8u45:

? tested with wrong branch version, testing again !

On Ubuntu 14.04 64-bit i5 CPU U 470 @ 1.33GHz × 4, laptop, java 1.8u45:

238th run:

java.lang.AssertionError: Data content mismatch: 1638={beginningEveryTime x 130}
    at org.junit.Assert.fail(Assert.java:93)
    at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:768)

@akarnokd
Copy link
Member

akarnokd commented Jun 8, 2015

I still don't understand the cause of the failure. The best thing we can do is to increase that await timeout in the test to 5-7 seconds because this seems to be some transient hiccup rather than a hang. Otherwise, support this PR but defer the judgement to @benjchristensen .

@davidmoten
Copy link
Collaborator Author

removed last comment, ran on wrong branch

@davidmoten
Copy link
Collaborator Author

I bumped up await timeout to 5 minutes and the 54th run failed in 5 mins 9.9 seconds (on home desktop):

rx.internal.operators.OperatorRetryTest > testRetryWithBackpressureParallel FAILED
    java.lang.AssertionError: Data content mismatch: 950={beginningEveryTime x 136}
        at org.junit.Assert.fail(Assert.java:93)
        at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTes
t.java:768)

5 minutes is a long time for some OS hiccup and makes me think that the freeze is occurring because of our code.

I'll run another test and dump all threads after the freeze.

@akarnokd
Copy link
Member

akarnokd commented Jun 9, 2015

Then we need a full rewrite, but I don't have time for it for at least 3 months.

@davidmoten
Copy link
Collaborator Author

Thread dump after freeze occuring on 84th run on home desktop:

rx.internal.operators.OperatorRetryTest > testRetryWithBackpressureParallel STANDARD_OUT
    testRetryWithBackpressureParallelLoop -> 0
    "pool-1-thread-4" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "pool-1-thread-3" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "pool-1-thread-2" 
       java.lang.Thread.State: RUNNABLE
            at sun.management.ThreadImpl.getThreadInfo1(Native Method)
            at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:174)
            at rx.internal.operators.OperatorRetryTest.dumpAllThreads(OperatorRetryTest.java:913)
            at rx.internal.operators.OperatorRetryTest.access$300(OperatorRetryTest.java:44)
            at rx.internal.operators.OperatorRetryTest$14.run(OperatorRetryTest.java:751)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "pool-1-thread-1" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "RxComputationThreadPool-8" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "RxComputationThreadPool-7" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "RxComputationThreadPool-6" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "RxComputationThreadPool-5" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "RxComputationThreadPool-4" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "RxComputationThreadPool-3" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "RxComputationThreadPool-2" 
       java.lang.Thread.State: TIMED_WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "RxComputationThreadPool-1" 
       java.lang.Thread.State: TIMED_WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "/0:0:0:0:0:0:0:1:51855 to /0:0:0:0:0:0:0:1%lo:50738 workers Thread 3" 
       java.lang.Thread.State: RUNNABLE
            at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
            at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
            at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
            at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
            at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
            at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
            at org.gradle.messaging.remote.internal.inet.SocketConnection$SocketInputStream.read(SocketConnection.java:149)
            at com.esotericsoftware.kryo.io.Input.fill(Input.java:139)
            at com.esotericsoftware.kryo.io.Input.require(Input.java:159)
            at com.esotericsoftware.kryo.io.Input.readByte(Input.java:255)
            at org.gradle.messaging.serialize.kryo.KryoBackedDecoder.readByte(KryoBackedDecoder.java:80)
            at org.gradle.messaging.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:69)
            at org.gradle.messaging.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:58)
            at org.gradle.messaging.remote.internal.inet.SocketConnection.receive(SocketConnection.java:74)
            at org.gradle.messaging.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:235)
            at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "/0:0:0:0:0:0:0:1:51855 to /0:0:0:0:0:0:0:1%lo:50738 workers Thread 2" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at org.gradle.messaging.remote.internal.hub.queue.EndPointQueue.take(EndPointQueue.java:49)
            at org.gradle.messaging.remote.internal.hub.MessageHub$ConnectionDispatch.run(MessageHub.java:278)
            at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "Test worker" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
            at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
            at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:771)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
            at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
            at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
            at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
            at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
            at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
            at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
            at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
            at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
            at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
            at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
            at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
            at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
            at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
            at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
            at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
            at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
            at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
            at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
            at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
            at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
            at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
            at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
            at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
            at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    "Signal Dispatcher" 
       java.lang.Thread.State: RUNNABLE

    "Finalizer" 
       java.lang.Thread.State: WAITING
            at java.lang.Object.wait(Native Method)
            at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
            at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
            at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

    "Reference Handler" 
       java.lang.Thread.State: WAITING
            at java.lang.Object.wait(Native Method)
            at java.lang.Object.wait(Object.java:502)
            at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)

    "main" 
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
            at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
            at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:68)
            at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:44)
            at org.gradle.process.internal.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:78)
            at org.gradle.process.internal.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:35)
            at org.gradle.process.internal.child.ImplementationClassLoaderWorker.execute(ImplementationClassLoaderWorker.java:85)
            at org.gradle.process.internal.child.ImplementationClassLoaderWorker.execute(ImplementationClassLoaderWorker.java:41)
            at org.gradle.process.internal.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:43)
            at org.gradle.process.internal.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:32)
            at org.gradle.process.internal.launcher.BootstrapClassLoaderWorker.call(BootstrapClassLoaderWorker.java:46)
            at org.gradle.process.internal.launcher.BootstrapClassLoaderWorker.call(BootstrapClassLoaderWorker.java:32)
            at jarjar.org.gradle.process.internal.launcher.GradleWorkerMain.run(GradleWorkerMain.java:32)
            at jarjar.org.gradle.process.internal.launcher.GradleWorkerMain.main(GradleWorkerMain.java:37)



rx.internal.operators.OperatorRetryTest > testRetryWithBackpressureParallel FAILED
    java.lang.AssertionError: Data content mismatch: 4208={beginningEveryTime x 128}
        at org.junit.Assert.fail(Assert.java:93)
        at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:774)

@stealthcode
Copy link

@akarnokd I suspect that the retry when code base is exposing a race condition somewhere in the merge, producer, subscriber interactions. I think that we should hold off on any kind of a rewrite until #2928 is resolved.

@davidmoten
Copy link
Collaborator Author

At the moment I don't suspect problems with the OnSubscribeRedo class and

  • OperatorObserveOn has been improved and inspected pretty closely (a little more to do there with unsubscribe)
  • I don't suspect PublishSubject after trying simpler single subscriber versions of PublishSubject
  • I scanned further into Schedulers and the subscriber hook-up and found nothing

I've been looking elsewhere and now my suspicions lie with the gradle infrastructure. I believe the tests for retry (redo) don't fail when jammed in a loop and left for a long time, it is only when gradle is repeatedly asked to clean and test that it fails infrequently and perhaps because of some NIO lockup.

This is an unusual case inasmuch as we don't normally rely on tests looping on gradle tests but rather run loops in a single JVM. A fresh JVM can sometimes expose race conditions that aren't as easily exposed by a warmed-up JVM but I think this is being pushed a bit far and might be exposing us to rarely encountered gradle bugs too.

I think OnSubscribeRedo is ready for review (it certainly fixes known bugs with retry so there is some imperative to get this in).

@akarnokd
Copy link
Member

I think it is as good as it can get for now. I think we can return to the case when we port things over to RxJava 2.0. Thanks!

akarnokd added a commit that referenced this pull request Jun 17, 2015
@akarnokd akarnokd merged commit d2a5d5d into ReactiveX:1.x Jun 17, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants