diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java index e375d5f720..acf35be0cd 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java @@ -15,22 +15,19 @@ import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; -import io.reactivex.Flowable; +import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.SimplePlainQueue; -import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.subscribers.QueueDrainSubscriber; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.QueueDrainHelper; +import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subscribers.*; public final class FlowableBufferBoundary, Open, Close> extends AbstractFlowableWithUpstream { @@ -48,48 +45,72 @@ public FlowableBufferBoundary(Flowable source, Publisher buff @Override protected void subscribeActual(Subscriber s) { - source.subscribe(new BufferBoundarySubscriber( - new SerializedSubscriber(s), - bufferOpen, bufferClose, bufferSupplier - )); + BufferBoundarySubscriber parent = + new BufferBoundarySubscriber( + s, bufferOpen, bufferClose, bufferSupplier + ); + s.onSubscribe(parent); + source.subscribe(parent); } - static final class BufferBoundarySubscriber, Open, Close> - extends QueueDrainSubscriber implements Subscription, Disposable { + static final class BufferBoundarySubscriber, Open, Close> + extends AtomicInteger implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = -8466418554264089604L; + + final Subscriber actual; + + final Callable bufferSupplier; + final Publisher bufferOpen; + final Function> bufferClose; - final Callable bufferSupplier; - final CompositeDisposable resources; - Subscription s; + final CompositeDisposable subscribers; + + final AtomicLong requested; + + final AtomicReference upstream; - final List buffers; + final AtomicThrowable errors; - final AtomicInteger windows = new AtomicInteger(); + volatile boolean done; - BufferBoundarySubscriber(Subscriber actual, + final SpscLinkedArrayQueue queue; + + volatile boolean cancelled; + + long index; + + Map buffers; + + long emitted; + + BufferBoundarySubscriber(Subscriber actual, Publisher bufferOpen, Function> bufferClose, - Callable bufferSupplier) { - super(actual, new MpscLinkedQueue()); + Callable bufferSupplier + ) { + this.actual = actual; + this.bufferSupplier = bufferSupplier; this.bufferOpen = bufferOpen; this.bufferClose = bufferClose; - this.bufferSupplier = bufferSupplier; - this.buffers = new LinkedList(); - this.resources = new CompositeDisposable(); + this.queue = new SpscLinkedArrayQueue(bufferSize()); + this.subscribers = new CompositeDisposable(); + this.requested = new AtomicLong(); + this.upstream = new AtomicReference(); + this.buffers = new LinkedHashMap(); + this.errors = new AtomicThrowable(); } + @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - - BufferOpenSubscriber bos = new BufferOpenSubscriber(this); - resources.add(bos); + if (SubscriptionHelper.setOnce(this.upstream, s)) { - actual.onSubscribe(this); + BufferOpenSubscriber open = new BufferOpenSubscriber(this); + subscribers.add(open); - windows.lazySet(1); - bufferOpen.subscribe(bos); + bufferOpen.subscribe(open); s.request(Long.MAX_VALUE); } @@ -98,7 +119,11 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { synchronized (this) { - for (U b : buffers) { + Map bufs = buffers; + if (bufs == null) { + return; + } + for (C b : bufs.values()) { b.add(t); } } @@ -106,206 +131,294 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - cancel(); - cancelled = true; - synchronized (this) { - buffers.clear(); + if (errors.addThrowable(t)) { + subscribers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); } - actual.onError(t); } @Override public void onComplete() { - if (windows.decrementAndGet() == 0) { - complete(); - } - } - - void complete() { - List list; + subscribers.dispose(); synchronized (this) { - list = new ArrayList(buffers); - buffers.clear(); - } - - SimplePlainQueue q = queue; - for (U u : list) { - q.offer(u); + Map bufs = buffers; + if (bufs == null) { + return; + } + for (C b : bufs.values()) { + queue.offer(b); + } + buffers = null; } done = true; - if (enter()) { - QueueDrainHelper.drainMaxLoop(q, actual, false, this, this); - } + drain(); } @Override public void request(long n) { - requested(n); - } - - @Override - public void dispose() { - resources.dispose(); - } - - @Override - public boolean isDisposed() { - return resources.isDisposed(); + BackpressureHelper.add(requested, n); + drain(); } @Override public void cancel() { - if (!cancelled) { + if (SubscriptionHelper.cancel(upstream)) { cancelled = true; - dispose(); + subscribers.dispose(); + synchronized (this) { + buffers = null; + } + if (getAndIncrement() != 0) { + queue.clear(); + } } } - @Override - public boolean accept(Subscriber a, U v) { - a.onNext(v); - return true; - } - - void open(Open window) { - if (cancelled) { + void open(Open token) { + Publisher p; + C buf; + try { + buf = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null Collection"); + p = ObjectHelper.requireNonNull(bufferClose.apply(token), "The bufferClose returned a null Publisher"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + SubscriptionHelper.cancel(upstream); + onError(ex); return; } - U b; - - try { - b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; + long idx = index; + index = idx + 1; + synchronized (this) { + Map bufs = buffers; + if (bufs == null) { + return; + } + bufs.put(idx, buf); } - Publisher p; + BufferCloseSubscriber bc = new BufferCloseSubscriber(this, idx); + subscribers.add(bc); + p.subscribe(bc); + } - try { - p = ObjectHelper.requireNonNull(bufferClose.apply(window), "The buffer closing publisher is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; + void openComplete(BufferOpenSubscriber os) { + subscribers.delete(os); + if (subscribers.size() == 0) { + SubscriptionHelper.cancel(upstream); + done = true; + drain(); } + } - if (cancelled) { - return; + void close(BufferCloseSubscriber closer, long idx) { + subscribers.delete(closer); + boolean makeDone = false; + if (subscribers.size() == 0) { + makeDone = true; + SubscriptionHelper.cancel(upstream); } - synchronized (this) { - if (cancelled) { + Map bufs = buffers; + if (bufs == null) { return; } - buffers.add(b); + queue.offer(buffers.remove(idx)); } + if (makeDone) { + done = true; + } + drain(); + } - BufferCloseSubscriber bcs = new BufferCloseSubscriber(b, this); - resources.add(bcs); + void boundaryError(Disposable subscriber, Throwable ex) { + SubscriptionHelper.cancel(upstream); + subscribers.delete(subscriber); + onError(ex); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } - windows.getAndIncrement(); + int missed = 1; + long e = emitted; + Subscriber a = actual; + SpscLinkedArrayQueue q = queue; + + for (;;) { + long r = requested.get(); + + while (e != r) { + if (cancelled) { + q.clear(); + return; + } + + boolean d = done; + if (d && errors.get() != null) { + q.clear(); + Throwable ex = errors.terminate(); + a.onError(ex); + return; + } + + C v = q.poll(); + boolean empty = v == null; + + if (d && empty) { + a.onComplete(); + return; + } + + if (empty) { + break; + } + + a.onNext(v); + e++; + } - p.subscribe(bcs); - } + if (e == r) { + if (cancelled) { + q.clear(); + return; + } + + if (done) { + if (errors.get() != null) { + q.clear(); + Throwable ex = errors.terminate(); + a.onError(ex); + return; + } else if (q.isEmpty()) { + a.onComplete(); + return; + } + } + } - void openFinished(Disposable d) { - if (resources.remove(d)) { - if (windows.decrementAndGet() == 0) { - complete(); + emitted = e; + missed = addAndGet(-missed); + if (missed == 0) { + break; } } } - void close(U b, Disposable d) { + static final class BufferOpenSubscriber + extends AtomicReference + implements FlowableSubscriber, Disposable { - boolean e; - synchronized (this) { - e = buffers.remove(b); - } + private static final long serialVersionUID = -8498650778633225126L; + + final BufferBoundarySubscriber parent; - if (e) { - fastPathOrderedEmitMax(b, false, this); + BufferOpenSubscriber(BufferBoundarySubscriber parent) { + this.parent = parent; } - if (resources.remove(d)) { - if (windows.decrementAndGet() == 0) { - complete(); + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); } } - } - } - static final class BufferOpenSubscriber, Open, Close> - extends DisposableSubscriber { - final BufferBoundarySubscriber parent; + @Override + public void onNext(Open t) { + parent.open(t); + } - boolean done; + @Override + public void onError(Throwable t) { + lazySet(SubscriptionHelper.CANCELLED); + parent.boundaryError(this, t); + } - BufferOpenSubscriber(BufferBoundarySubscriber parent) { - this.parent = parent; - } - @Override - public void onNext(Open t) { - if (done) { - return; + @Override + public void onComplete() { + lazySet(SubscriptionHelper.CANCELLED); + parent.openComplete(this); } - parent.open(t); - } - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; + @Override + public void dispose() { + SubscriptionHelper.cancel(this); } - done = true; - parent.onError(t); - } - @Override - public void onComplete() { - if (done) { - return; + @Override + public boolean isDisposed() { + return get() == SubscriptionHelper.CANCELLED; } - done = true; - parent.openFinished(this); } } - static final class BufferCloseSubscriber, Open, Close> - extends DisposableSubscriber { - final BufferBoundarySubscriber parent; - final U value; - boolean done; - BufferCloseSubscriber(U value, BufferBoundarySubscriber parent) { + static final class BufferCloseSubscriber> + extends AtomicReference + implements FlowableSubscriber, Disposable { + + private static final long serialVersionUID = -8498650778633225126L; + + final BufferBoundarySubscriber parent; + + final long index; + + BufferCloseSubscriber(BufferBoundarySubscriber parent, long index) { this.parent = parent; - this.value = value; + this.index = index; } @Override - public void onNext(Close t) { - onComplete(); + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(Object t) { + Subscription s = get(); + if (s != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + s.cancel(); + parent.close(this, index); + } } @Override public void onError(Throwable t) { - if (done) { + if (get() != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + parent.boundaryError(this, t); + } else { RxJavaPlugins.onError(t); - return; } - parent.onError(t); } @Override public void onComplete() { - if (done) { - return; + if (get() != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + parent.close(this, index); } - done = true; - parent.close(value, this); + } + + @Override + public void dispose() { + SubscriptionHelper.cancel(this); + } + + @Override + public boolean isDisposed() { + return get() == SubscriptionHelper.CANCELLED; } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java index a9051e474a..b88bce3477 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java @@ -15,7 +15,7 @@ import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import io.reactivex.ObservableSource; import io.reactivex.Observer; @@ -24,11 +24,8 @@ import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.SimplePlainQueue; -import io.reactivex.internal.observers.QueueDrainObserver; -import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.util.QueueDrainHelper; -import io.reactivex.observers.*; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.util.AtomicThrowable; import io.reactivex.plugins.RxJavaPlugins; public final class ObservableBufferBoundary, Open, Close> @@ -47,55 +44,78 @@ public ObservableBufferBoundary(ObservableSource source, ObservableSource t) { - source.subscribe(new BufferBoundaryObserver( - new SerializedObserver(t), - bufferOpen, bufferClose, bufferSupplier - )); + BufferBoundaryObserver parent = + new BufferBoundaryObserver( + t, bufferOpen, bufferClose, bufferSupplier + ); + t.onSubscribe(parent); + source.subscribe(parent); } - static final class BufferBoundaryObserver, Open, Close> - extends QueueDrainObserver implements Disposable { + static final class BufferBoundaryObserver, Open, Close> + extends AtomicInteger implements Observer, Disposable { + + private static final long serialVersionUID = -8466418554264089604L; + + final Observer actual; + + final Callable bufferSupplier; + final ObservableSource bufferOpen; + final Function> bufferClose; - final Callable bufferSupplier; - final CompositeDisposable resources; - Disposable s; + final CompositeDisposable observers; + + final AtomicReference upstream; - final List buffers; + final AtomicThrowable errors; - final AtomicInteger windows = new AtomicInteger(); + volatile boolean done; - BufferBoundaryObserver(Observer actual, + final SpscLinkedArrayQueue queue; + + volatile boolean cancelled; + + long index; + + Map buffers; + + BufferBoundaryObserver(Observer actual, ObservableSource bufferOpen, Function> bufferClose, - Callable bufferSupplier) { - super(actual, new MpscLinkedQueue()); + Callable bufferSupplier + ) { + this.actual = actual; + this.bufferSupplier = bufferSupplier; this.bufferOpen = bufferOpen; this.bufferClose = bufferClose; - this.bufferSupplier = bufferSupplier; - this.buffers = new LinkedList(); - this.resources = new CompositeDisposable(); + this.queue = new SpscLinkedArrayQueue(bufferSize()); + this.observers = new CompositeDisposable(); + this.upstream = new AtomicReference(); + this.buffers = new LinkedHashMap(); + this.errors = new AtomicThrowable(); } + @Override public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - - BufferOpenObserver bos = new BufferOpenObserver(this); - resources.add(bos); + if (DisposableHelper.setOnce(this.upstream, s)) { - actual.onSubscribe(this); + BufferOpenObserver open = new BufferOpenObserver(this); + observers.add(open); - windows.lazySet(1); - bufferOpen.subscribe(bos); + bufferOpen.subscribe(open); } } @Override public void onNext(T t) { synchronized (this) { - for (U b : buffers) { + Map bufs = buffers; + if (bufs == null) { + return; + } + for (C b : bufs.values()) { b.add(t); } } @@ -103,194 +123,265 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - dispose(); - cancelled = true; - synchronized (this) { - buffers.clear(); + if (errors.addThrowable(t)) { + observers.dispose(); + synchronized (this) { + buffers = null; + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); } - actual.onError(t); } @Override public void onComplete() { - if (windows.decrementAndGet() == 0) { - complete(); - } - } - - void complete() { - List list; + observers.dispose(); synchronized (this) { - list = new ArrayList(buffers); - buffers.clear(); - } - - SimplePlainQueue q = queue; - for (U u : list) { - q.offer(u); + Map bufs = buffers; + if (bufs == null) { + return; + } + for (C b : bufs.values()) { + queue.offer(b); + } + buffers = null; } done = true; - if (enter()) { - QueueDrainHelper.drainLoop(q, actual, false, this, this); - } + drain(); } @Override public void dispose() { - if (!cancelled) { + if (DisposableHelper.dispose(upstream)) { cancelled = true; - resources.dispose(); + observers.dispose(); + synchronized (this) { + buffers = null; + } + if (getAndIncrement() != 0) { + queue.clear(); + } } } - @Override public boolean isDisposed() { - return cancelled; - } - @Override - public void accept(Observer a, U v) { - a.onNext(v); + public boolean isDisposed() { + return DisposableHelper.isDisposed(upstream.get()); } - void open(Open window) { - if (cancelled) { + void open(Open token) { + ObservableSource p; + C buf; + try { + buf = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null Collection"); + p = ObjectHelper.requireNonNull(bufferClose.apply(token), "The bufferClose returned a null ObservableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + DisposableHelper.dispose(upstream); + onError(ex); return; } - U b; - - try { - b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; + long idx = index; + index = idx + 1; + synchronized (this) { + Map bufs = buffers; + if (bufs == null) { + return; + } + bufs.put(idx, buf); } - ObservableSource p; + BufferCloseObserver bc = new BufferCloseObserver(this, idx); + observers.add(bc); + p.subscribe(bc); + } - try { - p = ObjectHelper.requireNonNull(bufferClose.apply(window), "The buffer closing Observable is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; + void openComplete(BufferOpenObserver os) { + observers.delete(os); + if (observers.size() == 0) { + DisposableHelper.dispose(upstream); + done = true; + drain(); } + } - if (cancelled) { - return; + void close(BufferCloseObserver closer, long idx) { + observers.delete(closer); + boolean makeDone = false; + if (observers.size() == 0) { + makeDone = true; + DisposableHelper.dispose(upstream); } - synchronized (this) { - if (cancelled) { + Map bufs = buffers; + if (bufs == null) { return; } - buffers.add(b); + queue.offer(buffers.remove(idx)); } + if (makeDone) { + done = true; + } + drain(); + } - BufferCloseObserver bcs = new BufferCloseObserver(b, this); - resources.add(bcs); + void boundaryError(Disposable observer, Throwable ex) { + DisposableHelper.dispose(upstream); + observers.delete(observer); + onError(ex); + } - windows.getAndIncrement(); + void drain() { + if (getAndIncrement() != 0) { + return; + } - p.subscribe(bcs); - } + int missed = 1; + Observer a = actual; + SpscLinkedArrayQueue q = queue; + + for (;;) { + for (;;) { + if (cancelled) { + q.clear(); + return; + } + + boolean d = done; + if (d && errors.get() != null) { + q.clear(); + Throwable ex = errors.terminate(); + a.onError(ex); + return; + } + + C v = q.poll(); + boolean empty = v == null; + + if (d && empty) { + a.onComplete(); + return; + } + + if (empty) { + break; + } + + a.onNext(v); + } - void openFinished(Disposable d) { - if (resources.remove(d)) { - if (windows.decrementAndGet() == 0) { - complete(); + missed = addAndGet(-missed); + if (missed == 0) { + break; } } } - void close(U b, Disposable d) { + static final class BufferOpenObserver + extends AtomicReference + implements Observer, Disposable { - boolean e; - synchronized (this) { - e = buffers.remove(b); + private static final long serialVersionUID = -8498650778633225126L; + + final BufferBoundaryObserver parent; + + BufferOpenObserver(BufferBoundaryObserver parent) { + this.parent = parent; } - if (e) { - fastPathOrderedEmit(b, false, this); + @Override + public void onSubscribe(Disposable s) { + DisposableHelper.setOnce(this, s); } - if (resources.remove(d)) { - if (windows.decrementAndGet() == 0) { - complete(); - } + @Override + public void onNext(Open t) { + parent.open(t); + } + + @Override + public void onError(Throwable t) { + lazySet(DisposableHelper.DISPOSED); + parent.boundaryError(this, t); + } + + @Override + public void onComplete() { + lazySet(DisposableHelper.DISPOSED); + parent.openComplete(this); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return get() == DisposableHelper.DISPOSED; } } } - static final class BufferOpenObserver, Open, Close> - extends DisposableObserver { - final BufferBoundaryObserver parent; + static final class BufferCloseObserver> + extends AtomicReference + implements Observer, Disposable { + + private static final long serialVersionUID = -8498650778633225126L; + + final BufferBoundaryObserver parent; - boolean done; + final long index; - BufferOpenObserver(BufferBoundaryObserver parent) { + BufferCloseObserver(BufferBoundaryObserver parent, long index) { this.parent = parent; + this.index = index; } + @Override - public void onNext(Open t) { - if (done) { - return; + public void onSubscribe(Disposable s) { + DisposableHelper.setOnce(this, s); + } + + @Override + public void onNext(Object t) { + Disposable s = get(); + if (s != DisposableHelper.DISPOSED) { + lazySet(DisposableHelper.DISPOSED); + s.dispose(); + parent.close(this, index); } - parent.open(t); } @Override public void onError(Throwable t) { - if (done) { + if (get() != DisposableHelper.DISPOSED) { + lazySet(DisposableHelper.DISPOSED); + parent.boundaryError(this, t); + } else { RxJavaPlugins.onError(t); - return; } - done = true; - parent.onError(t); } @Override public void onComplete() { - if (done) { - return; + if (get() != DisposableHelper.DISPOSED) { + lazySet(DisposableHelper.DISPOSED); + parent.close(this, index); } - done = true; - parent.openFinished(this); - } - } - - static final class BufferCloseObserver, Open, Close> - extends DisposableObserver { - final BufferBoundaryObserver parent; - final U value; - boolean done; - BufferCloseObserver(U value, BufferBoundaryObserver parent) { - this.parent = parent; - this.value = value; } @Override - public void onNext(Close t) { - onComplete(); - } - - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - parent.onError(t); + public void dispose() { + DisposableHelper.dispose(this); } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - parent.close(value, this); + public boolean isDisposed() { + return get() == DisposableHelper.DISPOSED; } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index bd18c0187d..06a4553f1c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -26,7 +27,8 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; @@ -2074,4 +2076,364 @@ public void run() throws Exception { assertEquals(0, counter.get()); } + + @Test + @SuppressWarnings("unchecked") + public void boundaryOpenCloseDisposedOnComplete() { + PublishProcessor source = PublishProcessor.create(); + + PublishProcessor openIndicator = PublishProcessor.create(); + + PublishProcessor closeIndicator = PublishProcessor.create(); + + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .test(); + + assertTrue(source.hasSubscribers()); + assertTrue(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); + + openIndicator.onNext(1); + + assertTrue(openIndicator.hasSubscribers()); + assertTrue(closeIndicator.hasSubscribers()); + + source.onComplete(); + + ts.assertResult(Collections.emptyList()); + + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); + } + + @Test + public void bufferedCanCompleteIfOpenNeverCompletesDropping() { + Flowable.range(1, 50) + .zipWith(Flowable.interval(5, TimeUnit.MILLISECONDS), + new BiFunction() { + @Override + public Integer apply(Integer integer, Long aLong) { + return integer; + } + }) + .buffer(Flowable.interval(0,200, TimeUnit.MILLISECONDS), + new Function>() { + @Override + public Publisher apply(Long a) { + return Flowable.just(a).delay(100, TimeUnit.MILLISECONDS); + } + }) + .test() + .assertSubscribed() + .awaitDone(3, TimeUnit.SECONDS) + .assertComplete(); + } + + @Test + public void bufferedCanCompleteIfOpenNeverCompletesOverlapping() { + Flowable.range(1, 50) + .zipWith(Flowable.interval(5, TimeUnit.MILLISECONDS), + new BiFunction() { + @Override + public Integer apply(Integer integer, Long aLong) { + return integer; + } + }) + .buffer(Flowable.interval(0,100, TimeUnit.MILLISECONDS), + new Function>() { + @Override + public Publisher apply(Long a) { + return Flowable.just(a).delay(200, TimeUnit.MILLISECONDS); + } + }) + .test() + .assertSubscribed() + .awaitDone(3, TimeUnit.SECONDS) + .assertComplete(); + } + + @Test + @SuppressWarnings("unchecked") + public void openClosemainError() { + Flowable.error(new TestException()) + .buffer(Flowable.never(), Functions.justFunction(Flowable.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + @SuppressWarnings("unchecked") + public void openClosebadSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + BooleanSubscription bs1 = new BooleanSubscription(); + BooleanSubscription bs2 = new BooleanSubscription(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isCancelled()); + assertFalse(bs2.isCancelled()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isCancelled()); + assertTrue(bs2.isCancelled()); + + s.onError(new IOException()); + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + } + .buffer(Flowable.never(), Functions.justFunction(Flowable.never())) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseOpenCompletes() { + PublishProcessor source = PublishProcessor.create(); + + PublishProcessor openIndicator = PublishProcessor.create(); + + PublishProcessor closeIndicator = PublishProcessor.create(); + + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .test(); + + openIndicator.onNext(1); + + assertTrue(closeIndicator.hasSubscribers()); + + openIndicator.onComplete(); + + assertTrue(source.hasSubscribers()); + assertTrue(closeIndicator.hasSubscribers()); + + closeIndicator.onComplete(); + + assertFalse(source.hasSubscribers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseOpenCompletesNoBuffers() { + PublishProcessor source = PublishProcessor.create(); + + PublishProcessor openIndicator = PublishProcessor.create(); + + PublishProcessor closeIndicator = PublishProcessor.create(); + + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .test(); + + openIndicator.onNext(1); + + assertTrue(closeIndicator.hasSubscribers()); + + closeIndicator.onComplete(); + + assertTrue(source.hasSubscribers()); + assertTrue(openIndicator.hasSubscribers()); + + openIndicator.onComplete(); + + assertFalse(source.hasSubscribers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseTake() { + PublishProcessor source = PublishProcessor.create(); + + PublishProcessor openIndicator = PublishProcessor.create(); + + PublishProcessor closeIndicator = PublishProcessor.create(); + + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .take(1) + .test(2); + + openIndicator.onNext(1); + closeIndicator.onComplete(); + + assertFalse(source.hasSubscribers()); + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseLimit() { + PublishProcessor source = PublishProcessor.create(); + + PublishProcessor openIndicator = PublishProcessor.create(); + + PublishProcessor closeIndicator = PublishProcessor.create(); + + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .limit(1) + .test(2); + + openIndicator.onNext(1); + closeIndicator.onComplete(); + + assertFalse(source.hasSubscribers()); + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseEmptyBackpressure() { + PublishProcessor source = PublishProcessor.create(); + + PublishProcessor openIndicator = PublishProcessor.create(); + + PublishProcessor closeIndicator = PublishProcessor.create(); + + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .test(0); + + source.onComplete(); + + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); + + ts.assertResult(); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseErrorBackpressure() { + PublishProcessor source = PublishProcessor.create(); + + PublishProcessor openIndicator = PublishProcessor.create(); + + PublishProcessor closeIndicator = PublishProcessor.create(); + + TestSubscriber> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .test(0); + + source.onError(new TestException()); + + assertFalse(openIndicator.hasSubscribers()); + assertFalse(closeIndicator.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseBadOpen() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.never() + .buffer(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + + assertFalse(((Disposable)s).isDisposed()); + + BooleanSubscription bs1 = new BooleanSubscription(); + BooleanSubscription bs2 = new BooleanSubscription(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isCancelled()); + assertFalse(bs2.isCancelled()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isCancelled()); + assertTrue(bs2.isCancelled()); + + s.onError(new IOException()); + + assertTrue(((Disposable)s).isDisposed()); + + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + }, Functions.justFunction(Flowable.never())) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseBadClose() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.never() + .buffer(Flowable.just(1).concatWith(Flowable.never()), + Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + + assertFalse(((Disposable)s).isDisposed()); + + BooleanSubscription bs1 = new BooleanSubscription(); + BooleanSubscription bs2 = new BooleanSubscription(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isCancelled()); + assertFalse(bs2.isCancelled()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isCancelled()); + assertTrue(bs2.isCancelled()); + + s.onError(new IOException()); + + assertTrue(((Disposable)s).isDisposed()); + + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + })) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index 93a9e2fb74..6c633581ea 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -27,11 +28,12 @@ import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; -import io.reactivex.disposables.Disposables; -import io.reactivex.exceptions.TestException; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; import io.reactivex.subjects.PublishSubject; @@ -1500,4 +1502,298 @@ public void run() throws Exception { assertEquals(0, counter.get()); } + + @Test + @SuppressWarnings("unchecked") + public void boundaryOpenCloseDisposedOnComplete() { + PublishSubject source = PublishSubject.create(); + + PublishSubject openIndicator = PublishSubject.create(); + + PublishSubject closeIndicator = PublishSubject.create(); + + TestObserver> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .test(); + + assertTrue(source.hasObservers()); + assertTrue(openIndicator.hasObservers()); + assertFalse(closeIndicator.hasObservers()); + + openIndicator.onNext(1); + + assertTrue(openIndicator.hasObservers()); + assertTrue(closeIndicator.hasObservers()); + + source.onComplete(); + + ts.assertResult(Collections.emptyList()); + + assertFalse(openIndicator.hasObservers()); + assertFalse(closeIndicator.hasObservers()); + } + + @Test + public void bufferedCanCompleteIfOpenNeverCompletesDropping() { + Observable.range(1, 50) + .zipWith(Observable.interval(5, TimeUnit.MILLISECONDS), + new BiFunction() { + @Override + public Integer apply(Integer integer, Long aLong) { + return integer; + } + }) + .buffer(Observable.interval(0,200, TimeUnit.MILLISECONDS), + new Function>() { + @Override + public Observable apply(Long a) { + return Observable.just(a).delay(100, TimeUnit.MILLISECONDS); + } + }) + .test() + .assertSubscribed() + .awaitDone(3, TimeUnit.SECONDS) + .assertComplete(); + } + + @Test + public void bufferedCanCompleteIfOpenNeverCompletesOverlapping() { + Observable.range(1, 50) + .zipWith(Observable.interval(5, TimeUnit.MILLISECONDS), + new BiFunction() { + @Override + public Integer apply(Integer integer, Long aLong) { + return integer; + } + }) + .buffer(Observable.interval(0,100, TimeUnit.MILLISECONDS), + new Function>() { + @Override + public Observable apply(Long a) { + return Observable.just(a).delay(200, TimeUnit.MILLISECONDS); + } + }) + .test() + .assertSubscribed() + .awaitDone(3, TimeUnit.SECONDS) + .assertComplete(); + } + + @Test + @SuppressWarnings("unchecked") + public void openClosemainError() { + Observable.error(new TestException()) + .buffer(Observable.never(), Functions.justFunction(Observable.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + @SuppressWarnings("unchecked") + public void openClosebadSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer s) { + Disposable bs1 = Disposables.empty(); + Disposable bs2 = Disposables.empty(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isDisposed()); + assertFalse(bs2.isDisposed()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isDisposed()); + assertTrue(bs2.isDisposed()); + + s.onError(new IOException()); + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + } + .buffer(Observable.never(), Functions.justFunction(Observable.never())) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseOpenCompletes() { + PublishSubject source = PublishSubject.create(); + + PublishSubject openIndicator = PublishSubject.create(); + + PublishSubject closeIndicator = PublishSubject.create(); + + TestObserver> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .test(); + + openIndicator.onNext(1); + + assertTrue(closeIndicator.hasObservers()); + + openIndicator.onComplete(); + + assertTrue(source.hasObservers()); + assertTrue(closeIndicator.hasObservers()); + + closeIndicator.onComplete(); + + assertFalse(source.hasObservers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseOpenCompletesNoBuffers() { + PublishSubject source = PublishSubject.create(); + + PublishSubject openIndicator = PublishSubject.create(); + + PublishSubject closeIndicator = PublishSubject.create(); + + TestObserver> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .test(); + + openIndicator.onNext(1); + + assertTrue(closeIndicator.hasObservers()); + + closeIndicator.onComplete(); + + assertTrue(source.hasObservers()); + assertTrue(openIndicator.hasObservers()); + + openIndicator.onComplete(); + + assertFalse(source.hasObservers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseTake() { + PublishSubject source = PublishSubject.create(); + + PublishSubject openIndicator = PublishSubject.create(); + + PublishSubject closeIndicator = PublishSubject.create(); + + TestObserver> ts = source + .buffer(openIndicator, Functions.justFunction(closeIndicator)) + .take(1) + .test(); + + openIndicator.onNext(1); + closeIndicator.onComplete(); + + assertFalse(source.hasObservers()); + assertFalse(openIndicator.hasObservers()); + assertFalse(closeIndicator.hasObservers()); + + ts.assertResult(Collections.emptyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseBadOpen() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.never() + .buffer(new Observable() { + @Override + protected void subscribeActual(Observer s) { + + assertFalse(((Disposable)s).isDisposed()); + + Disposable bs1 = Disposables.empty(); + Disposable bs2 = Disposables.empty(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isDisposed()); + assertFalse(bs2.isDisposed()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isDisposed()); + assertTrue(bs2.isDisposed()); + + s.onError(new IOException()); + + assertTrue(((Disposable)s).isDisposed()); + + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + }, Functions.justFunction(Observable.never())) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void openCloseBadClose() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.never() + .buffer(Observable.just(1).concatWith(Observable.never()), + Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer s) { + + assertFalse(((Disposable)s).isDisposed()); + + Disposable bs1 = Disposables.empty(); + Disposable bs2 = Disposables.empty(); + + s.onSubscribe(bs1); + + assertFalse(bs1.isDisposed()); + assertFalse(bs2.isDisposed()); + + s.onSubscribe(bs2); + + assertFalse(bs1.isDisposed()); + assertTrue(bs2.isDisposed()); + + s.onError(new IOException()); + + assertTrue(((Disposable)s).isDisposed()); + + s.onComplete(); + s.onNext(1); + s.onError(new TestException()); + } + })) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } }