Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add onDropped callback to onBackpressureBuffer #7567

Merged
merged 3 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 95 additions & 3 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12546,7 +12546,7 @@ public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError)
@NonNull
public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded) {
ObjectHelper.verifyPositive(capacity, "capacity");
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION));
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION, Functions.emptyConsumer()));
}

/**
Expand Down Expand Up @@ -12577,6 +12577,7 @@ public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError,
* @throws NullPointerException if {@code onOverflow} is {@code null}
* @throws IllegalArgumentException if {@code capacity} is non-positive
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @see #onBackpressureBuffer(int, boolean, boolean, Action, Consumer)
* @since 1.1.0
*/
@CheckReturnValue
Expand All @@ -12587,7 +12588,51 @@ public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError,
@NonNull Action onOverflow) {
Objects.requireNonNull(onOverflow, "onOverflow is null");
ObjectHelper.verifyPositive(capacity, "capacity");
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow));
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow, Functions.emptyConsumer()));
}

/**
* Buffers an optionally unlimited number of items from the current {@code Flowable} and allows it to emit as fast it can while allowing the
* downstream to consume the items at its own place.
* If {@code unbounded} is {@code true}, the resulting {@code Flowable} will signal a
* {@link MissingBackpressureException} via {@code onError} as soon as the buffer's capacity is exceeded, dropping all undelivered
* items, canceling the flow and calling the {@code onOverflow} action.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
* manner (i.e., not applying backpressure to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param capacity number of slots available in the buffer.
* @param delayError
* if {@code true}, an exception from the current {@code Flowable} is delayed until all buffered elements have been
* consumed by the downstream; if {@code false}, an exception is immediately signaled to the downstream, skipping
* any buffered element
* @param unbounded
* if {@code true}, the capacity value is interpreted as the internal "island" size of the unbounded buffer
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots.
* @param onDropped the {@link Consumer} to be called with the item that could not be buffered due to capacity constraints.
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code onOverflow} or {@code onDropped} is {@code null}
* @throws IllegalArgumentException if {@code capacity} is non-positive
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since 3.1.7
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded,
@NonNull Action onOverflow, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(onOverflow, "onOverflow is null");
Objects.requireNonNull(onDropped, "onDropped is null");
ObjectHelper.verifyPositive(capacity, "capacity");
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow, onDropped));
}

/**
Expand Down Expand Up @@ -12653,6 +12698,7 @@ public final Flowable<T> onBackpressureBuffer(int capacity, @NonNull Action onOv
* @throws NullPointerException if {@code onOverflow} or {@code overflowStrategy} is {@code null}
* @throws IllegalArgumentException if {@code capacity} is non-positive
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @see #onBackpressureBuffer(long, Action, BackpressureOverflowStrategy)
* @since 2.0
*/
@CheckReturnValue
Expand All @@ -12662,9 +12708,55 @@ public final Flowable<T> onBackpressureBuffer(int capacity, @NonNull Action onOv
public final Flowable<T> onBackpressureBuffer(long capacity, @Nullable Action onOverflow, @NonNull BackpressureOverflowStrategy overflowStrategy) {
Objects.requireNonNull(overflowStrategy, "overflowStrategy is null");
ObjectHelper.verifyPositive(capacity, "capacity");
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy));
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy, null));
}

/**
* Buffers an optionally unlimited number of items from the current {@code Flowable} and allows it to emit as fast it can while allowing the
* downstream to consume the items at its own place.
* The resulting {@code Flowable} will behave as determined by {@code overflowStrategy} if the buffer capacity is exceeded:
* <ul>
* <li>{@link BackpressureOverflowStrategy#ERROR} (default) will call {@code onError} dropping all undelivered items,
* canceling the source, and notifying the producer with {@code onOverflow}. </li>
* <li>{@link BackpressureOverflowStrategy#DROP_LATEST} will drop any new items emitted by the producer while
* the buffer is full, without generating any {@code onError}. Each drop will, however, invoke {@code onOverflow}
* to signal the overflow to the producer.</li>
* <li>{@link BackpressureOverflowStrategy#DROP_OLDEST} will drop the oldest items in the buffer in order to make
* room for newly emitted ones. Overflow will not generate an {@code onError}, but each drop will invoke
* {@code onOverflow} to signal the overflow to the producer.</li>
* </ul>
*
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
* manner (i.e., not applying backpressure to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots, {@code null} is allowed.
* @param overflowStrategy how should the resulting {@code Flowable} react to buffer overflows, {@code null} is not allowed.
* @param onDropped the {@link Consumer} to be called with the item that could not be buffered due to capacity constraints.
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code onOverflow}, {@code overflowStrategy} or {@code onDropped} is {@code null}
* @throws IllegalArgumentException if {@code capacity} is non-positive
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since 3.1.7
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Flowable<T> onBackpressureBuffer(long capacity, @Nullable Action onOverflow, @NonNull BackpressureOverflowStrategy overflowStrategy, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(overflowStrategy, "overflowStrategy is null");
Objects.requireNonNull(onDropped, "onDropped is null");
ObjectHelper.verifyPositive(capacity, "capacity");
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy, onDropped));
}
/**
* Drops items from the current {@code Flowable} if the downstream is not ready to receive new items (indicated
* by a lack of {@link Subscription#request(long)} calls from it).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import io.reactivex.rxjava3.operators.*;
Expand All @@ -30,19 +30,21 @@ public final class FlowableOnBackpressureBuffer<T> extends AbstractFlowableWithU
final boolean unbounded;
final boolean delayError;
final Action onOverflow;
final Consumer<? super T> onDropped;

public FlowableOnBackpressureBuffer(Flowable<T> source, int bufferSize, boolean unbounded,
boolean delayError, Action onOverflow) {
boolean delayError, Action onOverflow, Consumer<? super T> onDropped) {
super(source);
this.bufferSize = bufferSize;
this.unbounded = unbounded;
this.delayError = delayError;
this.onOverflow = onOverflow;
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new BackpressureBufferSubscriber<>(s, bufferSize, unbounded, delayError, onOverflow));
source.subscribe(new BackpressureBufferSubscriber<>(s, bufferSize, unbounded, delayError, onOverflow, onDropped));
}

static final class BackpressureBufferSubscriber<T> extends BasicIntQueueSubscription<T> implements FlowableSubscriber<T> {
Expand All @@ -53,6 +55,7 @@ static final class BackpressureBufferSubscriber<T> extends BasicIntQueueSubscrip
final SimplePlainQueue<T> queue;
final boolean delayError;
final Action onOverflow;
final Consumer<? super T> onDropped;

Subscription upstream;

Expand All @@ -66,10 +69,11 @@ static final class BackpressureBufferSubscriber<T> extends BasicIntQueueSubscrip
boolean outputFused;

BackpressureBufferSubscriber(Subscriber<? super T> actual, int bufferSize,
boolean unbounded, boolean delayError, Action onOverflow) {
boolean unbounded, boolean delayError, Action onOverflow, Consumer<? super T> onDropped) {
this.downstream = actual;
this.onOverflow = onOverflow;
this.delayError = delayError;
this.onDropped = onDropped;

SimplePlainQueue<T> q;

Expand Down Expand Up @@ -98,6 +102,7 @@ public void onNext(T t) {
MissingBackpressureException ex = new MissingBackpressureException("Buffer is full");
try {
onOverflow.run();
onDropped.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
ex.initCause(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Expand All @@ -38,17 +38,21 @@ public final class FlowableOnBackpressureBufferStrategy<T> extends AbstractFlowa

final BackpressureOverflowStrategy strategy;

final Consumer<? super T> onDropped;

public FlowableOnBackpressureBufferStrategy(Flowable<T> source,
long bufferSize, Action onOverflow, BackpressureOverflowStrategy strategy) {
long bufferSize, Action onOverflow, BackpressureOverflowStrategy strategy,
Consumer<? super T> onDropped) {
super(source);
this.bufferSize = bufferSize;
this.onOverflow = onOverflow;
this.strategy = strategy;
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new OnBackpressureBufferStrategySubscriber<>(s, onOverflow, strategy, bufferSize));
source.subscribe(new OnBackpressureBufferStrategySubscriber<>(s, onOverflow, strategy, bufferSize, onDropped));
}

static final class OnBackpressureBufferStrategySubscriber<T>
Expand All @@ -61,6 +65,8 @@ static final class OnBackpressureBufferStrategySubscriber<T>

final Action onOverflow;

final Consumer<? super T> onDropped;

final BackpressureOverflowStrategy strategy;

final long bufferSize;
Expand All @@ -77,13 +83,15 @@ static final class OnBackpressureBufferStrategySubscriber<T>
Throwable error;

OnBackpressureBufferStrategySubscriber(Subscriber<? super T> actual, Action onOverflow,
BackpressureOverflowStrategy strategy, long bufferSize) {
BackpressureOverflowStrategy strategy, long bufferSize,
Consumer<? super T> onDropped) {
this.downstream = actual;
this.onOverflow = onOverflow;
this.strategy = strategy;
this.bufferSize = bufferSize;
this.requested = new AtomicLong();
this.deque = new ArrayDeque<>();
this.onDropped = onDropped;
}

@Override
Expand All @@ -104,44 +112,60 @@ public void onNext(T t) {
}
boolean callOnOverflow = false;
boolean callError = false;
boolean callDrain = false;
Deque<T> dq = deque;
T toDrop = null;
synchronized (dq) {
if (dq.size() == bufferSize) {
switch (strategy) {
case DROP_LATEST:
dq.pollLast();
toDrop = dq.pollLast();
dq.offer(t);
callOnOverflow = true;
break;
case DROP_OLDEST:
dq.poll();
toDrop = dq.poll();
dq.offer(t);
callOnOverflow = true;
break;
default:
// signal error
toDrop = t;
callError = true;
break;
}
} else {
dq.offer(t);
callDrain = true;
}
}

if (callOnOverflow) {
if (onOverflow != null) {
try {
onOverflow.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(ex);
}
if (callOnOverflow && onOverflow != null) {
try {
onOverflow.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(ex);
}
}

if (onDropped != null && toDrop != null) {
try {
onDropped.accept(toDrop);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(ex);
}
} else if (callError) {
}

if (callError) {
upstream.cancel();
onError(MissingBackpressureException.createDefault());
} else {
}

if (callDrain) {
drain();
}
}
Expand Down
Loading