Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix window(time) possible interrupts while terminating #6684

Merged
merged 1 commit into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public void onError(Throwable t) {
}

downstream.onError(t);
dispose();
}

@Override
Expand All @@ -171,7 +170,6 @@ public void onComplete() {
}

downstream.onComplete();
dispose();
}

@Override
Expand All @@ -184,22 +182,15 @@ public void cancel() {
cancelled = true;
}

public void dispose() {
DisposableHelper.dispose(timer);
}

@Override
public void run() {

if (cancelled) {
terminated = true;
dispose();
}
queue.offer(NEXT);
if (enter()) {
drainLoop();
}

}

void drainLoop() {
Expand All @@ -221,13 +212,13 @@ void drainLoop() {
if (d && (o == null || o == NEXT)) {
window = null;
q.clear();
dispose();
Throwable err = error;
if (err != null) {
w.onError(err);
} else {
w.onComplete();
}
timer.dispose();
return;
}

Expand All @@ -251,8 +242,8 @@ void drainLoop() {
window = null;
queue.clear();
upstream.cancel();
dispose();
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests."));
timer.dispose();
return;
}
} else {
Expand Down Expand Up @@ -396,7 +387,7 @@ public void onNext(T t) {
window = null;
upstream.cancel();
downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests"));
dispose();
disposeTimer();
return;
}
} else {
Expand Down Expand Up @@ -424,7 +415,6 @@ public void onError(Throwable t) {
}

downstream.onError(t);
dispose();
}

@Override
Expand All @@ -435,7 +425,6 @@ public void onComplete() {
}

downstream.onComplete();
dispose();
}

@Override
Expand All @@ -448,8 +437,8 @@ public void cancel() {
cancelled = true;
}

public void dispose() {
DisposableHelper.dispose(timer);
public void disposeTimer() {
timer.dispose();
Worker w = worker;
if (w != null) {
w.dispose();
Expand All @@ -468,7 +457,7 @@ void drainLoop() {
if (terminated) {
upstream.cancel();
q.clear();
dispose();
disposeTimer();
return;
}

Expand All @@ -488,7 +477,7 @@ void drainLoop() {
} else {
w.onComplete();
}
dispose();
disposeTimer();
return;
}

Expand All @@ -515,7 +504,7 @@ void drainLoop() {
queue.clear();
upstream.cancel();
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests."));
dispose();
disposeTimer();
return;
}
}
Expand Down Expand Up @@ -554,7 +543,7 @@ void drainLoop() {
window = null;
upstream.cancel();
downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests"));
dispose();
disposeTimer();
return;
}
} else {
Expand Down Expand Up @@ -585,7 +574,6 @@ public void run() {
p.queue.offer(this);
} else {
p.terminated = true;
p.dispose();
}
if (p.enter()) {
p.drainLoop();
Expand Down Expand Up @@ -682,7 +670,6 @@ public void onError(Throwable t) {
}

downstream.onError(t);
dispose();
}

@Override
Expand All @@ -693,7 +680,6 @@ public void onComplete() {
}

downstream.onComplete();
dispose();
}

@Override
Expand All @@ -706,10 +692,6 @@ public void cancel() {
cancelled = true;
}

public void dispose() {
worker.dispose();
}

void complete(UnicastProcessor<T> w) {
queue.offer(new SubjectWork<T>(w, false));
if (enter()) {
Expand All @@ -730,9 +712,9 @@ void drainLoop() {
for (;;) {
if (terminated) {
upstream.cancel();
dispose();
q.clear();
ws.clear();
worker.dispose();
return;
}

Expand All @@ -756,7 +738,7 @@ void drainLoop() {
}
}
ws.clear();
dispose();
worker.dispose();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.observers.QueueDrainObserver;
import io.reactivex.internal.queue.MpscLinkedQueue;
import io.reactivex.internal.util.NotificationLite;
Expand Down Expand Up @@ -85,7 +84,7 @@ static final class WindowExactUnboundedObserver<T>

UnicastSubject<T> window;

final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
final SequentialDisposable timer = new SequentialDisposable();

static final Object NEXT = new Object();

Expand Down Expand Up @@ -114,7 +113,7 @@ public void onSubscribe(Disposable d) {

if (!cancelled) {
Disposable task = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit);
DisposableHelper.replace(timer, task);
timer.replace(task);
}
}
}
Expand Down Expand Up @@ -146,7 +145,6 @@ public void onError(Throwable t) {
drainLoop();
}

disposeTimer();
downstream.onError(t);
}

Expand All @@ -157,7 +155,6 @@ public void onComplete() {
drainLoop();
}

disposeTimer();
downstream.onComplete();
}

Expand All @@ -171,15 +168,10 @@ public boolean isDisposed() {
return cancelled;
}

void disposeTimer() {
DisposableHelper.dispose(timer);
}

@Override
public void run() {
if (cancelled) {
terminated = true;
disposeTimer();
}
queue.offer(NEXT);
if (enter()) {
Expand All @@ -206,13 +198,13 @@ void drainLoop() {
if (d && (o == null || o == NEXT)) {
window = null;
q.clear();
disposeTimer();
Throwable err = error;
if (err != null) {
w.onError(err);
} else {
w.onComplete();
}
timer.dispose();
return;
}

Expand Down Expand Up @@ -266,7 +258,7 @@ static final class WindowExactBoundedObserver<T>

volatile boolean terminated;

final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
final SequentialDisposable timer = new SequentialDisposable();

WindowExactBoundedObserver(
Observer<? super Observable<T>> actual,
Expand Down Expand Up @@ -312,7 +304,7 @@ public void onSubscribe(Disposable d) {
task = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit);
}

DisposableHelper.replace(timer, task);
timer.replace(task);
}
}

Expand Down Expand Up @@ -370,7 +362,6 @@ public void onError(Throwable t) {
}

downstream.onError(t);
disposeTimer();
}

@Override
Expand All @@ -381,7 +372,6 @@ public void onComplete() {
}

downstream.onComplete();
disposeTimer();
}

@Override
Expand Down Expand Up @@ -428,13 +418,13 @@ void drainLoop() {
if (d && (empty || isHolder)) {
window = null;
q.clear();
disposeTimer();
Throwable err = error;
if (err != null) {
w.onError(err);
} else {
w.onComplete();
}
disposeTimer();
return;
}

Expand Down Expand Up @@ -507,7 +497,6 @@ public void run() {
p.queue.offer(this);
} else {
p.terminated = true;
p.disposeTimer();
}
if (p.enter()) {
p.drainLoop();
Expand Down Expand Up @@ -592,7 +581,6 @@ public void onError(Throwable t) {
}

downstream.onError(t);
disposeWorker();
}

@Override
Expand All @@ -603,7 +591,6 @@ public void onComplete() {
}

downstream.onComplete();
disposeWorker();
}

@Override
Expand All @@ -616,10 +603,6 @@ public boolean isDisposed() {
return cancelled;
}

void disposeWorker() {
worker.dispose();
}

void complete(UnicastSubject<T> w) {
queue.offer(new SubjectWork<T>(w, false));
if (enter()) {
Expand All @@ -640,9 +623,9 @@ void drainLoop() {
for (;;) {
if (terminated) {
upstream.dispose();
disposeWorker();
q.clear();
ws.clear();
worker.dispose();
return;
}

Expand All @@ -665,8 +648,8 @@ void drainLoop() {
w.onComplete();
}
}
disposeWorker();
ws.clear();
worker.dispose();
return;
}

Expand Down
Loading