Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Flowable.blockingSubscribe is unbounded and can lead to OOME #6026

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5853,6 +5853,38 @@ public final void blockingSubscribe(Consumer<? super T> onNext) {
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* Using the overloads {@link #blockingSubscribe(Consumer, Consumer)}
* or {@link #blockingSubscribe(Consumer, Consumer, Action)} instead is recommended.
* <p>
* Note that calling this method will block the caller thread until the upstream terminates
* normally or with an error. Therefore, calling this method from special threads such as the
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
* outstanding request amount for items).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onNext the callback action for each source value
* @param bufferSize the size of the buffer
* @since 2.1.15 - experimental
* @see #blockingSubscribe(Consumer, Consumer)
* @see #blockingSubscribe(Consumer, Consumer, Action)
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final void blockingSubscribe(Consumer<? super T> onNext, int bufferSize) {
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, bufferSize);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
Expand All @@ -5877,6 +5909,32 @@ public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super
FlowableBlockingSubscribe.subscribe(this, onNext, onError, Functions.EMPTY_ACTION);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* Note that calling this method will block the caller thread until the upstream terminates
* normally or with an error. Therefore, calling this method from special threads such as the
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
* outstanding request amount for items).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param bufferSize the size of the buffer
* @since 2.1.15 - experimental
* @see #blockingSubscribe(Consumer, Consumer, Action)
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
int bufferSize) {
FlowableBlockingSubscribe.subscribe(this, onNext, onError, Functions.EMPTY_ACTION, bufferSize);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
Expand All @@ -5902,6 +5960,33 @@ public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super
FlowableBlockingSubscribe.subscribe(this, onNext, onError, onComplete);
}

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* Note that calling this method will block the caller thread until the upstream terminates
* normally or with an error. Therefore, calling this method from special threads such as the
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
* outstanding request amount for items).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param onComplete the callback action for the completion event.
* @param bufferSize the size of the buffer
* @since 2.1.15 - experimental
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete,
int bufferSize) {
FlowableBlockingSubscribe.subscribe(this, onNext, onError, onComplete, bufferSize);
}

/**
* Subscribes to the source and calls the {@link Subscriber} methods <strong>on the current thread</strong>.
* <p>
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -745,4 +745,23 @@ public void accept(Subscription t) throws Exception {
t.request(Long.MAX_VALUE);
}
}

@SuppressWarnings("unchecked")
public static <T> Consumer<T> boundedConsumer(int bufferSize) {
return (Consumer<T>) new BoundedConsumer(bufferSize);
}

public static class BoundedConsumer implements Consumer<Subscription> {

final int bufferSize;

BoundedConsumer(int bufferSize) {
this.bufferSize = bufferSize;
}

@Override
public void accept(Subscription s) throws Exception {
s.request(bufferSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,23 @@ public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? supe
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
subscribe(o, new LambdaSubscriber<T>(onNext, onError, onComplete, Functions.REQUEST_MAX));
}

/**
* Subscribes to the source and calls the given actions on the current thread.
* @param o the source publisher
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param onComplete the callback action for the completion event.
* @param bufferSize the number of elements to prefetch from the source Publisher
* @param <T> the value type
*/
public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? super T> onNext,
final Consumer<? super Throwable> onError, final Action onComplete, int bufferSize) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.verifyPositive(bufferSize, "number > 0 required");
subscribe(o, new BoundedSubscriber<T>(onNext, onError, onComplete, Functions.boundedConsumer(bufferSize),
bufferSize));
}
}
140 changes: 140 additions & 0 deletions src/main/java/io/reactivex/internal/subscribers/BoundedSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.subscribers;

import io.reactivex.FlowableSubscriber;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicReference;

public final class BoundedSubscriber<T> extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;

final int bufferSize;
int consumed;
final int limit;

public BoundedSubscriber(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe, int bufferSize) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
this.bufferSize = bufferSize;
this.limit = bufferSize - (bufferSize >> 2);
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this, s)) {
try {
onSubscribe.accept(this);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.cancel();
onError(e);
}
}
}

@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);

int c = consumed + 1;
if (c == limit) {
consumed = 0;
get().request(limit);
} else {
consumed = c;
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().cancel();
onError(e);
}
}
}

@Override
public void onError(Throwable t) {
if (get() != SubscriptionHelper.CANCELLED) {
lazySet(SubscriptionHelper.CANCELLED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
if (get() != SubscriptionHelper.CANCELLED) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are going to check the value of get() may as well do a compareAndSet loop for concurrent cancellation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't. That level of precision it is not really worth it here.

lazySet(SubscriptionHelper.CANCELLED);
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}

@Override
public void dispose() {
cancel();
}

@Override
public boolean isDisposed() {
return get() == SubscriptionHelper.CANCELLED;
}

@Override
public void request(long n) {
get().request(n);
}

@Override
public void cancel() {
SubscriptionHelper.cancel(this);
}

@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public void flowableBlockingSubscribe1() {
.blockingSubscribe(Functions.emptyConsumer());
}

@Test
public void flowableBoundedBlockingSubscribe1() {
Flowable.error(new TestException())
.blockingSubscribe(Functions.emptyConsumer(), 128);
}

@Test
public void observableSubscribe0() {
Observable.error(new TestException())
Expand Down
Loading