Skip to content

Commit

Permalink
2.x: Fix window() with start/end selector not disposing/cancelling pr…
Browse files Browse the repository at this point in the history
…operly (#6398)

* 2.x: Fix window() with s/e selector not disposing/cancelling properly

* Fix cancellation upon backpressure problem/handler crash
  • Loading branch information
akarnokd authored Feb 13, 2019
1 parent 7fffa00 commit 184a17b
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ static final class WindowBoundaryMainSubscriber<T, B, V>

final AtomicLong windows = new AtomicLong();

final AtomicBoolean stopWindows = new AtomicBoolean();

WindowBoundaryMainSubscriber(Subscriber<? super Flowable<T>> actual,
Publisher<B> open, Function<? super B, ? extends Publisher<V>> close, int bufferSize) {
super(actual, new MpscLinkedQueue<Object>());
Expand All @@ -89,14 +91,13 @@ public void onSubscribe(Subscription s) {

downstream.onSubscribe(this);

if (cancelled) {
if (stopWindows.get()) {
return;
}

OperatorWindowBoundaryOpenSubscriber<T, B> os = new OperatorWindowBoundaryOpenSubscriber<T, B>(this);

if (boundary.compareAndSet(null, os)) {
windows.getAndIncrement();
s.request(Long.MAX_VALUE);
open.subscribe(os);
}
Expand Down Expand Up @@ -177,7 +178,12 @@ public void request(long n) {

@Override
public void cancel() {
cancelled = true;
if (stopWindows.compareAndSet(false, true)) {
DisposableHelper.dispose(boundary);
if (windows.decrementAndGet() == 0) {
upstream.cancel();
}
}
}

void dispose() {
Expand Down Expand Up @@ -236,7 +242,7 @@ void drainLoop() {
continue;
}

if (cancelled) {
if (stopWindows.get()) {
continue;
}

Expand All @@ -250,7 +256,7 @@ void drainLoop() {
produced(1);
}
} else {
cancelled = true;
cancel();
a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests"));
continue;
}
Expand All @@ -260,7 +266,7 @@ void drainLoop() {
try {
p = ObjectHelper.requireNonNull(close.apply(wo.open), "The publisher supplied is null");
} catch (Throwable e) {
cancelled = true;
cancel();
a.onError(e);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ static final class WindowBoundaryMainObserver<T, B, V>

final AtomicLong windows = new AtomicLong();

final AtomicBoolean stopWindows = new AtomicBoolean();

WindowBoundaryMainObserver(Observer<? super Observable<T>> actual,
ObservableSource<B> open, Function<? super B, ? extends ObservableSource<V>> close, int bufferSize) {
super(actual, new MpscLinkedQueue<Object>());
Expand All @@ -87,14 +89,13 @@ public void onSubscribe(Disposable d) {

downstream.onSubscribe(this);

if (cancelled) {
if (stopWindows.get()) {
return;
}

OperatorWindowBoundaryOpenObserver<T, B> os = new OperatorWindowBoundaryOpenObserver<T, B>(this);

if (boundary.compareAndSet(null, os)) {
windows.getAndIncrement();
open.subscribe(os);
}
}
Expand Down Expand Up @@ -164,12 +165,17 @@ void error(Throwable t) {

@Override
public void dispose() {
cancelled = true;
if (stopWindows.compareAndSet(false, true)) {
DisposableHelper.dispose(boundary);
if (windows.decrementAndGet() == 0) {
upstream.dispose();
}
}
}

@Override
public boolean isDisposed() {
return cancelled;
return stopWindows.get();
}

void disposeBoundary() {
Expand Down Expand Up @@ -229,7 +235,7 @@ void drainLoop() {
continue;
}

if (cancelled) {
if (stopWindows.get()) {
continue;
}

Expand All @@ -244,7 +250,7 @@ void drainLoop() {
p = ObjectHelper.requireNonNull(close.apply(wo.open), "The ObservableSource supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
cancelled = true;
stopWindows.set(true);
a.onError(e);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.*;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.BooleanSubscription;
Expand Down Expand Up @@ -254,8 +255,8 @@ public Flowable<Integer> apply(Integer t) {

ts.dispose();

// FIXME subject has subscribers because of the open window
assertTrue(open.hasSubscribers());
// Disposing the outer sequence stops the opening of new windows
assertFalse(open.hasSubscribers());
// FIXME subject has subscribers because of the open window
assertTrue(close.hasSubscribers());
}
Expand Down Expand Up @@ -430,4 +431,58 @@ protected void subscribeActual(
RxJavaPlugins.reset();
}
}

static Flowable<Integer> flowableDisposed(final AtomicBoolean ref) {
return Flowable.just(1).concatWith(Flowable.<Integer>never())
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
ref.set(true);
}
});
}

@Test
public void mainAndBoundaryDisposeOnNoWindows() {
AtomicBoolean mainDisposed = new AtomicBoolean();
AtomicBoolean openDisposed = new AtomicBoolean();
final AtomicBoolean closeDisposed = new AtomicBoolean();

flowableDisposed(mainDisposed)
.window(flowableDisposed(openDisposed), new Function<Integer, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Integer v) throws Exception {
return flowableDisposed(closeDisposed);
}
})
.test()
.assertSubscribed()
.assertNoErrors()
.assertNotComplete()
.dispose();

assertTrue(mainDisposed.get());
assertTrue(openDisposed.get());
assertTrue(closeDisposed.get());
}

@Test
@SuppressWarnings("unchecked")
public void mainWindowMissingBackpressure() {
PublishProcessor<Integer> source = PublishProcessor.create();
PublishProcessor<Integer> boundary = PublishProcessor.create();

TestSubscriber<Flowable<Integer>> ts = source.window(boundary, Functions.justFunction(Flowable.never()))
.test(0L)
;

ts.assertEmpty();

boundary.onNext(1);

ts.assertFailure(MissingBackpressureException.class);

assertFalse(source.hasSubscribers());
assertFalse(boundary.hasSubscribers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.*;

Expand Down Expand Up @@ -256,8 +257,8 @@ public Observable<Integer> apply(Integer t) {

to.dispose();

// FIXME subject has subscribers because of the open window
assertTrue(open.hasObservers());
// Disposing the outer sequence stops the opening of new windows
assertFalse(open.hasObservers());
// FIXME subject has subscribers because of the open window
assertTrue(close.hasObservers());
}
Expand Down Expand Up @@ -423,4 +424,38 @@ protected void subscribeActual(
RxJavaPlugins.reset();
}
}

static Observable<Integer> observableDisposed(final AtomicBoolean ref) {
return Observable.just(1).concatWith(Observable.<Integer>never())
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
ref.set(true);
}
});
}

@Test
public void mainAndBoundaryDisposeOnNoWindows() {
AtomicBoolean mainDisposed = new AtomicBoolean();
AtomicBoolean openDisposed = new AtomicBoolean();
final AtomicBoolean closeDisposed = new AtomicBoolean();

observableDisposed(mainDisposed)
.window(observableDisposed(openDisposed), new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return observableDisposed(closeDisposed);
}
})
.test()
.assertSubscribed()
.assertNoErrors()
.assertNotComplete()
.dispose();

assertTrue(mainDisposed.get());
assertTrue(openDisposed.get());
assertTrue(closeDisposed.get());
}
}

0 comments on commit 184a17b

Please sign in to comment.