
OnSubscribeRedo - fix race conditions #2930

Closed

Conversation

davidmoten
Collaborator

While searching for the cause of #2863 I bumped into this race condition (fixing it does not fix #2863):

If a request is made between L238 and L239, then consumerCapacity may become Long.MAX_VALUE by the time L239 runs, in which case we don't want to decrement it. To fix this, I used compareAndSet.

What is interesting about this fix is that in the 5000-iteration test loop in OperatorRetryTest I see many more occurrences of the failure on average (3 -> 50), presumably because the extra time taken by the compareAndSet call has widened the window for the race condition causing the failures.
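To spell out the interleaving (an illustrative, self-contained sketch, not code from the PR; in the operator consumerCapacity is an AtomicLong field):

import java.util.concurrent.atomic.AtomicLong;

public class CapacityRaceSketch {
    static final AtomicLong consumerCapacity = new AtomicLong(5);

    public static void main(String[] args) {
        // Emitting thread: the original check-then-act pair.
        if (consumerCapacity.get() != Long.MAX_VALUE) {
            // A request(Long.MAX_VALUE) on another thread can run right here...
            consumerCapacity.set(Long.MAX_VALUE);   // ...simulated inline for the demo.
            consumerCapacity.decrementAndGet();     // capacity drops to Long.MAX_VALUE - 1,
        }                                           // so the "unbounded" marker is silently lost
        System.out.println(consumerCapacity.get()); // prints 9223372036854775806
    }
}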

@@ -235,8 +235,9 @@ public void onError(Throwable e) {
     @Override
     public void onNext(T v) {
         if (!done) {
-            if (consumerCapacity.get() != Long.MAX_VALUE) {
-                consumerCapacity.decrementAndGet();
+            long cap = consumerCapacity.get();
Member

Generally, one should decrement the request/capacity after the value has been emitted, because otherwise, if the value reaches zero before the emission, it may open a window for a concurrent emission kicked off by a transition from 0 to some value. Could you see if that helps with #2863?
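For illustration, a minimal sketch of the emit-then-decrement ordering being suggested (field names follow the PR: done, consumerCapacity, child; this is a sketch, not the committed change):

@Override
public void onNext(T v) {
    if (!done) {
        child.onNext(v);                   // emit first, so capacity cannot hit 0 before the value is out
        long cap = consumerCapacity.get(); // then account for the emission
        if (cap != Long.MAX_VALUE) {
            consumerCapacity.compareAndSet(cap, cap - 1);
        }
    }
}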

Collaborator Author

Unfortunately it still fails, now with less frequency over the 5000 attempts than before.

I'll adjust the PR anyway to reflect what you suggest.

@davidmoten
Collaborator Author

Awesome, for once I'm happy to see a CI failure! Until now #2863 hasn't been repeatable except on my laptop, but now it has reappeared in the Travis build.

@akarnokd
Member

akarnokd commented May 5, 2015

If I put in this, I get the missing emission problem:

long cc = consumerCapacity.get();
if (cc < Long.MAX_VALUE) {
    consumerCapacity.compareAndSet(cc, cc - 1);
}

if (rnd.nextDouble() < 0.25) {
    try {
        Thread.sleep(1);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

child.onNext(v);

If I swap the operations, the test passes for me again.

Could you also update the helper method in the test?

static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
        StringBuilder sb = new StringBuilder();

        Object prev = null;
        int cnt = 0;

        for (Object curr : it) {
            if (sb.length() > 0) {
                if (!curr.equals(prev)) {
                    if (cnt > 1) {
                        sb.append(" x ").append(cnt);
                        cnt = 1;
                    }
                    sb.append(", ");
                    sb.append(curr);
                } else {
                    cnt++;
                }
            } else {
                sb.append(curr);
                cnt++;
            }
            prev = curr;
        }
        if (cnt > 1) {
            sb.append(" x ").append(cnt);
        }

        return sb;
    }
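An illustrative call (not from the PR) showing the output format:

// prints "a x 3, b" -- the same shape as the "beginningEveryTime x 256" strings in the failures below
System.out.println(sequenceFrequency(java.util.Arrays.asList("a", "a", "a", "b")));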

@davidmoten
Collaborator Author

@akarnokd while you were putting your last comment up I synchronized the reads and writes of consumerCapacity as in the attached commit, and the failures have stopped. That doesn't explain it yet, but it does suggest consumerCapacity is central to this.

I'll add your changes and revert the synchronization once you've had a quick look.

@davidmoten
Collaborator Author

I thought as much given our previous conversations, and figured that if this were the solution you'd have suggestions for doing it properly. Anyway, we probably still need to figure out the why, I suppose.

child.onNext(v);
long cap = consumerCapacity.get();
if (cap != Long.MAX_VALUE) {
    consumerCapacity.compareAndSet(cap, cap - 1);
Member

The problem with this is that if the CAS fails, the 'production' of the value is not accounted for and the capacity can rise without limit. The options are: 1) add a loop that makes sure the decrement actually happens, or 2) switch back to decrementAndGet and don't worry about the request reaching Long.MAX_VALUE concurrently.

Collaborator Author

Yep, of course it would need a loop if we went for the CAS option. I can chuck one in; your call.

@davidmoten
Collaborator Author

I'm confused by this resumeBoundary business; I hope you fare better. Off to bed for me.

@davidmoten
Collaborator Author

I've committed the CAS loop. We can drop it later as an optimization based on perf results if we want, or now, of course, if you think we should.
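Roughly, a CAS-loop decrement has this shape (a sketch of the idea, not necessarily the committed code):

for (;;) {
    long cap = consumerCapacity.get();
    if (cap == Long.MAX_VALUE) {
        break;                                          // unbounded mode: nothing to account for
    }
    if (consumerCapacity.compareAndSet(cap, cap - 1)) {
        break;                                          // decrement applied exactly once
    }
    // otherwise a concurrent request() changed the value; re-read and retry
}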

@akarnokd
Member

akarnokd commented May 5, 2015

Can you devise a benchmark that checks the throughput on a non-throwing source?

If I remember correctly, the resumeBoundary is there because an exception can happen just after the capacity reaches zero, and we don't want to resubscribe immediately, only when an actual request comes in.

@davidmoten
Collaborator Author

I've added some more comments to the code as my understanding improves. I have also marked three places with //TODO: two of them are, I think, potential race conditions, and the third is just a marker for some of the usual optimizations done in the request path.

Another addition in the last commit is a check to ensure that the consumerCapacity decrement doesn't go below zero, which now provokes a MissingBackpressureException.
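The guard described above is roughly of this shape (a sketch; the exception is rx.exceptions.MissingBackpressureException):

long cap = consumerCapacity.get();
if (cap == 0) {
    // an item arrived although nothing was outstanding: upstream over-produced
    child.onError(new MissingBackpressureException());
    return;
}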

I'm sure @akarnokd will have the red pen out on this stuff; then, if any of the race conditions stand up to scrutiny, I'll fix them.

@davidmoten
Collaborator Author

Unrelated test failure in OperatorPublishTest.testObserveOn; I've raised #2933.

@davidmoten
Collaborator Author

I've ruled out one race condition (and left comments explaining why all is OK).

@akarnokd added the Bug label on May 6, 2015
@akarnokd
Member

akarnokd commented May 6, 2015

I'd say if the change makes your test failure go away, that should be enough for now. I'd defer the full rewrite to 2.0.

@davidmoten
Collaborator Author

Righto, I'll concentrate on the fix for the #2863 test failure. Part of that is probably ruling out surprises like decrementing consumerCapacity when it is already 0. My approach to that was to throw a MissingBackpressureException, because in backpressure mode an operator should not emit more than requested. I'm concerned this might appear as a breaking change to some, though, because previously the extra values were happily passed through (with who knows what consequences in the operator). A compromise might be to not decrement when at 0 but also not emit an error, and then revisit this part of the contract in 2.0. Which way do you think we should go?

@davidmoten
Collaborator Author

Hmm, I just realized that if we're not emitting a MissingBackpressureException then I should decrement anyway, otherwise the accounting is stuffed when more requests come through. I'll avoid the MissingBackpressureException for now, but it should probably be part of some future milestone (2.0?).

@davidmoten
Collaborator Author

Sorry to say, no progress. I've protected all reads and writes to currentProducer and consumerCapacity with a single guard object (without holding the lock during emission, during request, or during worker.schedule, which could run synchronously with Schedulers.immediate() or Schedulers.trampoline()) and I still get the OperatorRetryTest.testRetryWithBackpressureParallel failure on my laptop. I give up for the moment. I'm leaning towards there being something fundamentally wrong that will probably only be fixed by a rewrite.

@davidmoten changed the title from "OnSubscribeRedo - fix race condition with consumerCapacity decrement" to "OnSubscribeRedo - fix race conditions" on May 7, 2015
@davidmoten
Collaborator Author

I've pared this PR right down (after the unsuccessful hunt for the cause of #2863) so that it addresses just the race conditions that I can see and can confirm.

The changes are:

  • The original code to decrement consumerCapacity had a race condition; we now use a CAS loop to update it.
  • The c == 0 check at L342 can stall the stream: if request is called just before L320, no restart occurs in that call, and it is possible that no further calls to request will be made after L320 has run (a common pattern is a request followed by another request only once emissions have occurred).
  • The sequenceFrequency helper method in the test was updated as @akarnokd requested.

#2863 is NOT fixed by these changes.

@akarnokd
Member

akarnokd commented May 7, 2015

I applied some random sleep around the resumeBoundary and these changes worked for me (on top of your changes):
In the method with the locked check:

if (!isLocked.get() && !child.isUnsubscribed()) {
    if (consumerCapacity.get() > 0) {
        worker.schedule(subscribeToSource);
    }
}

and in the child's producer:

long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
Producer producer = currentProducer.get();
if (producer != null) {
    producer.request(n);
} else
if (c == 0) {
    worker.schedule(subscribeToSource);
}

So if there is capacity available, the source will be restarted. If there is no capacity remaining, the first 0 -> n transition will restart the source.

@davidmoten
Collaborator Author

I'll try it out, thanks! I'll also include an n > 0 check in the producer so that calls of request(0) are no-ops.
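For concreteness, the n > 0 guard applied to the producer above might look like this (a sketch layered on the snippet @akarnokd posted):

@Override
public void request(final long n) {
    if (n <= 0) {
        return;                          // treat request(0) (and bogus non-positive requests) as a no-op
    }
    long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
    Producer producer = currentProducer.get();
    if (producer != null) {
        producer.request(n);
    } else if (c == 0) {
        worker.schedule(subscribeToSource);
    }
}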

@davidmoten
Collaborator Author

Nice simplification. Still getting the testRetryWithBackpressureParallel failure on my laptop (on the first run).

@davidmoten
Collaborator Author

3 out of 3 runs failed on the laptop, and 1 out of 25 on a fast desktop, running all tests in OperatorRetryTest. I just run this in a loop:

./gradlew -q -Dtest.single=OperatorRetry cleanTest test

@akarnokd
Member

akarnokd commented May 7, 2015

Apparently, the resumeBoundary served to prevent unwanted resubscription.
One final try:
wrap the above places into synchronized (consumerCapacity) plus the contents of terminalDelegatingSubscriber.setProducer and see if it still fails. If not, start moving out operations from this synchronized block and see which one triggers the failure.

Edit: forgot to mention: readd resumeBoundary as well.

@davidmoten
Collaborator Author

This is as far as I got. There were no failures when everything was inside the synchronized blocks. I then started moving out the calls that are risky to hold a lock around, namely producer.request and worker.schedule (because the worker could be synchronous). Everything was OK until I pulled worker.schedule out of the onNext synchronized block.

                    @Override
                    public void onNext(Object t) {
                        if (!isLocked.get() && !child.isUnsubscribed()) {
                            final boolean scheduleNow;
                            synchronized (consumerCapacity) {
                                if (consumerCapacity.get() > 0) {
                                    scheduleNow = true;
                                } else {
                                    scheduleNow = false;
                                    resumeBoundary.compareAndSet(false, true);
                                }
                            }
                            if (scheduleNow)
                                worker.schedule(subscribeToSource);
                        }
                    }

                    @Override
                    public void setProducer(Producer producer) {
                        producer.request(Long.MAX_VALUE);
                    }
                });
            }
        });

        child.setProducer(new Producer() {

            @Override
            public void request(final long n) {
                final Producer producer;
                final boolean requestNow;
                final boolean scheduleNow;
                synchronized (consumerCapacity) {
                    BackpressureUtils.getAndAddRequest(consumerCapacity, n);
                    producer = currentProducer.get();
                    if (producer != null) {
                        requestNow = true;
                        scheduleNow = false;
                    } else {
                        requestNow = false;
                        scheduleNow = resumeBoundary.compareAndSet(true, false);
                    }
                }
                if (requestNow)
                    producer.request(n);
                else if (scheduleNow)
                    worker.schedule(subscribeToSource);
            }
        })

@akarnokd
Member

akarnokd commented May 7, 2015

When the test fails, how many elements are missing from the output? Does it print beginningEveryTime x 256?

@davidmoten
Collaborator Author

Yep:

java.lang.AssertionError: Data content mismatch: 2315={beginningEveryTime x 256}
    at org.junit.Assert.fail(Assert.java:93)
    at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:763)

@akarnokd
Member

akarnokd commented May 7, 2015

I'm rewriting just the retry(n) variant to see whether there is a general logical error with request accounting or whether just OnSubscribeRedo has problems.

@davidmoten
Collaborator Author

Three hours without failure on three machines (i7, i7 and an i5 laptop). Can you try this commit @akarnokd?

@davidmoten
Collaborator Author

It took 5 hours, but I got a failure, on the laptop only:

java.lang.AssertionError: Data content mismatch: 883={beginningEveryTime x 138}
        at org.junit.Assert.fail(Assert.java:93)
        at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRet

This is going to make testing difficult!

if (u < 0) {
    u = Long.MAX_VALUE;
}
u -= mprod;
Collaborator Author

@akarnokd What about not doing this subtraction if u is Long.MAX_VALUE?

Member

Yes. Without it, the retry may land on MAX_VALUE - 1 on restart and not take the fast-path in certain sources.
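So the guarded version would be along these lines (local names u and mprod as in the quoted snippet; a sketch only):

if (u < 0) {
    u = Long.MAX_VALUE;      // the addition overflowed: treat the request as unbounded
}
if (u != Long.MAX_VALUE) {
    u -= mprod;              // only subtract production while the request is bounded
}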

@davidmoten
Collaborator Author

Using JaCoCo as per #2937 I noticed that the subscriber to restarts never completes and so never unsubscribes (though child does). To fix this (and to simplify OnSubscribeRedo a little) I ensured that all events are routed through to filteredTerminals instead of just the onNext events.

@davidmoten
Collaborator Author

I've added the fix to ProducerArbiter to not reduce the request if it is already Long.MAX_VALUE. Before applying this fix I ran the test, which failed on run 207, but in that run I confirmed that the count upstream of observeOn was the same as downstream, so I think the problem is no longer with OperatorSubscribeOn. I'll rerun now that we have a fix in ProducerArbiter.

@davidmoten
Collaborator Author

The fix to ProducerArbiter could help with the failure because concurrent requests to FuncWithErrors, one of Long.MAX_VALUE and one of less than Long.MAX_VALUE, could initiate the fast path and the backpressure path simultaneously.

@davidmoten
Collaborator Author

Added more concurrency protection to FuncWithErrors to prevent the fast path and the backpressure path from running concurrently.
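Illustratively (this is not the actual FuncWithErrors code), one standard way to stop a fast path and a bounded emission loop from overlapping is a wip ("work in progress") counter around a single drain loop:

final AtomicInteger wip = new AtomicInteger();

void tryEmit() {
    if (wip.getAndIncrement() != 0) {
        return;                              // someone else is draining; they will pick up the extra work
    }
    do {
        // drain here: emit as much as the current request allows
        // (the unbounded case simply drains everything in one pass)
    } while (wip.decrementAndGet() != 0);
}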

@davidmoten
Collaborator Author

Got this on the 74th run on the laptop:

org.mockito.exceptions.verification.VerificationInOrderFailure: 
Verification in order failure:
observer.onNext("beginningEveryTime");
Wanted 257 times:
-> at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressure(OperatorRetryTest.java:702)
But was 240 times:
-> at rx.observers.TestObserver.onNext(TestObserver.java:78)

    at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressure(OperatorRetryTest.java:702)

@davidmoten
Collaborator Author

Added another race condition fix for FuncWithErrors that prevents the subscriber o from being passed more signals after a terminal signal has been sent to it.
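The shape of that fix is roughly a once-only terminal gate (an illustrative sketch, not the test's exact code; value and err stand in for the emitted item and the terminal error):

final AtomicBoolean terminated = new AtomicBoolean();

// values are forwarded only while no terminal signal has gone out
// (a best-effort check; full safety would also need to serialize onNext against the terminal event)
if (!terminated.get()) {
    o.onNext(value);
}

// only the first terminal signal gets through
if (terminated.compareAndSet(false, true)) {
    o.onError(err);
}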

@davidmoten
Collaborator Author

54th run on desktop:

java.lang.AssertionError: Data content mismatch: 2408={beginningEveryTime x 128}
    at org.junit.Assert.fail(Assert.java:93)
    at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:764)

@davidmoten
Collaborator Author

Despite the last commit, a failure on the 22nd run on the desktop:

java.lang.AssertionError: Data content mismatch: 4501={beginningEveryTime x 128}
    at org.junit.Assert.fail(Assert.java:93)
    at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:768)

@akarnokd
Member

Try disabling transparent hugepages.

@davidmoten
Collaborator Author

With the purpose of making the test fail more often?

@akarnokd
Member

On the contrary. Linux transparent hugepages (on by default) are known to introduce latency spikes, maybe so extreme that our test times out just because of that.

@davidmoten
Collaborator Author

The catch is that we have a low-probability (for the test as it stands) race condition somewhere. If I make the test fail less often, then we lose the ability to find that race condition. Which timeout are you thinking of? The individual timeouts for the tests are commented out; are there scheduler timeouts?

@akarnokd
Member

I think there is no race condition; rather, your Linux system gets overwhelmed by the test and it simply times out. This is why I suggested looking at JVisualVM's thread graph to detect gaps in the execution that can be attributed to the system.

@davidmoten
Collaborator Author

Inspecting with JVisualVM is tricky because I have only managed to get failures using the Gradle command line, which means the JVM restarts every thirty seconds or so. I've dug around pretty much everywhere now and may dig around a bit more in the future, but perhaps we have achieved enough with this operator for it to be accepted back into the code base. The hunt certainly unearthed a few race conditions in retry, its tests, and in observeOn, so it was pretty useful, and the original very frequent failure (for me) is now very infrequent.

I can rebase the commits and remove the updates to OperatorObserveOn from this commit so that it's ready for merge once you, Ben, and whoever else has reviewed it. It will be good to get the fixes in #2929 merged as well, of course, as otherwise there will be some test flakiness on this commit.

Is that a reasonable plan or would you like to keep digging?

@davidmoten
Collaborator Author

I've rebased commits. Ready for a hopefully final review.

@akarnokd
Member

I'm quite a bit lost; I don't know what worked and what didn't, or whether the changes work on your system or not. If the test fails with an outer pool of 1 thread, which should pose the least amount of work, then my suspicion is that there is something wrong with Java on Linux or with Linux itself. Perhaps this is just another case of the recent futex bug.

@davidmoten
Collaborator Author

The futex stuff is interesting; I'll have a close look at that soon. One thing I'm wondering: should we break the retries when a java.lang.Error is signalled? At the moment I assume that if one occurs it will be suppressed and may have the side effects we're seeing.

@davidmoten
Collaborator Author

I did a little test and saw that the sort of Errors we care about do stop the retries, so that's not it.

The futex stuff shouldn't be an issue because it turned up in Linux kernel 3.14, and I'm running 3.2.0-80 on my laptop and 3.13.0-49 on my desktop. I'll turn off transparent hugepages on my home desktop and see if the error happens.

@davidmoten
Collaborator Author

I turned off transparent hugepages and got a failure on the 18th run of this command:

./gradlew -i -Dtest.single=OperatorRetry cleanTest test

The run times are below. You'll notice that the runtime is not significantly larger for the final, failing run, which may rule out some types of OS behaviour being involved.

I think we can rule out the futex bug because I run either Ubuntu 12.04 or 14.04:

Ubuntu 14.04/Debian 8: have the fix for a long time [0] [1]
Ubuntu 12.04/Debian 7: was never affected [3] [2]. Newer enablement stack kernels for Ubuntu have the same fix as [1].
[0] http://kernel.ubuntu.com/git/ubuntu/linux.git/log/?showmsg=1&qt=grep&q=Avoid+taking+the+hb-%3Elock&h=linux-3.13.y
[1] http://kernel.ubuntu.com/git/ubuntu/linux.git/log/?showmsg=1&qt=grep&q=Avoid+taking+the+hb-%3Elock&h=linux-3.16.y
[2] https://git.kernel.org/cgit/linux/kernel/git/stable/linux-stable.git/tree/kernel/futex.c?h=linux-3.2.y&id=refs/tags/v3.2.69
[3] http://kernel.ubuntu.com/git/ubuntu/ubuntu-precise.git/tree/kernel/futex.c#n186

Total time: 33.624 secs
Total time: 25.215 secs
Total time: 25.204 secs
Total time: 27.108 secs
Total time: 25.532 secs
Total time: 27.042 secs
Total time: 26.061 secs
Total time: 26.199 secs
Total time: 26.177 secs
Total time: 26.294 secs
Total time: 25.974 secs
Total time: 26.123 secs
Total time: 25.983 secs
Total time: 26.475 secs
Total time: 26.321 secs
Total time: 26.268 secs
Total time: 26.459 secs
Total time: 27.076 secs
rx.internal.operators.OperatorRetryTest > testRetryWithBackpressureParallel FAILED
    java.lang.AssertionError: Data content mismatch: 118={beginningEveryTime x 128}
        at org.junit.Assert.fail(Assert.java:93)
        at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:768)

@akarnokd
Member

I suggest closing this PR and starting fresh with a new PR and perhaps a new discussion.

@davidmoten
Collaborator Author

Continuing issue in #2997.
