Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Null check for BufferExactBoundedObserver #6499

Merged
merged 3 commits into from
Jun 17, 2019
Merged

Conversation

JanKn
Copy link

@JanKn JanKn commented Jun 12, 2019

  • [*] Please give a description about what and why you are contributing, even if it's trivial:

Improve stability of the library.
Other variants of the onComplete method include this Null check already, e.g. BufferExactUnboundedObserver, this check should fix the case when there is a race condition and buffer is already set to "null" by the time onComplete is called.

It is causing 0.1% crashes in our production app, this should improve stability of other apps too.

Other variants contain this Null check already, e.g. BufferExactUnboundedObserver
It is causing 0.1% crashes in our production app.
@akarnokd
Copy link
Member

I don't see how that scenario would be possible with just the exact buffer. Only the terminal events can set buffer to null and run is protected against a null buffer.
Aren't you using PublishSubject as well from multiple threads, competing for termination?

@akarnokd akarnokd added the 2.x label Jun 12, 2019
@akarnokd
Copy link
Member

Also an unit test demonstrating the presumed bug has been fixed is required.

@codecov
Copy link

codecov bot commented Jun 12, 2019

Codecov Report

Merging #6499 into 2.x will increase coverage by <.01%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #6499      +/-   ##
============================================
+ Coverage     98.27%   98.28%   +<.01%     
- Complexity     6291     6298       +7     
============================================
  Files           675      675              
  Lines         45163    45165       +2     
  Branches       6244     6246       +2     
============================================
+ Hits          44386    44390       +4     
- Misses          241      243       +2     
+ Partials        536      532       -4
Impacted Files Coverage Δ Complexity Δ
...al/operators/observable/ObservableBufferTimed.java 99.62% <100%> (ø) 5 <0> (ø) ⬇️
...ternal/operators/flowable/FlowableBufferTimed.java 99.27% <100%> (ø) 5 <0> (ø) ⬇️
...l/operators/observable/ObservableFlatMapMaybe.java 88.23% <0%> (-5.23%) 2% <0%> (ø)
...ternal/operators/observable/ObservablePublish.java 94.69% <0%> (-2.66%) 11% <0%> (+1%)
...ernal/operators/flowable/FlowableFromIterable.java 95.18% <0%> (-2.14%) 5% <0%> (ø)
...x/internal/operators/flowable/FlowablePublish.java 96.2% <0%> (-2.11%) 11% <0%> (ø)
.../io/reactivex/disposables/CompositeDisposable.java 98.14% <0%> (-1.86%) 39% <0%> (-1%)
...tivex/internal/operators/single/SingleTimeout.java 98.33% <0%> (-1.67%) 2% <0%> (ø)
...internal/operators/flowable/FlowableSwitchMap.java 95.28% <0%> (-1.42%) 3% <0%> (ø)
...ternal/operators/completable/CompletableMerge.java 97.61% <0%> (-1.2%) 2% <0%> (ø)
... and 26 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 15e52bb...af7c5ea. Read the comment docs.

@JanKn
Copy link
Author

JanKn commented Jun 12, 2019

To be honest, I am trying to simulate the scenario past day and a half.
Being it likely a race condition, it makes it difficult.
I will give Unit test a shot, if you have a sample Test for some other operator, which is testing similar case, I would appreaciate if you can point me there, thanks.

Our app uses some PublishSubjects, but we switched mostly to PublishRelay. The scenario with disposing from multiple threads is possible.
The production StackTrace we are seeing is this:

Fatal Exception: java.lang.NullPointerException: Null is not a valid element
       at io.reactivex.internal.queue.MpscLinkedQueue.offer(MpscLinkedQueue.java:60)
       at io.reactivex.internal.operators.observable.ObservableBufferTimed$BufferExactBoundedObserver.onComplete(ObservableBufferTimed.java:507)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:371)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:326)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onComplete(ObservableFlatMap.java:303)
       at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onComplete(BodyObservable.java:66)
       at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:51)
       at io.reactivex.Observable.subscribe(Observable.java:12268)
       at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
       at io.reactivex.Observable.subscribe(Observable.java:12268)
       at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
       at io.reactivex.Observable.subscribe(Observable.java:12268)
       at io.reactivex.internal.operators.observable.ObservableBufferTimed.subscribeActual(ObservableBufferTimed.java:66)
       at io.reactivex.Observable.subscribe(Observable.java:12268)
       at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
       at io.reactivex.Observable.subscribe(Observable.java:12268)
       at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
       at io.reactivex.Observable.subscribe(Observable.java:12268)
       at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
       at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
       at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
       at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
       at java.lang.Thread.run(Thread.java:784)

@akarnokd
Copy link
Member

That code path should not lead to NPE. The only way for buffer to remain null is that the supplier fails, but then you should get an error signal. Otherwise, calling onComplete twice would result in NPE. The exact-boundary doesn't release the buffer upon disposing it, unlike the other implementation in that file.

@JanKn
Copy link
Author

JanKn commented Jun 12, 2019

I agree that it can be something wrong with the supplier.
Another example: If supplier breaches the contract and calls onError (buffer is set to null) and then onComplete, which it should not, null gets into MprcLinkedQueue.offer:

...
    public boolean offer(final T e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }
...  

It does not happen super often given the size of the app, but it is still a concern.
Will it be something we can merge?

If so, I can push the test which covers this (fails before change, passes after change):

 @Test
    public void bufferExactIssueXXX() {
        Scheduler.Worker w = Schedulers.io().createWorker();
        TestObserver<List<Integer>> observer = new TestObserver<List<Integer>>();
        BufferExactBoundedObserver<Integer, List<Integer>> buf = new BufferExactBoundedObserver<Integer, List<Integer>>(observer, ArrayListSupplier.<Integer>asCallable(), 100, TimeUnit.MILLISECONDS, 10, false, w);

        buf.onError(new Throwable());
        buf.onComplete();
    }

@akarnokd
Copy link
Member

In that case, use a failing supplier with an empty source. No threading necessary. Also the Flowable variant may be prone to the same issue. Could you check and fix that too?

…nit test

FlowableBufferTimed.BufferExactBoundedSubscriber - failing supplier fix + unit test
@JanKn
Copy link
Author

JanKn commented Jun 13, 2019

Hi,
thank you for the test sample, that helped a lot, I just had to hit the right condition.
I think we can merge this change now.
When do you expect a release containing this?

@akarnokd
Copy link
Member

I usually wait for @vanniktech's review, then if you think this is urgent, we can release soon after.

@akarnokd akarnokd added the Bug label Jun 17, 2019
@akarnokd akarnokd added this to the 2.2 backlog milestone Jun 17, 2019
@akarnokd akarnokd merged commit c5e1b3a into ReactiveX:2.x Jun 17, 2019
Copy link
Collaborator

@vanniktech vanniktech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whops. Totally lost track of this! LGTM!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants