diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index b7781c57f8..d9cea6954f 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -8970,13 +8970,15 @@ public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} * @see ReactiveX operators documentation: Debounce * @see RxJava wiki: Backpressure - * @see #throttleWithTimeout(long, TimeUnit, Scheduler) + * @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer) + * @since 3.1.6 - Experimental */ @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.CUSTOM) - public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + @Experimental + public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); Objects.requireNonNull(onDropped, "onDropped is null"); @@ -17640,7 +17642,7 @@ public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit uni /** * Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the * current {@code Flowable} that are followed by newer items before a timeout value expires on a specified - * {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}). + * {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler, Consumer)}). *

* Note: If items keep being emitted by the current {@code Flowable} faster than the timeout then no items * will be emitted by the resulting {@code Flowable}. @@ -17668,13 +17670,15 @@ public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit uni * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} * @see ReactiveX operators documentation: Debounce * @see RxJava wiki: Backpressure - * @see #debounce(long, TimeUnit, Scheduler) + * @see #debounce(long, TimeUnit, Scheduler, Consumer) + * @since 3.1.6 - Experimental */ @CheckReturnValue @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.CUSTOM) @NonNull - public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + @Experimental + public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { return debounce(timeout, unit, scheduler, onDropped); } diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 5bb6508994..792b740104 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -7933,12 +7933,14 @@ public final Observable debounce(long timeout, @NonNull TimeUnit unit, @NonNu * @return the new {@code Observable} instance * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} } or {@code onDropped} is {@code null} * @see ReactiveX operators documentation: Debounce - * @see #throttleWithTimeout(long, TimeUnit, Scheduler) + * @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer) + * @since 3.1.6 - Experimental */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) @NonNull - public final Observable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + @Experimental + public final Observable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); Objects.requireNonNull(onDropped, "onDropped is null"); @@ -14671,12 +14673,14 @@ public final Observable throttleWithTimeout(long timeout, @NonNull TimeUnit u * @return the new {@code Observable} instance * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} * @see ReactiveX operators documentation: Debounce - * @see #debounce(long, TimeUnit, Scheduler) + * @see #debounce(long, TimeUnit, Scheduler, Consumer) + * @since 3.1.6 - Experimental */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) @NonNull - public final Observable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + @Experimental + public final Observable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { return debounce(timeout, unit, scheduler, onDropped); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java index 086dd6c1fd..b9e5dff7f3 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java @@ -34,9 +34,9 @@ public final class FlowableDebounceTimed extends AbstractFlowableWithUpstream final long timeout; final TimeUnit unit; final Scheduler scheduler; - final Consumer onDropped; + final Consumer onDropped; - public FlowableDebounceTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { + public FlowableDebounceTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { super(source); this.timeout = timeout; this.unit = unit; @@ -58,7 +58,7 @@ static final class DebounceTimedSubscriber extends AtomicLong final long timeout; final TimeUnit unit; final Scheduler.Worker worker; - final Consumer onDropped; + final Consumer onDropped; Subscription upstream; @@ -68,7 +68,7 @@ static final class DebounceTimedSubscriber extends AtomicLong boolean done; - DebounceTimedSubscriber(Subscriber actual, long timeout, TimeUnit unit, Worker worker, Consumer onDropped) { + DebounceTimedSubscriber(Subscriber actual, long timeout, TimeUnit unit, Worker worker, Consumer onDropped) { this.downstream = actual; this.timeout = timeout; this.unit = unit; @@ -93,14 +93,14 @@ public void onNext(T t) { long idx = index + 1; index = idx; - Disposable d = timer; - if (d != null) { - d.dispose(); + DebounceEmitter currentEmitter = timer; + if (currentEmitter != null) { + currentEmitter.dispose(); } - if (onDropped != null && timer != null) { + if (onDropped != null && currentEmitter != null) { try { - onDropped.accept(timer.value); + onDropped.accept(currentEmitter.value); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); upstream.cancel(); @@ -110,10 +110,9 @@ public void onNext(T t) { } } - DebounceEmitter de = new DebounceEmitter<>(t, idx, this); - timer = de; - d = worker.schedule(de, timeout, unit); - de.setResource(d); + DebounceEmitter newEmitter = new DebounceEmitter<>(t, idx, this); + timer = newEmitter; + newEmitter.setResource(worker.schedule(newEmitter, timeout, unit)); } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java index 48861a778e..f2db191229 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java @@ -29,9 +29,9 @@ public final class ObservableDebounceTimed extends AbstractObservableWithUpst final long timeout; final TimeUnit unit; final Scheduler scheduler; - final Consumer onDropped; + final Consumer onDropped; - public ObservableDebounceTimed(ObservableSource source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { + public ObservableDebounceTimed(ObservableSource source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { super(source); this.timeout = timeout; this.unit = unit; @@ -51,7 +51,7 @@ static final class DebounceTimedObserver final long timeout; final TimeUnit unit; final Scheduler.Worker worker; - final Consumer onDropped; + final Consumer onDropped; Disposable upstream; @@ -61,7 +61,7 @@ static final class DebounceTimedObserver boolean done; - DebounceTimedObserver(Observer actual, long timeout, TimeUnit unit, Worker worker, Consumer onDropped) { + DebounceTimedObserver(Observer actual, long timeout, TimeUnit unit, Worker worker, Consumer onDropped) { this.downstream = actual; this.timeout = timeout; this.unit = unit; @@ -85,12 +85,12 @@ public void onNext(T t) { long idx = index + 1; index = idx; - Disposable d = timer; - if (d != null) { - d.dispose(); + DebounceEmitter currentEmitter = timer; + if (currentEmitter != null) { + currentEmitter.dispose(); } - if (onDropped != null && timer != null) { + if (onDropped != null && currentEmitter != null) { try { onDropped.accept(timer.value); } catch (Throwable ex) { @@ -101,10 +101,9 @@ public void onNext(T t) { } } - DebounceEmitter de = new DebounceEmitter<>(t, idx, this); - timer = de; - d = worker.schedule(de, timeout, unit); - de.setResource(d); + DebounceEmitter newEmitter = new DebounceEmitter<>(t, idx, this); + timer = newEmitter; + newEmitter.setResource(worker.schedule(newEmitter, timeout, unit)); } @Override