From 08fffcef3117dccdd044a092cce4e5e3f1f318ba Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 16 Jan 2018 15:43:55 +0100 Subject: [PATCH 1/3] 2.x: Fix buffer(open, close) not disposing indicators properly --- .../flowable/FlowableBufferBoundary.java | 465 ++++++++++++------ .../observable/ObservableBufferBoundary.java | 418 ++++++++++------ .../flowable/FlowableBufferTest.java | 364 +++++++++++++- .../observable/ObservableBufferTest.java | 300 ++++++++++- 4 files changed, 1238 insertions(+), 309 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java index e375d5f720..1396594fcc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java @@ -15,22 +15,19 @@ import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; -import io.reactivex.Flowable; +import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.SimplePlainQueue; -import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.subscribers.QueueDrainSubscriber; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.QueueDrainHelper; +import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subscribers.*; public final class FlowableBufferBoundary, Open, Close> extends AbstractFlowableWithUpstream { @@ -48,48 +45,72 @@ public FlowableBufferBoundary(Flowable source, Publisher buff @Override protected void subscribeActual(Subscriber s) { - source.subscribe(new BufferBoundarySubscriber( - new SerializedSubscriber(s), - bufferOpen, bufferClose, bufferSupplier - )); + BufferBoundarySubscriber parent = + new BufferBoundarySubscriber( + s, bufferOpen, bufferClose, bufferSupplier + ); + s.onSubscribe(parent); + source.subscribe(parent); } - static final class BufferBoundarySubscriber, Open, Close> - extends QueueDrainSubscriber implements Subscription, Disposable { + static final class BufferBoundarySubscriber, Open, Close> + extends AtomicInteger implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = -8466418554264089604L; + + final Subscriber actual; + + final Callable bufferSupplier; + final Publisher bufferOpen; + final Function> bufferClose; - final Callable bufferSupplier; - final CompositeDisposable resources; - Subscription s; + final CompositeDisposable subscribers; + + final AtomicLong requested; + + final AtomicReference upstream; + + final AtomicThrowable errors; - final List buffers; + volatile boolean done; - final AtomicInteger windows = new AtomicInteger(); + final SpscLinkedArrayQueue queue; - BufferBoundarySubscriber(Subscriber actual, + volatile boolean cancelled; + + long index; + + Map buffers; + + long emitted; + + BufferBoundarySubscriber(Subscriber actual, Publisher bufferOpen, Function> bufferClose, - Callable bufferSupplier) { - super(actual, new MpscLinkedQueue()); + Callable bufferSupplier + ) { + this.actual = actual; + this.bufferSupplier = bufferSupplier; this.bufferOpen = bufferOpen; this.bufferClose = bufferClose; - this.bufferSupplier = bufferSupplier; - this.buffers = new LinkedList(); - this.resources = new CompositeDisposable(); + this.queue = new SpscLinkedArrayQueue(bufferSize()); + this.subscribers = new CompositeDisposable(); + this.requested = new AtomicLong(); + this.upstream = new AtomicReference(); + this.buffers = new HashMap(); + this.errors = new AtomicThrowable(); } + @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; + if (SubscriptionHelper.setOnce(this.upstream, s)) { - BufferOpenSubscriber bos = new BufferOpenSubscriber(this); - resources.add(bos); + BufferOpenSubscriber open = new BufferOpenSubscriber(this); + subscribers.add(open); - actual.onSubscribe(this); - - windows.lazySet(1); - bufferOpen.subscribe(bos); + bufferOpen.subscribe(open); s.request(Long.MAX_VALUE); } @@ -98,7 +119,11 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { synchronized (this) { - for (U b : buffers) { + Map bufs = buffers; + if (bufs == null) { + return; + } + for (C b : bufs.values()) { b.add(t); } } @@ -106,206 +131,328 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - cancel(); - cancelled = true; - synchronized (this) { - buffers.clear(); + if (errors.addThrowable(t)) { + subscribers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); } - actual.onError(t); } @Override public void onComplete() { - if (windows.decrementAndGet() == 0) { - complete(); - } - } - - void complete() { - List list; + subscribers.dispose(); synchronized (this) { - list = new ArrayList(buffers); - buffers.clear(); - } - - SimplePlainQueue q = queue; - for (U u : list) { - q.offer(u); + Map bufs = buffers; + if (bufs == null) { + return; + } + for (C b : bufs.values()) { + queue.offer(b); + } + bufs = null; } done = true; - if (enter()) { - QueueDrainHelper.drainMaxLoop(q, actual, false, this, this); - } + drain(); } @Override public void request(long n) { - requested(n); - } - - @Override - public void dispose() { - resources.dispose(); - } - - @Override - public boolean isDisposed() { - return resources.isDisposed(); + BackpressureHelper.add(requested, n); + drain(); } @Override public void cancel() { - if (!cancelled) { + if (SubscriptionHelper.cancel(upstream)) { cancelled = true; - dispose(); + subscribers.dispose(); + synchronized (this) { + buffers = null; + } + if (getAndIncrement() != 0) { + queue.clear(); + } } } - @Override - public boolean accept(Subscriber a, U v) { - a.onNext(v); - return true; - } - - void open(Open window) { - if (cancelled) { + void open(Open token) { + Publisher p; + C buf; + try { + buf = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null Collection"); + p = ObjectHelper.requireNonNull(bufferClose.apply(token), "The bufferClose returned a null Publisher"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + SubscriptionHelper.cancel(upstream); + if (errors.addThrowable(ex)) { + subscribers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(ex); + } return; } - U b; - - try { - b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; + long idx = index; + index = idx + 1; + synchronized (this) { + Map bufs = buffers; + if (bufs == null) { + return; + } + bufs.put(idx, buf); } - Publisher p; + BufferCloseSubscriber bc = new BufferCloseSubscriber(this, idx); + subscribers.add(bc); + p.subscribe(bc); + } - try { - p = ObjectHelper.requireNonNull(bufferClose.apply(window), "The buffer closing publisher is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; + void openError(BufferOpenSubscriber os, Throwable ex) { + SubscriptionHelper.cancel(upstream); + subscribers.delete(os); + if (errors.addThrowable(ex)) { + subscribers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(ex); } + } - if (cancelled) { - return; + void openComplete(BufferOpenSubscriber os) { + subscribers.delete(os); + if (subscribers.size() == 0) { + SubscriptionHelper.cancel(upstream); + done = true; + drain(); } + } + void close(BufferCloseSubscriber closer, long idx) { + subscribers.delete(closer); + boolean makeDone = false; + if (subscribers.size() == 0) { + makeDone = true; + SubscriptionHelper.cancel(upstream); + } synchronized (this) { - if (cancelled) { + Map bufs = buffers; + if (bufs == null) { return; } - buffers.add(b); + queue.offer(buffers.remove(idx)); + } + if (makeDone) { + done = true; } + drain(); + } - BufferCloseSubscriber bcs = new BufferCloseSubscriber(b, this); - resources.add(bcs); + void closeError(BufferCloseSubscriber closer, Throwable ex) { + SubscriptionHelper.cancel(upstream); + subscribers.delete(closer); + if (errors.addThrowable(ex)) { + subscribers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(ex); + } + } - windows.getAndIncrement(); + void drain() { + if (getAndIncrement() != 0) { + return; + } - p.subscribe(bcs); - } + int missed = 1; + long e = emitted; + Subscriber a = actual; + SpscLinkedArrayQueue q = queue; + + for (;;) { + long r = requested.get(); + + while (e != r) { + if (cancelled) { + q.clear(); + return; + } + + boolean d = done; + if (d && errors.get() != null) { + q.clear(); + Throwable ex = errors.terminate(); + a.onError(ex); + return; + } + + C v = q.poll(); + boolean empty = v == null; + + if (d && empty) { + a.onComplete(); + return; + } + + if (empty) { + break; + } + + a.onNext(v); + e++; + } - void openFinished(Disposable d) { - if (resources.remove(d)) { - if (windows.decrementAndGet() == 0) { - complete(); + if (e == r) { + if (cancelled) { + q.clear(); + return; + } + + if (done) { + if (errors.get() != null) { + q.clear(); + Throwable ex = errors.terminate(); + a.onError(ex); + return; + } else + if (q.isEmpty()) { + a.onComplete(); + return; + } + } + } + + emitted = e; + missed = addAndGet(-missed); + if (missed == 0) { + break; } } } - void close(U b, Disposable d) { + static final class BufferOpenSubscriber + extends AtomicReference + implements FlowableSubscriber, Disposable { - boolean e; - synchronized (this) { - e = buffers.remove(b); - } + private static final long serialVersionUID = -8498650778633225126L; + + final BufferBoundarySubscriber parent; - if (e) { - fastPathOrderedEmitMax(b, false, this); + BufferOpenSubscriber(BufferBoundarySubscriber parent) { + this.parent = parent; } - if (resources.remove(d)) { - if (windows.decrementAndGet() == 0) { - complete(); + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); } } - } - } - static final class BufferOpenSubscriber, Open, Close> - extends DisposableSubscriber { - final BufferBoundarySubscriber parent; + @Override + public void onNext(Open t) { + parent.open(t); + } - boolean done; + @Override + public void onError(Throwable t) { + lazySet(SubscriptionHelper.CANCELLED); + parent.openError(this, t); + } - BufferOpenSubscriber(BufferBoundarySubscriber parent) { - this.parent = parent; - } - @Override - public void onNext(Open t) { - if (done) { - return; + @Override + public void onComplete() { + lazySet(SubscriptionHelper.CANCELLED); + parent.openComplete(this); } - parent.open(t); - } - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; + @Override + public void dispose() { + SubscriptionHelper.cancel(this); } - done = true; - parent.onError(t); - } - @Override - public void onComplete() { - if (done) { - return; + @Override + public boolean isDisposed() { + return get() == SubscriptionHelper.CANCELLED; } - done = true; - parent.openFinished(this); } } - static final class BufferCloseSubscriber, Open, Close> - extends DisposableSubscriber { - final BufferBoundarySubscriber parent; - final U value; - boolean done; - BufferCloseSubscriber(U value, BufferBoundarySubscriber parent) { + static final class BufferCloseSubscriber> + extends AtomicReference + implements FlowableSubscriber, Disposable { + + private static final long serialVersionUID = -8498650778633225126L; + + final BufferBoundarySubscriber parent; + + final long index; + + BufferCloseSubscriber(BufferBoundarySubscriber parent, long index) { this.parent = parent; - this.value = value; + this.index = index; } @Override - public void onNext(Close t) { - onComplete(); + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(Object t) { + Subscription s = get(); + if (s != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + s.cancel(); + parent.close(this, index); + } } @Override public void onError(Throwable t) { - if (done) { + if (get() != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + parent.closeError(this, t); + } else { RxJavaPlugins.onError(t); - return; } - parent.onError(t); } @Override public void onComplete() { - if (done) { - return; + if (get() != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + parent.close(this, index); } - done = true; - parent.close(value, this); + } + + @Override + public void dispose() { + SubscriptionHelper.cancel(this); + } + + @Override + public boolean isDisposed() { + return get() == SubscriptionHelper.CANCELLED; } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java index a9051e474a..a9dc182526 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java @@ -15,7 +15,7 @@ import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import io.reactivex.ObservableSource; import io.reactivex.Observer; @@ -24,11 +24,8 @@ import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.SimplePlainQueue; -import io.reactivex.internal.observers.QueueDrainObserver; -import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.util.QueueDrainHelper; -import io.reactivex.observers.*; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.util.AtomicThrowable; import io.reactivex.plugins.RxJavaPlugins; public final class ObservableBufferBoundary, Open, Close> @@ -47,55 +44,78 @@ public ObservableBufferBoundary(ObservableSource source, ObservableSource t) { - source.subscribe(new BufferBoundaryObserver( - new SerializedObserver(t), - bufferOpen, bufferClose, bufferSupplier - )); + BufferBoundaryObserver parent = + new BufferBoundaryObserver( + t, bufferOpen, bufferClose, bufferSupplier + ); + t.onSubscribe(parent); + source.subscribe(parent); } - static final class BufferBoundaryObserver, Open, Close> - extends QueueDrainObserver implements Disposable { + static final class BufferBoundaryObserver, Open, Close> + extends AtomicInteger implements Observer, Disposable { + + private static final long serialVersionUID = -8466418554264089604L; + + final Observer actual; + + final Callable bufferSupplier; + final ObservableSource bufferOpen; + final Function> bufferClose; - final Callable bufferSupplier; - final CompositeDisposable resources; - Disposable s; + final CompositeDisposable observers; + + final AtomicReference upstream; + + final AtomicThrowable errors; - final List buffers; + volatile boolean done; - final AtomicInteger windows = new AtomicInteger(); + final SpscLinkedArrayQueue queue; - BufferBoundaryObserver(Observer actual, + volatile boolean cancelled; + + long index; + + Map buffers; + + BufferBoundaryObserver(Observer actual, ObservableSource bufferOpen, Function> bufferClose, - Callable bufferSupplier) { - super(actual, new MpscLinkedQueue()); + Callable bufferSupplier + ) { + this.actual = actual; + this.bufferSupplier = bufferSupplier; this.bufferOpen = bufferOpen; this.bufferClose = bufferClose; - this.bufferSupplier = bufferSupplier; - this.buffers = new LinkedList(); - this.resources = new CompositeDisposable(); + this.queue = new SpscLinkedArrayQueue(bufferSize()); + this.observers = new CompositeDisposable(); + this.upstream = new AtomicReference(); + this.buffers = new HashMap(); + this.errors = new AtomicThrowable(); } + @Override public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; + if (DisposableHelper.setOnce(this.upstream, s)) { - BufferOpenObserver bos = new BufferOpenObserver(this); - resources.add(bos); + BufferOpenObserver open = new BufferOpenObserver(this); + observers.add(open); - actual.onSubscribe(this); - - windows.lazySet(1); - bufferOpen.subscribe(bos); + bufferOpen.subscribe(open); } } @Override public void onNext(T t) { synchronized (this) { - for (U b : buffers) { + Map bufs = buffers; + if (bufs == null) { + return; + } + for (C b : bufs.values()) { b.add(t); } } @@ -103,194 +123,298 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - dispose(); - cancelled = true; - synchronized (this) { - buffers.clear(); + if (errors.addThrowable(t)) { + observers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); } - actual.onError(t); } @Override public void onComplete() { - if (windows.decrementAndGet() == 0) { - complete(); - } - } - - void complete() { - List list; + observers.dispose(); synchronized (this) { - list = new ArrayList(buffers); - buffers.clear(); - } - - SimplePlainQueue q = queue; - for (U u : list) { - q.offer(u); + Map bufs = buffers; + if (bufs == null) { + return; + } + for (C b : bufs.values()) { + queue.offer(b); + } + bufs = null; } done = true; - if (enter()) { - QueueDrainHelper.drainLoop(q, actual, false, this, this); - } + drain(); } @Override public void dispose() { - if (!cancelled) { + if (DisposableHelper.dispose(upstream)) { cancelled = true; - resources.dispose(); + observers.dispose(); + synchronized (this) { + buffers = null; + } + if (getAndIncrement() != 0) { + queue.clear(); + } } } - @Override public boolean isDisposed() { - return cancelled; - } - @Override - public void accept(Observer a, U v) { - a.onNext(v); + public boolean isDisposed() { + return DisposableHelper.isDisposed(upstream.get()); } - void open(Open window) { - if (cancelled) { + void open(Open token) { + ObservableSource p; + C buf; + try { + buf = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null Collection"); + p = ObjectHelper.requireNonNull(bufferClose.apply(token), "The bufferClose returned a null ObservableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + DisposableHelper.dispose(upstream); + if (errors.addThrowable(ex)) { + observers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(ex); + } return; } - U b; - - try { - b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; + long idx = index; + index = idx + 1; + synchronized (this) { + Map bufs = buffers; + if (bufs == null) { + return; + } + bufs.put(idx, buf); } - ObservableSource p; + BufferCloseObserver bc = new BufferCloseObserver(this, idx); + observers.add(bc); + p.subscribe(bc); + } - try { - p = ObjectHelper.requireNonNull(bufferClose.apply(window), "The buffer closing Observable is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; + void openError(BufferOpenObserver os, Throwable ex) { + DisposableHelper.dispose(upstream); + observers.delete(os); + if (errors.addThrowable(ex)) { + observers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(ex); } + } - if (cancelled) { - return; + void openComplete(BufferOpenObserver os) { + observers.delete(os); + if (observers.size() == 0) { + DisposableHelper.dispose(upstream); + done = true; + drain(); } + } + void close(BufferCloseObserver closer, long idx) { + observers.delete(closer); + boolean makeDone = false; + if (observers.size() == 0) { + makeDone = true; + DisposableHelper.dispose(upstream); + } synchronized (this) { - if (cancelled) { + Map bufs = buffers; + if (bufs == null) { return; } - buffers.add(b); + queue.offer(buffers.remove(idx)); } + if (makeDone) { + done = true; + } + drain(); + } - BufferCloseObserver bcs = new BufferCloseObserver(b, this); - resources.add(bcs); + void closeError(BufferCloseObserver closer, Throwable ex) { + DisposableHelper.dispose(upstream); + observers.delete(closer); + if (errors.addThrowable(ex)) { + observers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(ex); + } + } - windows.getAndIncrement(); + void drain() { + if (getAndIncrement() != 0) { + return; + } - p.subscribe(bcs); - } + int missed = 1; + Observer a = actual; + SpscLinkedArrayQueue q = queue; + + for (;;) { + for (;;) { + if (cancelled) { + q.clear(); + return; + } + + boolean d = done; + if (d && errors.get() != null) { + q.clear(); + Throwable ex = errors.terminate(); + a.onError(ex); + return; + } + + C v = q.poll(); + boolean empty = v == null; + + if (d && empty) { + a.onComplete(); + return; + } + + if (empty) { + break; + } + + a.onNext(v); + } - void openFinished(Disposable d) { - if (resources.remove(d)) { - if (windows.decrementAndGet() == 0) { - complete(); + missed = addAndGet(-missed); + if (missed == 0) { + break; } } } - void close(U b, Disposable d) { + static final class BufferOpenObserver + extends AtomicReference + implements Observer, Disposable { - boolean e; - synchronized (this) { - e = buffers.remove(b); + private static final long serialVersionUID = -8498650778633225126L; + + final BufferBoundaryObserver parent; + + BufferOpenObserver(BufferBoundaryObserver parent) { + this.parent = parent; } - if (e) { - fastPathOrderedEmit(b, false, this); + @Override + public void onSubscribe(Disposable s) { + DisposableHelper.setOnce(this, s); } - if (resources.remove(d)) { - if (windows.decrementAndGet() == 0) { - complete(); - } + @Override + public void onNext(Open t) { + parent.open(t); + } + + @Override + public void onError(Throwable t) { + lazySet(DisposableHelper.DISPOSED); + parent.openError(this, t); + } + + @Override + public void onComplete() { + lazySet(DisposableHelper.DISPOSED); + parent.openComplete(this); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return get() == DisposableHelper.DISPOSED; } } } - static final class BufferOpenObserver, Open, Close> - extends DisposableObserver { - final BufferBoundaryObserver parent; + static final class BufferCloseObserver> + extends AtomicReference + implements Observer, Disposable { + + private static final long serialVersionUID = -8498650778633225126L; - boolean done; + final BufferBoundaryObserver parent; - BufferOpenObserver(BufferBoundaryObserver parent) { + final long index; + + BufferCloseObserver(BufferBoundaryObserver parent, long index) { this.parent = parent; + this.index = index; } + @Override - public void onNext(Open t) { - if (done) { - return; + public void onSubscribe(Disposable s) { + DisposableHelper.setOnce(this, s); + } + + @Override + public void onNext(Object t) { + Disposable s = get(); + if (s != DisposableHelper.DISPOSED) { + lazySet(DisposableHelper.DISPOSED); + s.dispose(); + parent.close(this, index); } - parent.open(t); } @Override public void onError(Throwable t) { - if (done) { + if (get() != DisposableHelper.DISPOSED) { + lazySet(DisposableHelper.DISPOSED); + parent.closeError(this, t); + } else { RxJavaPlugins.onError(t); - return; } - done = true; - parent.onError(t); } @Override public void onComplete() { - if (done) { - return; + if (get() != DisposableHelper.DISPOSED) { + lazySet(DisposableHelper.DISPOSED); + parent.close(this, index); } - done = true; - parent.openFinished(this); - } - } - - static final class BufferCloseObserver, Open, Close> - extends DisposableObserver { - final BufferBoundaryObserver parent; - final U value; - boolean done; - BufferCloseObserver(U value, BufferBoundaryObserver parent) { - this.parent = parent; - this.value = value; } @Override - public void onNext(Close t) { - onComplete(); - } - - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - parent.onError(t); + public void dispose() { + DisposableHelper.dispose(this); } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - parent.close(value, this); + public boolean isDisposed() { + return get() == DisposableHelper.DISPOSED; } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index bd18c0187d..bbab7b80a1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -26,7 +27,8 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; @@ -2074,4 +2076,364 @@ public void run() throws Exception { assertEquals(0, counter.get()); } + + @Test + @SuppressWarnings("unchecked") + public void boundaryOpenCloseDisposedOnComplete() { + PublishProcessor pp0 = PublishProcessor.create(); + + PublishProcessor pp1 = PublishProcessor.create(); + + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .test(); + + assertTrue(pp0.hasSubscribers()); + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + pp1.onNext(1); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp0.onComplete(); + + ts.assertResult(Collections.emptyList()); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + } + + @Test + public void bufferedCanCompleteIfOpenNeverCompletesDropping() { + Flowable.range(1, 50) + .zipWith(Flowable.interval(5, TimeUnit.MILLISECONDS), + new BiFunction() { + @Override + public Integer apply(Integer integer, Long aLong) { + return integer; + } + }) + .buffer(Flowable.interval(0,200, TimeUnit.MILLISECONDS), + new Function>() { + @Override + public Publisher apply(Long a) { + return Flowable.just(a).delay(100, TimeUnit.MILLISECONDS); + } + }) + .test() + .assertSubscribed() + .awaitDone(3, TimeUnit.SECONDS) + .assertComplete(); + } + + @Test + public void bufferedCanCompleteIfOpenNeverCompletesOverlapping() { + Flowable.range(1, 50) + .zipWith(Flowable.interval(5, TimeUnit.MILLISECONDS), + new BiFunction() { + @Override + public Integer apply(Integer integer, Long aLong) { + return integer; + } + }) + .buffer(Flowable.interval(0,100, TimeUnit.MILLISECONDS), + new Function>() { + @Override + public Publisher apply(Long a) { + return Flowable.just(a).delay(200, TimeUnit.MILLISECONDS); + } + }) + .test() + .assertSubscribed() + .awaitDone(3, TimeUnit.SECONDS) + .assertComplete(); + } + + @Test + @SuppressWarnings("unchecked") + public void openClosemainError() { + Flowable.error(new TestException()) + .buffer(Flowable.never(), Functions.justFunction(Flowable.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + @SuppressWarnings("unchecked") + public void openClosebadSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + BooleanSubscription bs1 = new BooleanSubscription(); + BooleanSubscription bs2 = new BooleanSubscription(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isCancelled()); + assertFalse(bs2.isCancelled()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isCancelled()); + assertTrue(bs2.isCancelled()); + + s.onError(new IOException()); + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + } + .buffer(Flowable.never(), Functions.justFunction(Flowable.never())) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseOpenCompletes() { + PublishProcessor pp0 = PublishProcessor.create(); + + PublishProcessor pp1 = PublishProcessor.create(); + + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .test(); + + pp1.onNext(1); + + assertTrue(pp2.hasSubscribers()); + + pp1.onComplete(); + + assertTrue(pp0.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + + assertFalse(pp0.hasSubscribers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseOpenCompletesNoBuffers() { + PublishProcessor pp0 = PublishProcessor.create(); + + PublishProcessor pp1 = PublishProcessor.create(); + + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .test(); + + pp1.onNext(1); + + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + + assertTrue(pp0.hasSubscribers()); + assertTrue(pp1.hasSubscribers()); + + pp1.onComplete(); + + assertFalse(pp0.hasSubscribers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseTake() { + PublishProcessor pp0 = PublishProcessor.create(); + + PublishProcessor pp1 = PublishProcessor.create(); + + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .take(1) + .test(2); + + pp1.onNext(1); + pp2.onComplete(); + + assertFalse(pp0.hasSubscribers()); + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseLimit() { + PublishProcessor pp0 = PublishProcessor.create(); + + PublishProcessor pp1 = PublishProcessor.create(); + + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .limit(1) + .test(2); + + pp1.onNext(1); + pp2.onComplete(); + + assertFalse(pp0.hasSubscribers()); + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseEmptyBackpressure() { + PublishProcessor pp0 = PublishProcessor.create(); + + PublishProcessor pp1 = PublishProcessor.create(); + + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .test(0); + + pp0.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseErrorBackpressure() { + PublishProcessor pp0 = PublishProcessor.create(); + + PublishProcessor pp1 = PublishProcessor.create(); + + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .test(0); + + pp0.onError(new TestException()); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseBadOpen() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.never() + .buffer(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + + assertFalse(((Disposable)s).isDisposed()); + + BooleanSubscription bs1 = new BooleanSubscription(); + BooleanSubscription bs2 = new BooleanSubscription(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isCancelled()); + assertFalse(bs2.isCancelled()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isCancelled()); + assertTrue(bs2.isCancelled()); + + s.onError(new IOException()); + + assertTrue(((Disposable)s).isDisposed()); + + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + }, Functions.justFunction(Flowable.never())) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseBadClose() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.never() + .buffer(Flowable.just(1).concatWith(Flowable.never()), + Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + + assertFalse(((Disposable)s).isDisposed()); + + BooleanSubscription bs1 = new BooleanSubscription(); + BooleanSubscription bs2 = new BooleanSubscription(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isCancelled()); + assertFalse(bs2.isCancelled()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isCancelled()); + assertTrue(bs2.isCancelled()); + + s.onError(new IOException()); + + assertTrue(((Disposable)s).isDisposed()); + + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + })) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index 93a9e2fb74..5cf0410705 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -27,11 +28,12 @@ import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; -import io.reactivex.disposables.Disposables; -import io.reactivex.exceptions.TestException; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; import io.reactivex.subjects.PublishSubject; @@ -1500,4 +1502,298 @@ public void run() throws Exception { assertEquals(0, counter.get()); } + + @Test + @SuppressWarnings("unchecked") + public void boundaryOpenCloseDisposedOnComplete() { + PublishSubject pp0 = PublishSubject.create(); + + PublishSubject pp1 = PublishSubject.create(); + + PublishSubject pp2 = PublishSubject.create(); + + TestObserver> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .test(); + + assertTrue(pp0.hasObservers()); + assertTrue(pp1.hasObservers()); + assertFalse(pp2.hasObservers()); + + pp1.onNext(1); + + assertTrue(pp1.hasObservers()); + assertTrue(pp2.hasObservers()); + + pp0.onComplete(); + + ts.assertResult(Collections.emptyList()); + + assertFalse(pp1.hasObservers()); + assertFalse(pp2.hasObservers()); + } + + @Test + public void bufferedCanCompleteIfOpenNeverCompletesDropping() { + Observable.range(1, 50) + .zipWith(Observable.interval(5, TimeUnit.MILLISECONDS), + new BiFunction() { + @Override + public Integer apply(Integer integer, Long aLong) { + return integer; + } + }) + .buffer(Observable.interval(0,200, TimeUnit.MILLISECONDS), + new Function>() { + @Override + public Observable apply(Long a) { + return Observable.just(a).delay(100, TimeUnit.MILLISECONDS); + } + }) + .test() + .assertSubscribed() + .awaitDone(3, TimeUnit.SECONDS) + .assertComplete(); + } + + @Test + public void bufferedCanCompleteIfOpenNeverCompletesOverlapping() { + Observable.range(1, 50) + .zipWith(Observable.interval(5, TimeUnit.MILLISECONDS), + new BiFunction() { + @Override + public Integer apply(Integer integer, Long aLong) { + return integer; + } + }) + .buffer(Observable.interval(0,100, TimeUnit.MILLISECONDS), + new Function>() { + @Override + public Observable apply(Long a) { + return Observable.just(a).delay(200, TimeUnit.MILLISECONDS); + } + }) + .test() + .assertSubscribed() + .awaitDone(3, TimeUnit.SECONDS) + .assertComplete(); + } + + @Test + @SuppressWarnings("unchecked") + public void openClosemainError() { + Observable.error(new TestException()) + .buffer(Observable.never(), Functions.justFunction(Observable.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + @SuppressWarnings("unchecked") + public void openClosebadSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer s) { + Disposable bs1 = Disposables.empty(); + Disposable bs2 = Disposables.empty(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isDisposed()); + assertFalse(bs2.isDisposed()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isDisposed()); + assertTrue(bs2.isDisposed()); + + s.onError(new IOException()); + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + } + .buffer(Observable.never(), Functions.justFunction(Observable.never())) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseOpenCompletes() { + PublishSubject pp0 = PublishSubject.create(); + + PublishSubject pp1 = PublishSubject.create(); + + PublishSubject pp2 = PublishSubject.create(); + + TestObserver> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .test(); + + pp1.onNext(1); + + assertTrue(pp2.hasObservers()); + + pp1.onComplete(); + + assertTrue(pp0.hasObservers()); + assertTrue(pp2.hasObservers()); + + pp2.onComplete(); + + assertFalse(pp0.hasObservers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseOpenCompletesNoBuffers() { + PublishSubject pp0 = PublishSubject.create(); + + PublishSubject pp1 = PublishSubject.create(); + + PublishSubject pp2 = PublishSubject.create(); + + TestObserver> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .test(); + + pp1.onNext(1); + + assertTrue(pp2.hasObservers()); + + pp2.onComplete(); + + assertTrue(pp0.hasObservers()); + assertTrue(pp1.hasObservers()); + + pp1.onComplete(); + + assertFalse(pp0.hasObservers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseTake() { + PublishSubject pp0 = PublishSubject.create(); + + PublishSubject pp1 = PublishSubject.create(); + + PublishSubject pp2 = PublishSubject.create(); + + TestObserver> ts = pp0 + .buffer(pp1, Functions.justFunction(pp2)) + .take(1) + .test(); + + pp1.onNext(1); + pp2.onComplete(); + + assertFalse(pp0.hasObservers()); + assertFalse(pp1.hasObservers()); + assertFalse(pp2.hasObservers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseBadOpen() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.never() + .buffer(new Observable() { + @Override + protected void subscribeActual(Observer s) { + + assertFalse(((Disposable)s).isDisposed()); + + Disposable bs1 = Disposables.empty(); + Disposable bs2 = Disposables.empty(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isDisposed()); + assertFalse(bs2.isDisposed()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isDisposed()); + assertTrue(bs2.isDisposed()); + + s.onError(new IOException()); + + assertTrue(((Disposable)s).isDisposed()); + + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + }, Functions.justFunction(Observable.never())) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseBadClose() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.never() + .buffer(Observable.just(1).concatWith(Observable.never()), + Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer s) { + + assertFalse(((Disposable)s).isDisposed()); + + Disposable bs1 = Disposables.empty(); + Disposable bs2 = Disposables.empty(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isDisposed()); + assertFalse(bs2.isDisposed()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isDisposed()); + assertTrue(bs2.isDisposed()); + + s.onError(new IOException()); + + assertTrue(((Disposable)s).isDisposed()); + + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + })) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } From 4483c9ea110092d45228e2d92707d2eba72f1458 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 16 Jan 2018 17:14:15 +0100 Subject: [PATCH 2/3] Unify boundary error methods, fix cleanup, fix last buffer orders --- .../flowable/FlowableBufferBoundary.java | 27 +++++-------------- .../observable/ObservableBufferBoundary.java | 27 +++++-------------- 2 files changed, 12 insertions(+), 42 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java index 1396594fcc..5239d778f3 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java @@ -99,7 +99,7 @@ static final class BufferBoundarySubscriber, this.subscribers = new CompositeDisposable(); this.requested = new AtomicLong(); this.upstream = new AtomicReference(); - this.buffers = new HashMap(); + this.buffers = new LinkedHashMap(); this.errors = new AtomicThrowable(); } @@ -154,7 +154,7 @@ public void onComplete() { for (C b : bufs.values()) { queue.offer(b); } - bufs = null; + buffers = null; } done = true; drain(); @@ -217,21 +217,6 @@ void open(Open token) { p.subscribe(bc); } - void openError(BufferOpenSubscriber os, Throwable ex) { - SubscriptionHelper.cancel(upstream); - subscribers.delete(os); - if (errors.addThrowable(ex)) { - subscribers.dispose(); - synchronized (this) { - buffers = null; - } - done = true; - drain(); - } else { - RxJavaPlugins.onError(ex); - } - } - void openComplete(BufferOpenSubscriber os) { subscribers.delete(os); if (subscribers.size() == 0) { @@ -261,9 +246,9 @@ void close(BufferCloseSubscriber closer, long idx) { drain(); } - void closeError(BufferCloseSubscriber closer, Throwable ex) { + void boundaryError(Disposable subscriber, Throwable ex) { SubscriptionHelper.cancel(upstream); - subscribers.delete(closer); + subscribers.delete(subscriber); if (errors.addThrowable(ex)) { subscribers.dispose(); synchronized (this) { @@ -374,7 +359,7 @@ public void onNext(Open t) { @Override public void onError(Throwable t) { lazySet(SubscriptionHelper.CANCELLED); - parent.openError(this, t); + parent.boundaryError(this, t); } @Override @@ -431,7 +416,7 @@ public void onNext(Object t) { public void onError(Throwable t) { if (get() != SubscriptionHelper.CANCELLED) { lazySet(SubscriptionHelper.CANCELLED); - parent.closeError(this, t); + parent.boundaryError(this, t); } else { RxJavaPlugins.onError(t); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java index a9dc182526..3a056baee8 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java @@ -93,7 +93,7 @@ static final class BufferBoundaryObserver, Op this.queue = new SpscLinkedArrayQueue(bufferSize()); this.observers = new CompositeDisposable(); this.upstream = new AtomicReference(); - this.buffers = new HashMap(); + this.buffers = new LinkedHashMap(); this.errors = new AtomicThrowable(); } @@ -146,7 +146,7 @@ public void onComplete() { for (C b : bufs.values()) { queue.offer(b); } - bufs = null; + buffers = null; } done = true; drain(); @@ -208,21 +208,6 @@ void open(Open token) { p.subscribe(bc); } - void openError(BufferOpenObserver os, Throwable ex) { - DisposableHelper.dispose(upstream); - observers.delete(os); - if (errors.addThrowable(ex)) { - observers.dispose(); - synchronized (this) { - buffers = null; - } - done = true; - drain(); - } else { - RxJavaPlugins.onError(ex); - } - } - void openComplete(BufferOpenObserver os) { observers.delete(os); if (observers.size() == 0) { @@ -252,9 +237,9 @@ void close(BufferCloseObserver closer, long idx) { drain(); } - void closeError(BufferCloseObserver closer, Throwable ex) { + void boundaryError(Disposable observer, Throwable ex) { DisposableHelper.dispose(upstream); - observers.delete(closer); + observers.delete(observer); if (errors.addThrowable(ex)) { observers.dispose(); synchronized (this) { @@ -338,7 +323,7 @@ public void onNext(Open t) { @Override public void onError(Throwable t) { lazySet(DisposableHelper.DISPOSED); - parent.openError(this, t); + parent.boundaryError(this, t); } @Override @@ -393,7 +378,7 @@ public void onNext(Object t) { public void onError(Throwable t) { if (get() != DisposableHelper.DISPOSED) { lazySet(DisposableHelper.DISPOSED); - parent.closeError(this, t); + parent.boundaryError(this, t); } else { RxJavaPlugins.onError(t); } From 04598ce5af7fb4e5be6ae9e5560d215ea8a34ad8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 18 Jan 2018 09:46:43 +0100 Subject: [PATCH 3/3] Fix nits. --- .../flowable/FlowableBufferBoundary.java | 25 +-- .../observable/ObservableBufferBoundary.java | 22 +-- .../flowable/FlowableBufferTest.java | 148 +++++++++--------- .../observable/ObservableBufferTest.java | 96 ++++++------ 4 files changed, 127 insertions(+), 164 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java index 5239d778f3..acf35be0cd 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java @@ -189,16 +189,7 @@ void open(Open token) { } catch (Throwable ex) { Exceptions.throwIfFatal(ex); SubscriptionHelper.cancel(upstream); - if (errors.addThrowable(ex)) { - subscribers.dispose(); - synchronized (this) { - buffers = null; - } - done = true; - drain(); - } else { - RxJavaPlugins.onError(ex); - } + onError(ex); return; } @@ -249,16 +240,7 @@ void close(BufferCloseSubscriber closer, long idx) { void boundaryError(Disposable subscriber, Throwable ex) { SubscriptionHelper.cancel(upstream); subscribers.delete(subscriber); - if (errors.addThrowable(ex)) { - subscribers.dispose(); - synchronized (this) { - buffers = null; - } - done = true; - drain(); - } else { - RxJavaPlugins.onError(ex); - } + onError(ex); } void drain() { @@ -316,8 +298,7 @@ void drain() { Throwable ex = errors.terminate(); a.onError(ex); return; - } else - if (q.isEmpty()) { + } else if (q.isEmpty()) { a.onComplete(); return; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java index 3a056baee8..b88bce3477 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java @@ -180,16 +180,7 @@ void open(Open token) { } catch (Throwable ex) { Exceptions.throwIfFatal(ex); DisposableHelper.dispose(upstream); - if (errors.addThrowable(ex)) { - observers.dispose(); - synchronized (this) { - buffers = null; - } - done = true; - drain(); - } else { - RxJavaPlugins.onError(ex); - } + onError(ex); return; } @@ -240,16 +231,7 @@ void close(BufferCloseObserver closer, long idx) { void boundaryError(Disposable observer, Throwable ex) { DisposableHelper.dispose(upstream); observers.delete(observer); - if (errors.addThrowable(ex)) { - observers.dispose(); - synchronized (this) { - buffers = null; - } - done = true; - drain(); - } else { - RxJavaPlugins.onError(ex); - } + onError(ex); } void drain() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index bbab7b80a1..06a4553f1c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -2080,31 +2080,31 @@ public void run() throws Exception { @Test @SuppressWarnings("unchecked") public void boundaryOpenCloseDisposedOnComplete() { - PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor source = PublishProcessor.create(); - PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor openIndicator = PublishProcessor.create(); - PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor closeIndicator = PublishProcessor.create(); - TestSubscriber> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .test(); - assertTrue(pp0.hasSubscribers()); - assertTrue(pp1.hasSubscribers()); - assertFalse(pp2.hasSubscribers()); + assertTrue(source.hasSubscribers()); + assertTrue(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); - pp1.onNext(1); + openIndicator.onNext(1); - assertTrue(pp1.hasSubscribers()); - assertTrue(pp2.hasSubscribers()); + assertTrue(openIndicator.hasSubscribers()); + assertTrue(closeIndicator.hasSubscribers()); - pp0.onComplete(); + source.onComplete(); ts.assertResult(Collections.emptyList()); - assertFalse(pp1.hasSubscribers()); - assertFalse(pp2.hasSubscribers()); + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); } @Test @@ -2203,28 +2203,28 @@ protected void subscribeActual(Subscriber s) { @Test @SuppressWarnings("unchecked") public void openCloseOpenCompletes() { - PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor source = PublishProcessor.create(); - PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor openIndicator = PublishProcessor.create(); - PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor closeIndicator = PublishProcessor.create(); - TestSubscriber> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .test(); - pp1.onNext(1); + openIndicator.onNext(1); - assertTrue(pp2.hasSubscribers()); + assertTrue(closeIndicator.hasSubscribers()); - pp1.onComplete(); + openIndicator.onComplete(); - assertTrue(pp0.hasSubscribers()); - assertTrue(pp2.hasSubscribers()); + assertTrue(source.hasSubscribers()); + assertTrue(closeIndicator.hasSubscribers()); - pp2.onComplete(); + closeIndicator.onComplete(); - assertFalse(pp0.hasSubscribers()); + assertFalse(source.hasSubscribers()); ts.assertResult(Collections.emptyList()); } @@ -2232,28 +2232,28 @@ public void openCloseOpenCompletes() { @Test @SuppressWarnings("unchecked") public void openCloseOpenCompletesNoBuffers() { - PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor source = PublishProcessor.create(); - PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor openIndicator = PublishProcessor.create(); - PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor closeIndicator = PublishProcessor.create(); - TestSubscriber> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .test(); - pp1.onNext(1); + openIndicator.onNext(1); - assertTrue(pp2.hasSubscribers()); + assertTrue(closeIndicator.hasSubscribers()); - pp2.onComplete(); + closeIndicator.onComplete(); - assertTrue(pp0.hasSubscribers()); - assertTrue(pp1.hasSubscribers()); + assertTrue(source.hasSubscribers()); + assertTrue(openIndicator.hasSubscribers()); - pp1.onComplete(); + openIndicator.onComplete(); - assertFalse(pp0.hasSubscribers()); + assertFalse(source.hasSubscribers()); ts.assertResult(Collections.emptyList()); } @@ -2261,23 +2261,23 @@ public void openCloseOpenCompletesNoBuffers() { @Test @SuppressWarnings("unchecked") public void openCloseTake() { - PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor source = PublishProcessor.create(); - PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor openIndicator = PublishProcessor.create(); - PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor closeIndicator = PublishProcessor.create(); - TestSubscriber> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .take(1) .test(2); - pp1.onNext(1); - pp2.onComplete(); + openIndicator.onNext(1); + closeIndicator.onComplete(); - assertFalse(pp0.hasSubscribers()); - assertFalse(pp1.hasSubscribers()); - assertFalse(pp2.hasSubscribers()); + assertFalse(source.hasSubscribers()); + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); ts.assertResult(Collections.emptyList()); } @@ -2285,23 +2285,23 @@ public void openCloseTake() { @Test @SuppressWarnings("unchecked") public void openCloseLimit() { - PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor source = PublishProcessor.create(); - PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor openIndicator = PublishProcessor.create(); - PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor closeIndicator = PublishProcessor.create(); - TestSubscriber> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .limit(1) .test(2); - pp1.onNext(1); - pp2.onComplete(); + openIndicator.onNext(1); + closeIndicator.onComplete(); - assertFalse(pp0.hasSubscribers()); - assertFalse(pp1.hasSubscribers()); - assertFalse(pp2.hasSubscribers()); + assertFalse(source.hasSubscribers()); + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); ts.assertResult(Collections.emptyList()); } @@ -2309,20 +2309,20 @@ public void openCloseLimit() { @Test @SuppressWarnings("unchecked") public void openCloseEmptyBackpressure() { - PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor source = PublishProcessor.create(); - PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor openIndicator = PublishProcessor.create(); - PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor closeIndicator = PublishProcessor.create(); - TestSubscriber> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .test(0); - pp0.onComplete(); + source.onComplete(); - assertFalse(pp1.hasSubscribers()); - assertFalse(pp2.hasSubscribers()); + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); ts.assertResult(); } @@ -2330,20 +2330,20 @@ public void openCloseEmptyBackpressure() { @Test @SuppressWarnings("unchecked") public void openCloseErrorBackpressure() { - PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor source = PublishProcessor.create(); - PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor openIndicator = PublishProcessor.create(); - PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor closeIndicator = PublishProcessor.create(); - TestSubscriber> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .test(0); - pp0.onError(new TestException()); + source.onError(new TestException()); - assertFalse(pp1.hasSubscribers()); - assertFalse(pp2.hasSubscribers()); + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); ts.assertFailure(TestException.class); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index 5cf0410705..6c633581ea 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -1506,31 +1506,31 @@ public void run() throws Exception { @Test @SuppressWarnings("unchecked") public void boundaryOpenCloseDisposedOnComplete() { - PublishSubject pp0 = PublishSubject.create(); + PublishSubject source = PublishSubject.create(); - PublishSubject pp1 = PublishSubject.create(); + PublishSubject openIndicator = PublishSubject.create(); - PublishSubject pp2 = PublishSubject.create(); + PublishSubject closeIndicator = PublishSubject.create(); - TestObserver> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestObserver> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .test(); - assertTrue(pp0.hasObservers()); - assertTrue(pp1.hasObservers()); - assertFalse(pp2.hasObservers()); + assertTrue(source.hasObservers()); + assertTrue(openIndicator.hasObservers()); + assertFalse(closeIndicator.hasObservers()); - pp1.onNext(1); + openIndicator.onNext(1); - assertTrue(pp1.hasObservers()); - assertTrue(pp2.hasObservers()); + assertTrue(openIndicator.hasObservers()); + assertTrue(closeIndicator.hasObservers()); - pp0.onComplete(); + source.onComplete(); ts.assertResult(Collections.emptyList()); - assertFalse(pp1.hasObservers()); - assertFalse(pp2.hasObservers()); + assertFalse(openIndicator.hasObservers()); + assertFalse(closeIndicator.hasObservers()); } @Test @@ -1629,28 +1629,28 @@ protected void subscribeActual(Observer s) { @Test @SuppressWarnings("unchecked") public void openCloseOpenCompletes() { - PublishSubject pp0 = PublishSubject.create(); + PublishSubject source = PublishSubject.create(); - PublishSubject pp1 = PublishSubject.create(); + PublishSubject openIndicator = PublishSubject.create(); - PublishSubject pp2 = PublishSubject.create(); + PublishSubject closeIndicator = PublishSubject.create(); - TestObserver> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestObserver> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .test(); - pp1.onNext(1); + openIndicator.onNext(1); - assertTrue(pp2.hasObservers()); + assertTrue(closeIndicator.hasObservers()); - pp1.onComplete(); + openIndicator.onComplete(); - assertTrue(pp0.hasObservers()); - assertTrue(pp2.hasObservers()); + assertTrue(source.hasObservers()); + assertTrue(closeIndicator.hasObservers()); - pp2.onComplete(); + closeIndicator.onComplete(); - assertFalse(pp0.hasObservers()); + assertFalse(source.hasObservers()); ts.assertResult(Collections.emptyList()); } @@ -1658,28 +1658,28 @@ public void openCloseOpenCompletes() { @Test @SuppressWarnings("unchecked") public void openCloseOpenCompletesNoBuffers() { - PublishSubject pp0 = PublishSubject.create(); + PublishSubject source = PublishSubject.create(); - PublishSubject pp1 = PublishSubject.create(); + PublishSubject openIndicator = PublishSubject.create(); - PublishSubject pp2 = PublishSubject.create(); + PublishSubject closeIndicator = PublishSubject.create(); - TestObserver> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestObserver> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .test(); - pp1.onNext(1); + openIndicator.onNext(1); - assertTrue(pp2.hasObservers()); + assertTrue(closeIndicator.hasObservers()); - pp2.onComplete(); + closeIndicator.onComplete(); - assertTrue(pp0.hasObservers()); - assertTrue(pp1.hasObservers()); + assertTrue(source.hasObservers()); + assertTrue(openIndicator.hasObservers()); - pp1.onComplete(); + openIndicator.onComplete(); - assertFalse(pp0.hasObservers()); + assertFalse(source.hasObservers()); ts.assertResult(Collections.emptyList()); } @@ -1687,23 +1687,23 @@ public void openCloseOpenCompletesNoBuffers() { @Test @SuppressWarnings("unchecked") public void openCloseTake() { - PublishSubject pp0 = PublishSubject.create(); + PublishSubject source = PublishSubject.create(); - PublishSubject pp1 = PublishSubject.create(); + PublishSubject openIndicator = PublishSubject.create(); - PublishSubject pp2 = PublishSubject.create(); + PublishSubject closeIndicator = PublishSubject.create(); - TestObserver> ts = pp0 - .buffer(pp1, Functions.justFunction(pp2)) + TestObserver> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) .take(1) .test(); - pp1.onNext(1); - pp2.onComplete(); + openIndicator.onNext(1); + closeIndicator.onComplete(); - assertFalse(pp0.hasObservers()); - assertFalse(pp1.hasObservers()); - assertFalse(pp2.hasObservers()); + assertFalse(source.hasObservers()); + assertFalse(openIndicator.hasObservers()); + assertFalse(closeIndicator.hasObservers()); ts.assertResult(Collections.emptyList()); }