Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Flowable.blockingSubscribe is unbounded and can lead to OOME #6026

Conversation

RomanWuattier
Copy link
Contributor

Create and bound new blockingSubscribe overloads to bufferSize.

  • Create new overloads with bufferSize
  • Create a boundedConsumer
  • Create a BoundedSubsciber

Close: #5988

@codecov
Copy link

codecov bot commented May 29, 2018

Codecov Report

Merging #6026 into 2.x will decrease coverage by 0.03%.
The diff coverage is 94.44%.

Impacted file tree graph

@@             Coverage Diff             @@
##               2.x    #6026      +/-   ##
===========================================
- Coverage     98.3%   98.27%   -0.04%     
- Complexity    6175     6193      +18     
===========================================
  Files          665      666       +1     
  Lines        44729    44801      +72     
  Branches      6205     6206       +1     
===========================================
+ Hits         43973    44030      +57     
- Misses         222      233      +11     
- Partials       534      538       +4
Impacted Files Coverage Δ Complexity Δ
...ava/io/reactivex/internal/functions/Functions.java 100% <100%> (ø) 36 <1> (+1) ⬆️
src/main/java/io/reactivex/Flowable.java 100% <100%> (ø) 564 <3> (+3) ⬆️
.../operators/flowable/FlowableBlockingSubscribe.java 93.02% <100%> (-4.28%) 10 <1> (ø)
...ctivex/internal/subscribers/BoundedSubscriber.java 92.59% <92.59%> (ø) 15 <15> (?)
.../operators/observable/ObservableFlatMapSingle.java 88.8% <0%> (-5.98%) 2% <0%> (ø)
...ternal/operators/observable/ObservablePublish.java 96.46% <0%> (-3.54%) 11% <0%> (ø)
...a/io/reactivex/internal/util/QueueDrainHelper.java 97.22% <0%> (-2.78%) 56% <0%> (-2%)
...tivex/internal/schedulers/TrampolineScheduler.java 96.1% <0%> (-2.6%) 6% <0%> (ø)
...activex/internal/observers/QueueDrainObserver.java 97.43% <0%> (-2.57%) 21% <0%> (-1%)
... and 30 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f3c8862...8bd5b13. Read the comment docs.

Copy link
Member

@akarnokd akarnokd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some small missing pieces and one logical error.

* </dl>
* @param onNext the callback action for each source value
* @param bufferSize the size of the buffer
* @since 2.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please designate as 2.1.15 - experimental here and to the other methods.

*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(Consumer<? super T> onNext, int bufferSize) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @Experimental annotation to these new methods.

final Action onComplete;
final Consumer<? super Subscription> onSubscribe;

private int bufferSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please no private instance fields.

final Consumer<? super Subscription> onSubscribe;

private int bufferSize;
private final AtomicInteger internalBuffer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this atomic?

if (internalBuffer.getAndIncrement() < bufferSize) {
onNext.accept(t);
}
if (internalBuffer.get() == bufferSize) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you end the flow when the bufferSize number of elements have been received?


assertEquals(Arrays.asList(1, 2, 3, 100), list);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an unit test that transfers 1 million items.


@Override
public void onComplete() {
if (get() != SubscriptionHelper.CANCELLED) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are going to check the value of get() may as well do a compareAndSet loop for concurrent cancellation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't. That level of precision it is not really worth it here.

onNext.accept(t);

int c = consumed + 1;
if (c == bufferSize) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to use a low-watermark than requesting only when the entire buffer has been consumed. Define a final int limit field, in the constructor set it as this.limit = bufferSize - (bufferSize >> 2);, then compare and request as limit.

Copy link
Member

@akarnokd akarnokd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One algorithmic adjustment and good to go.

@akarnokd akarnokd dismissed their stale review May 30, 2018 10:50

outdated

@akarnokd
Copy link
Member

Also it would be great if you copied the unit tests that crash the callbacks so that the coverage on the new class gets very high.

@akarnokd akarnokd added the 2.x label May 30, 2018
@akarnokd akarnokd added this to the 2.2 milestone May 30, 2018
@RomanWuattier RomanWuattier force-pushed the fix-unbounded-flowable-blocking-subscribe branch 9 times, most recently from 62a9816 to 74e7cde Compare May 30, 2018 21:06
@RomanWuattier
Copy link
Contributor Author

I created a BoundedSubscriberTest.java to trigger Consumer and Action exceptions, but the code coverage remains unchanged. Did I miss a test?

* @param bufferSize the number of elements to prefetch from the source Publisher
* @param <T> the value type
*/
@SuppressWarnings("ResultOfMethodCallIgnored")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed. It never appears anywhere else in RxJava. Please remove.

this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
this.bufferSize = ((Functions.BoundedConsumer) onSubscribe).getBufferSize();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this cast needed? Can't you could simply pass the bufferSize into the constructor?

}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throw new TestException("Inner");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Place a breakpoint here and debug the test. It should reveal why the code coverage doesn't seem to execute that code in BoundedSubscriber.

@@ -1,11 +1,11 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* <p>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't change the license headers.

@@ -0,0 +1,388 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
* <p>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't change the license headers.

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.testng.annotations.Test;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use org.junit.Test as the TestNG tests are not part of the code coverage.

Copy link
Member

@akarnokd akarnokd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix headers and use org.junit.Test please.

@RomanWuattier RomanWuattier force-pushed the fix-unbounded-flowable-blocking-subscribe branch from db252ee to 8bd5b13 Compare June 2, 2018 14:49
@akarnokd akarnokd merged commit fc0ca6e into ReactiveX:2.x Jun 14, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants