diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java index 223076dfc7..44499ec255 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java @@ -105,9 +105,10 @@ public void onNext(T t) { downstream.onNext(t); BackpressureHelper.produced(this, 1); } else { + upstream.cancel(); done = true; - cancel(); downstream.onError(MissingBackpressureException.createDefault()); + worker.dispose(); return; } @@ -122,10 +123,10 @@ public void onNext(T t) { onDropped.accept(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - downstream.onError(ex); - worker.dispose(); upstream.cancel(); done = true; + downstream.onError(ex); + worker.dispose(); } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java index aa0f5058fd..6bf3b9f119 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java @@ -32,7 +32,7 @@ public final class ObservableThrottleFirstTimed extends AbstractObservableWit public ObservableThrottleFirstTimed( ObservableSource source, - long timeout, + long timeout, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { @@ -102,9 +102,9 @@ public void onNext(T t) { onDropped.accept(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + upstream.dispose(); downstream.onError(ex); worker.dispose(); - upstream.dispose(); } } }