Skip to content

Commit

Permalink
Fix Flowable.blockingSubscribe is unbounded and can lead to OOME (Rea…
Browse files Browse the repository at this point in the history
…ctiveX#5988)

Create BoundedSubscriber
  • Loading branch information
RomanWuattier committed May 29, 2018
1 parent 5f1ce20 commit f56c4ed
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 0 deletions.
82 changes: 82 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5853,6 +5853,37 @@ public final void blockingSubscribe(Consumer<? super T> onNext) {
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* Using the overloads {@link #blockingSubscribe(Consumer, Consumer)}
* or {@link #blockingSubscribe(Consumer, Consumer, Action)} instead is recommended.
* <p>
* Note that calling this method will block the caller thread until the upstream terminates
* normally or with an error. Therefore, calling this method from special threads such as the
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
* outstanding request amount for items).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onNext the callback action for each source value
* @param bufferSize the size of the buffer
* @since 2.0
* @see #blockingSubscribe(Consumer, Consumer)
* @see #blockingSubscribe(Consumer, Consumer, Action)
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(Consumer<? super T> onNext, int bufferSize) {
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, bufferSize);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
Expand All @@ -5877,6 +5908,31 @@ public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super
FlowableBlockingSubscribe.subscribe(this, onNext, onError, Functions.EMPTY_ACTION);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* Note that calling this method will block the caller thread until the upstream terminates
* normally or with an error. Therefore, calling this method from special threads such as the
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
* outstanding request amount for items).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param bufferSize the size of the buffer
* @since 2.0
* @see #blockingSubscribe(Consumer, Consumer, Action)
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
int bufferSize) {
FlowableBlockingSubscribe.subscribe(this, onNext, onError, Functions.EMPTY_ACTION, bufferSize);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
Expand All @@ -5902,6 +5958,32 @@ public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super
FlowableBlockingSubscribe.subscribe(this, onNext, onError, onComplete);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* Note that calling this method will block the caller thread until the upstream terminates
* normally or with an error. Therefore, calling this method from special threads such as the
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
* outstanding request amount for items).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param onComplete the callback action for the completion event.
* @param bufferSize the size of the buffer
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete,
int bufferSize) {
FlowableBlockingSubscribe.subscribe(this, onNext, onError, onComplete, bufferSize);
}

/**
* Subscribes to the source and calls the {@link Subscriber} methods <strong>on the current thread</strong>.
* <p>
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -745,4 +745,27 @@ public void accept(Subscription t) throws Exception {
t.request(Long.MAX_VALUE);
}
}

@SuppressWarnings("unchecked")
public static <T> Consumer<T> boundedConsumer(int bufferSize) {
return (Consumer<T>) new BoundedConsumer(bufferSize);
}

public static class BoundedConsumer implements Consumer<Subscription> {

private final int bufferSize;

BoundedConsumer(int bufferSize) {
this.bufferSize = bufferSize;
}

@Override
public void accept(Subscription s) throws Exception {
s.request(bufferSize);
}

public int getBufferSize() {
return bufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,23 @@ public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? supe
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
subscribe(o, new LambdaSubscriber<T>(onNext, onError, onComplete, Functions.REQUEST_MAX));
}

/**
* Subscribes to the source and calls the given actions on the current thread.
* @param o the source publisher
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param onComplete the callback action for the completion event.
* @param bufferSize the number of elements to prefetch from the source Publisher
* @param <T> the value type
*/
@SuppressWarnings("ResultOfMethodCallIgnored")
public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? super T> onNext,
final Consumer<? super Throwable> onError, final Action onComplete, int bufferSize) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.verifyPositive(bufferSize, "number > 0 required");
subscribe(o, new BoundedSubscriber<T>(onNext, onError, onComplete, Functions.boundedConsumer(bufferSize)));
}
}
126 changes: 126 additions & 0 deletions src/main/java/io/reactivex/internal/subscribers/BoundedSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package io.reactivex.internal.subscribers;

import io.reactivex.FlowableSubscriber;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public final class BoundedSubscriber<T> extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;

private int bufferSize;
private final AtomicInteger internalBuffer;

public BoundedSubscriber(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Subscription> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
this.bufferSize = ((Functions.BoundedConsumer) onSubscribe).getBufferSize();
this.internalBuffer = new AtomicInteger(0);
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this, s)) {
try {
onSubscribe.accept(this);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.cancel();
onError(e);
}
}
}

@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
if (internalBuffer.getAndIncrement() < bufferSize) {
onNext.accept(t);
}
if (internalBuffer.get() == bufferSize) {
onComplete.run();
get().cancel();
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().cancel();
onError(e);
}
}
}

@Override
public void onError(Throwable t) {
if (get() != SubscriptionHelper.CANCELLED) {
lazySet(SubscriptionHelper.CANCELLED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
if (get() != SubscriptionHelper.CANCELLED) {
lazySet(SubscriptionHelper.CANCELLED);
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}

@Override
public void dispose() {
cancel();
}

@Override
public boolean isDisposed() {
return get() == SubscriptionHelper.CANCELLED;
}

@Override
public void request(long n) {
get().request(n);
}

@Override
public void cancel() {
SubscriptionHelper.cancel(this);
}

@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Loading

0 comments on commit f56c4ed

Please sign in to comment.