diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 57b0ea3544..5553074d74 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -8305,7 +8305,7 @@ public final Observable onTerminateDetach() { */ @SchedulerSupport(SchedulerSupport.NONE) public final ConnectableObservable publish() { - return publish(bufferSize()); + return ObservablePublish.create(this); } /** @@ -8329,58 +8329,8 @@ public final ConnectableObservable publish() { */ @SchedulerSupport(SchedulerSupport.NONE) public final Observable publish(Function, ? extends ObservableSource> selector) { - return publish(selector, bufferSize()); - } - - /** - * Returns an Observable that emits the results of invoking a specified selector on items emitted by a - * {@link ConnectableObservable} that shares a single subscription to the underlying sequence. - *

- * - *

- *
Scheduler:
- *
{@code publish} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param - * the type of items emitted by the resulting ObservableSource - * @param selector - * a function that can use the multicasted source sequence as many times as needed, without - * causing multiple subscriptions to the source sequence. Observers to the given source will - * receive all notifications of the source from the time of the subscription forward. - * @param bufferSize - * the number of elements to prefetch from the current Observable - * @return an Observable that emits the results of invoking the selector on the items emitted by a {@link ConnectableObservable} that shares a single subscription to the underlying sequence - * @see ReactiveX operators documentation: Publish - */ - @SchedulerSupport(SchedulerSupport.NONE) - public final Observable publish(Function, ? extends ObservableSource> selector, int bufferSize) { - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(selector, "selector is null"); - return ObservablePublish.create(this, selector, bufferSize); - } - - /** - * Returns a {@link ConnectableObservable}, which is a variety of ObservableSource that waits until its - * {@link ConnectableObservable#connect connect} method is called before it begins emitting items to those - * {@link Observer}s that have subscribed to it. - *

- * - *

- *
Scheduler:
- *
{@code publish} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param bufferSize - * the number of elements to prefetch from the current Observable - * @return a {@link ConnectableObservable} that upon connection causes the source ObservableSource to emit items - * to its {@link Observer}s - * @see ReactiveX operators documentation: Publish - */ - @SchedulerSupport(SchedulerSupport.NONE) - public final ConnectableObservable publish(int bufferSize) { - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return ObservablePublish.create(this, bufferSize); + return new ObservablePublishSelector(this, selector); } /** diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java index 58adba16c7..3014201c8e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java @@ -106,17 +106,15 @@ public boolean isDisposed() { @Override public void onNext(T t) { U b = buffer; - if (b == null) { - return; - } - - b.add(t); + if (b != null) { + b.add(t); - if (++size >= count) { - actual.onNext(b); + if (++size >= count) { + actual.onNext(b); - size = 0; - createBuffer(); + size = 0; + createBuffer(); + } } } @@ -185,7 +183,7 @@ public void onNext(T t) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources."); } catch (Throwable e) { buffers.clear(); s.dispose(); @@ -193,13 +191,6 @@ public void onNext(T t) { return; } - if (b == null) { - buffers.clear(); - s.dispose(); - actual.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); - return; - } - buffers.offer(b); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java index 0990d1edde..def9e4cb5d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java @@ -183,12 +183,8 @@ void disposeAll() { for (;;) { - try { - inner = observers.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + inner = observers.poll(); + if (inner == null) { return; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java index 27ebd2e47d..4a8ba67bda 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java @@ -18,11 +18,10 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.*; -import io.reactivex.internal.disposables.*; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.fuseable.HasUpstreamObservableSource; -import io.reactivex.internal.queue.SpscLinkedArrayQueue; -import io.reactivex.internal.util.*; +import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observables.ConnectableObservable; import io.reactivex.plugins.RxJavaPlugins; @@ -37,124 +36,85 @@ public final class ObservablePublish extends ConnectableObservable impleme /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ final AtomicReference> current; - /** The size of the prefetch buffer. */ - final int bufferSize; - final ObservableSource onSubscribe; /** * Creates a OperatorPublish instance to publish values of the given source observable. * @param the source value type * @param source the source observable - * @param bufferSize the size of the prefetch buffer * @return the connectable observable */ - public static ConnectableObservable create(ObservableSource source, final int bufferSize) { + public static ConnectableObservable create(ObservableSource source) { // the current connection to source needs to be shared between the operator and its onSubscribe call final AtomicReference> curr = new AtomicReference>(); ObservableSource onSubscribe = new ObservableSource() { @Override public void subscribe(Observer child) { - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - PublishObserver r = curr.get(); - // if there isn't one or it is disposed - if (r == null || r.isDisposed()) { - // create a new subscriber to source - PublishObserver u = new PublishObserver(curr, bufferSize); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(r, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; + // create the backpressure-managing producer for this child + InnerDisposable inner = new InnerDisposable(child); + child.onSubscribe(inner); + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + PublishObserver r = curr.get(); + // if there isn't one or it is disposed + if (r == null || r.isDisposed()) { + // create a new subscriber to source + PublishObserver u = new PublishObserver(curr); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(r, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; } - // we won, let's use it going onwards - r = u; - } - // create the backpressure-managing producer for this child - InnerDisposable inner = new InnerDisposable(r, child); - /* - * Try adding it to the current subscriber-to-source, add is atomic in respect - * to other adds and the termination of the subscriber-to-source. - */ - if (r.add(inner)) { - // the producer has been registered with the current subscriber-to-source so - // at least it will receive the next terminal event - // setting the producer will trigger the first request to be considered by - // the subscriber-to-source. - child.onSubscribe(inner); - break; // NOPMD + /* + * Try adding it to the current subscriber-to-source, add is atomic in respect + * to other adds and the termination of the subscriber-to-source. + */ + if (r.add(inner)) { + inner.setParent(r); + break; // NOPMD + } + /* + * The current PublishObserver has been terminated, try with a newer one. + */ + /* + * Note: although technically correct, concurrent disconnects can cause + * unexpected behavior such as child observers never receiving anything + * (unless connected again). An alternative approach, similar to + * PublishSubject would be to immediately terminate such child + * observers as well: + * + * Object term = r.terminalEvent; + * if (r.nl.isCompleted(term)) { + * child.onComplete(); + * } else { + * child.onError(r.nl.getError(term)); + * } + * return; + * + * The original concurrent behavior was non-deterministic in this regard as well. + * Allowing this behavior, however, may introduce another unexpected behavior: + * after disconnecting a previous connection, one might not be able to prepare + * a new connection right after a previous termination by subscribing new child + * observers asynchronously before a connect call. + */ } - /* - * The current PublishObserver has been terminated, try with a newer one. - */ - /* - * Note: although technically correct, concurrent disconnects can cause - * unexpected behavior such as child observers never receiving anything - * (unless connected again). An alternative approach, similar to - * PublishSubject would be to immediately terminate such child - * observers as well: - * - * Object term = r.terminalEvent; - * if (r.nl.isCompleted(term)) { - * child.onComplete(); - * } else { - * child.onError(r.nl.getError(term)); - * } - * return; - * - * The original concurrent behavior was non-deterministic in this regard as well. - * Allowing this behavior, however, may introduce another unexpected behavior: - * after disconnecting a previous connection, one might not be able to prepare - * a new connection right after a previous termination by subscribing new child - * observers asynchronously before a connect call. - */ - } } }; - return RxJavaPlugins.onAssembly(new ObservablePublish(onSubscribe, source, curr, bufferSize)); - } - - public static Observable create(final ObservableSource source, - final Function, ? extends ObservableSource> selector, final int bufferSize) { - return RxJavaPlugins.onAssembly(new Observable() { - @Override - protected void subscribeActual(Observer o) { - ConnectableObservable op = ObservablePublish.create(source, bufferSize); - - final ObserverResourceWrapper srw = new ObserverResourceWrapper(o); - - ObservableSource target; - - try { - target = selector.apply(op); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - EmptyDisposable.error(ex, srw); - return; - } - - target.subscribe(srw); - - op.connect(new Consumer() { - @Override - public void accept(Disposable r) { - srw.setResource(r); - } - }); - } - }); + return RxJavaPlugins.onAssembly(new ObservablePublish(onSubscribe, source, curr)); } private ObservablePublish(ObservableSource onSubscribe, ObservableSource source, - final AtomicReference> current, int bufferSize) { + final AtomicReference> current) { this.onSubscribe = onSubscribe; this.source = source; this.current = current; - this.bufferSize = bufferSize; } @Override @@ -178,7 +138,7 @@ public void connect(Consumer connection) { // if there is none yet or the current has been disposed if (ps == null || ps.isDisposed()) { // create a new subscriber-to-source - PublishObserver u = new PublishObserver(current, bufferSize); + PublishObserver u = new PublishObserver(current); // try setting it as the current subscriber-to-source if (!current.compareAndSet(ps, u)) { // did not work, perhaps a new subscriber arrived @@ -217,13 +177,10 @@ public void connect(Consumer connection) { } @SuppressWarnings("rawtypes") - static final class PublishObserver implements Observer, Disposable { - /** Holds notifications from upstream. */ - final SpscLinkedArrayQueue queue; + static final class PublishObserver + implements Observer, Disposable { /** Holds onto the current connected PublishObserver. */ final AtomicReference> current; - /** Contains either an onComplete or an onError token from upstream. */ - volatile Object terminalEvent; /** Indicates an empty array of inner observers. */ static final InnerDisposable[] EMPTY = new InnerDisposable[0]; @@ -231,28 +188,23 @@ static final class PublishObserver implements Observer, Disposable { static final InnerDisposable[] TERMINATED = new InnerDisposable[0]; /** Tracks the subscribed observers. */ - final AtomicReference observers; + final AtomicReference[]> observers; /** * Atomically changed from false to true by connect to make sure the * connection is only performed by one thread. */ final AtomicBoolean shouldConnect; - /** Guarded by this. */ - boolean emitting; - /** Guarded by this. */ - boolean missed; - final AtomicReference s = new AtomicReference(); - PublishObserver(AtomicReference> current, int bufferSize) { - this.queue = new SpscLinkedArrayQueue(bufferSize); - - this.observers = new AtomicReference(EMPTY); + @SuppressWarnings("unchecked") + PublishObserver(AtomicReference> current) { + this.observers = new AtomicReference[]>(EMPTY); this.current = current; this.shouldConnect = new AtomicBoolean(); } + @SuppressWarnings("unchecked") @Override public void dispose() { if (observers.get() != TERMINATED) { @@ -277,36 +229,29 @@ public void onSubscribe(Disposable s) { @Override public void onNext(T t) { - // we expect upstream to honor backpressure requests - // nl is required because JCTools queue doesn't accept nulls. - if (!queue.offer(t)) { - onError(new IllegalStateException("Prefetch queue is full?!")); - } else { - // since many things can happen concurrently, we have a common dispatch - // loop to act on the current state serially - dispatch(); + for (InnerDisposable inner : observers.get()) { + inner.child.onNext(t); } } + @SuppressWarnings("unchecked") @Override public void onError(Throwable e) { - // The observer front is accessed serially as required by spec so - // no need to CAS in the terminal value - if (terminalEvent == null) { - terminalEvent = NotificationLite.error(e); - // since many things can happen concurrently, we have a common dispatch - // loop to act on the current state serially - dispatch(); + current.compareAndSet(this, null); + InnerDisposable[] a = observers.getAndSet(TERMINATED); + if (a.length != 0) { + for (InnerDisposable inner : a) { + inner.child.onError(e); + } + } else { + RxJavaPlugins.onError(e); } } + @SuppressWarnings("unchecked") @Override public void onComplete() { - // The observer front is accessed serially as required by spec so - // no need to CAS in the terminal value - if (terminalEvent == null) { - terminalEvent = NotificationLite.complete(); - // since many things can happen concurrently, we have a common dispatch loop - // to act on the current state serially - dispatch(); + current.compareAndSet(this, null); + for (InnerDisposable inner : observers.getAndSet(TERMINATED)) { + inner.child.onComplete(); } } @@ -317,13 +262,10 @@ public void onComplete() { * @return true if succeeded, false otherwise */ boolean add(InnerDisposable producer) { - if (producer == null) { - throw new NullPointerException(); - } // the state can change so we do a CAS loop to achieve atomicity for (;;) { // get the current producer array - InnerDisposable[] c = observers.get(); + InnerDisposable[] c = observers.get(); // if this subscriber-to-source reached a terminal state by receiving // an onError or onComplete, just refuse to add the new producer if (c == TERMINATED) { @@ -331,7 +273,8 @@ boolean add(InnerDisposable producer) { } // we perform a copy-on-write logic int len = c.length; - InnerDisposable[] u = new InnerDisposable[len + 1]; + @SuppressWarnings("unchecked") + InnerDisposable[] u = new InnerDisposable[len + 1]; System.arraycopy(c, 0, u, 0, len); u[len] = producer; // try setting the observers array @@ -347,19 +290,20 @@ boolean add(InnerDisposable producer) { * Atomically removes the given producer from the observers array. * @param producer the producer to remove */ + @SuppressWarnings("unchecked") void remove(InnerDisposable producer) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // let's read the current observers array - InnerDisposable[] c = observers.get(); + InnerDisposable[] c = observers.get(); // if it is either empty or terminated, there is nothing to remove so we quit - if (c == EMPTY || c == TERMINATED) { + int len = c.length; + if (len == 0) { return; } // let's find the supplied producer in the array // although this is O(n), we don't expect too many child observers in general int j = -1; - int len = c.length; for (int i = 0; i < len; i++) { if (c[i].equals(producer)) { j = i; @@ -371,7 +315,7 @@ void remove(InnerDisposable producer) { return; } // we do copy-on-write logic here - InnerDisposable[] u; + InnerDisposable[] u; // we don't create a new empty array if producer was the single inhabitant // but rather reuse an empty array if (len == 1) { @@ -392,264 +336,41 @@ void remove(InnerDisposable producer) { // (a concurrent add/remove or termination), we need to retry } } - - /** - * Perform termination actions in case the source has terminated in some way and - * the queue has also become empty. - * @param term the terminal event (a NotificationLite.error or completed) - * @param empty set to true if the queue is empty - * @return true if there is indeed a terminal condition - */ - boolean checkTerminated(Object term, boolean empty) { - // first of all, check if there is actually a terminal event - if (term != null) { - // is it a completion event (impl. note, this is much cheaper than checking for isError) - if (NotificationLite.isComplete(term)) { - // but we also need to have an empty queue - if (empty) { - // this will prevent OnSubscribe spinning on a terminated but - // not yet disposed PublishObserver - current.compareAndSet(this, null); - try { - /* - * This will swap in a terminated array so add() in OnSubscribe will reject - * child observers to associate themselves with a terminated and thus - * never again emitting chain. - * - * Since we atomically change the contents of 'observers' only one - * operation wins at a time. If an add() wins before this getAndSet, - * its value will be part of the returned array by getAndSet and thus - * will receive the terminal notification. Otherwise, if getAndSet wins, - * add() will refuse to add the child producer and will trigger the - * creation of subscriber-to-source. - */ - for (InnerDisposable ip : observers.getAndSet(TERMINATED)) { - ip.child.onComplete(); - } - } finally { - // we explicitly dispose/disconnect from the upstream - // after we sent out the terminal event to child observers - dispose(); - } - // indicate we reached the terminal state - return true; - } - } else { - Throwable t = NotificationLite.getError(term); - // this will prevent OnSubscribe spinning on a terminated - // but not yet disposed PublishObserver - current.compareAndSet(this, null); - try { - // this will swap in a terminated array so add() in OnSubscribe will reject - // child observers to associate themselves with a terminated and thus - // never again emitting chain - for (InnerDisposable ip : observers.getAndSet(TERMINATED)) { - ip.child.onError(t); - } - } finally { - // we explicitly dispose/disconnect from the upstream - // after we sent out the terminal event to child observers - dispose(); - } - // indicate we reached the terminal state - return true; - } - } - // there is still work to be done - return false; - } - - /** - * The common serialization point of events arriving from upstream and child observers - * requesting more. - */ - void dispatch() { - // standard construct of emitter loop (blocking) - // if there is an emission going on, indicate that more work needs to be done - // the exact nature of this work needs to be determined from other data structures - synchronized (this) { - if (emitting) { - missed = true; - return; - } - // there was no emission going on, we won and will start emitting - emitting = true; - missed = false; - } - /* - * In case an exception is thrown in the loop, we need to set emitting back to false - * on the way out (the exception will propagate up) so if it bounces back and - * onError is called, its dispatch() call will have the opportunity to emit it. - * However, if we want to exit regularly, we will set the emitting to false (+ other operations) - * atomically so we want to prevent the finally part to accidentally unlock some other - * emissions happening between the two synchronized blocks. - */ - boolean skipFinal = false; - try { - for (;;) { - /* - * We need to read terminalEvent before checking the queue for emptiness because - * all enqueue happens before setting the terminal event. - * If it were the other way around, when the emission is paused between - * checking isEmpty and checking terminalEvent, some other thread might - * have produced elements and set the terminalEvent and we'd quit emitting - * prematurely. - */ - Object term = terminalEvent; - /* - * See if the queue is empty; since we need this information multiple - * times later on, we read it one. - * Although the queue can become non-empty in the mean time, we will - * detect it through the missing flag and will do another iteration. - */ - boolean empty = queue.isEmpty(); - // if the queue is empty and the terminal event was received, quit - // and don't bother restoring emitting to false: no further activity is - // possible at this point - if (checkTerminated(term, empty)) { - skipFinal = true; - return; - } - - // We have elements queued. Note that due to the serialization nature of dispatch() - // this loop is the only one which can turn a non-empty queue into an empty one - // and as such, no need to ask the queue itself again for that. - if (!empty) { - // We take a snapshot of the current child observers. - // Concurrent observers may miss this iteration, but it is to be expected - @SuppressWarnings("unchecked") - InnerDisposable[] ps = observers.get(); - - int len = ps.length; - // count how many have triggered dispose() - int disposed = 0; - - // Now find the minimum amount each child-subscriber requested - // since we can only emit that much to all of them without violating - // backpressure constraints - for (InnerDisposable ip : ps) { - if (ip.cancelled) { - disposed++; - } - // we ignore those with NOT_REQUESTED as if they aren't even there - } - - // it may happen everyone has disposed between here and observers.get() - // or we have no observers at all to begin with - if (len == disposed) { - term = terminalEvent; - // so let's consume a value from the queue - Object v = queue.poll(); - // or terminate if there was a terminal event and the queue is empty - if (checkTerminated(term, v == null)) { - skipFinal = true; - return; - } - // and retry emitting to potential new child observers - continue; - } - // if we get here, it means there are non-disposed child observers - // and we count the number of emitted values because the queue - // may contain less than requested - for (;;) { - term = terminalEvent; - Object v = queue.poll(); - empty = v == null; - // let's check if there is a terminal event and the queue became empty just now - if (checkTerminated(term, empty)) { - skipFinal = true; - return; - } - // the queue is empty but we aren't terminated yet, finish this emission loop - if (empty) { - break; - } - // we need to unwrap potential nulls - T value = NotificationLite.getValue(v); - // let's emit this value to all child observers - for (InnerDisposable ip : ps) { - // if ip.get() is negative, the child has either disposed in the - // meantime or hasn't requested anything yet - // this eager behavior will skip disposed children in case - // multiple values are available in the queue - if (!ip.cancelled) { - ip.child.onNext(value); - } - } - } - - // if we have requests but not an empty queue after emission - // let's try again to see if more requests/child observers are - // ready to receive more - if (!empty) { - continue; - } - } - - // we did what we could: either the queue is empty or child observers - // haven't requested more (or both), let's try to finish dispatching - synchronized (this) { - // since missed is changed atomically, if we see it as true - // it means some state has changed and we need to loop again - // and handle that case - if (!missed) { - // but if no missed dispatch happened, let's stop emitting - emitting = false; - // and skip the emitting = false in the finally block as well - skipFinal = true; - return; - } - // we acknowledge the missed changes so far - missed = false; - } - } - } finally { - // unless returned cleanly (i.e., some method above threw) - if (!skipFinal) { - // we stop emitting so the error can propagate back down through onError - synchronized (this) { - emitting = false; - } - } - } - } } /** * A Disposable that manages the request and disposed state of a * child Observer in thread-safe manner. + * {@code this} holds the parent PublishObserver or itself if disposed * @param the value type */ - static final class InnerDisposable implements Disposable { - /** - * The parent subscriber-to-source used to allow removing the child in case of - * child dispose() call. - */ - final PublishObserver parent; + static final class InnerDisposable + extends AtomicReference + implements Disposable { + private static final long serialVersionUID = -1100270633763673112L; /** The actual child subscriber. */ final Observer child; - /** - * Indicates this child has been disposed: the state is swapped in atomically and - * will prevent the dispatch() to emit (too many) values to a terminated child subscriber. - */ - volatile boolean cancelled; - InnerDisposable(PublishObserver parent, Observer child) { - this.parent = parent; + InnerDisposable(Observer child) { this.child = child; } @Override public boolean isDisposed() { - return cancelled; + return get() == this; } + @SuppressWarnings("unchecked") @Override public void dispose() { - if (!cancelled) { - cancelled = true; - // remove this from the parent - parent.remove(this); - parent.dispatch(); + Object o = getAndSet(this); + if (o != null && o != this) { + ((PublishObserver)o).remove(this); + } + } + + void setParent(PublishObserver p) { + if (!compareAndSet(null, p)) { + p.remove(this); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishSelector.java b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishSelector.java new file mode 100644 index 0000000000..1a9d1ec482 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishSelector.java @@ -0,0 +1,143 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.subjects.PublishSubject; + +/** + * Shares a source Observable for the duration of a selector function. + * @param the input value type + * @param the output value type + */ +public final class ObservablePublishSelector extends AbstractObservableWithUpstream { + + final Function, ? extends ObservableSource> selector; + + public ObservablePublishSelector(final ObservableSource source, + final Function, ? extends ObservableSource> selector) { + super(source); + this.selector = selector; + } + + @Override + protected void subscribeActual(Observer observer) { + PublishSubject subject = PublishSubject.create(); + + ObservableSource target; + + try { + target = ObjectHelper.requireNonNull(selector.apply(subject), "The selector returned a null ObservableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptyDisposable.error(ex, observer); + return; + } + + TargetObserver o = new TargetObserver(observer); + + target.subscribe(o); + + source.subscribe(new SourceObserver(subject, o)); + } + + static final class SourceObserver implements Observer { + + final PublishSubject subject; + + final AtomicReference target; + + SourceObserver(PublishSubject subject, AtomicReference target) { + this.subject = subject; + this.target = target; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(target, d); + } + + @Override + public void onNext(T value) { + subject.onNext(value); + } + + @Override + public void onError(Throwable e) { + subject.onError(e); + } + + @Override + public void onComplete() { + subject.onComplete(); + } + } + + static final class TargetObserver + extends AtomicReference implements Observer, Disposable { + private static final long serialVersionUID = 854110278590336484L; + + final Observer actual; + + Disposable d; + + TargetObserver(Observer actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(R value) { + actual.onNext(value); + } + + @Override + public void onError(Throwable e) { + DisposableHelper.dispose(this); + actual.onError(e); + } + + @Override + public void onComplete() { + DisposableHelper.dispose(this); + actual.onComplete(); + } + + @Override + public void dispose() { + d.dispose(); + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimed.java index 67a40d5935..3bca4284f5 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimed.java @@ -14,7 +14,7 @@ package io.reactivex.internal.operators.observable; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import io.reactivex.*; import io.reactivex.disposables.Disposable; @@ -45,7 +45,8 @@ public void subscribeActual(Observer t) { source.subscribe(new TakeLastTimedObserver(t, count, time, unit, scheduler, bufferSize, delayError)); } - static final class TakeLastTimedObserver extends AtomicInteger implements Observer, Disposable { + static final class TakeLastTimedObserver + extends AtomicBoolean implements Observer, Disposable { private static final long serialVersionUID = -5677354903406201275L; final Observer actual; @@ -56,7 +57,7 @@ static final class TakeLastTimedObserver extends AtomicInteger implements Obs final SpscLinkedArrayQueue queue; final boolean delayError; - Disposable s; + Disposable d; volatile boolean cancelled; @@ -74,9 +75,9 @@ static final class TakeLastTimedObserver extends AtomicInteger implements Obs } @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; actual.onSubscribe(this); } } @@ -118,12 +119,12 @@ public void onComplete() { @Override public void dispose() { - if (cancelled) { + if (!cancelled) { cancelled = true; + d.dispose(); - if (getAndIncrement() == 0) { + if (compareAndSet(false, true)) { queue.clear(); - s.dispose(); } } } @@ -134,89 +135,51 @@ public boolean isDisposed() { } void drain() { - if (getAndIncrement() != 0) { + if (!compareAndSet(false, true)) { return; } - int missed = 1; - final Observer a = actual; final SpscLinkedArrayQueue q = queue; final boolean delayError = this.delayError; for (;;) { + if (cancelled) { + q.clear(); + return; + } - if (done) { - boolean empty = q.isEmpty(); - - if (checkTerminated(empty, a, delayError)) { + if (!delayError) { + Throwable ex = error; + if (ex != null) { + q.clear(); + a.onError(ex); return; } - - for (;;) { - Object ts = q.poll(); // the timestamp long - empty = ts == null; - - if (checkTerminated(empty, a, delayError)) { - return; - } - - if (empty) { - break; - } - - @SuppressWarnings("unchecked") - T o = (T)q.poll(); - if (o == null) { - s.dispose(); - a.onError(new IllegalStateException("Queue empty?!")); - return; - } - - if ((Long)ts < scheduler.now(unit) - time) { - continue; - } - - a.onNext(o); - } } - missed = addAndGet(-missed); - if (missed == 0) { - break; - } - } - } + Object ts = q.poll(); // the timestamp long + boolean empty = ts == null; - boolean checkTerminated(boolean empty, Observer a, boolean delayError) { - if (cancelled) { - queue.clear(); - s.dispose(); - return true; - } - if (delayError) { if (empty) { - Throwable e = error; - if (e != null) { - a.onError(e); + Throwable ex = error; + if (ex != null) { + a.onError(ex); } else { a.onComplete(); } - return true; + return; } - } else { - Throwable e = error; - if (e != null) { - queue.clear(); - a.onError(e); - return true; - } else - if (empty) { - a.onComplete(); - return true; + + @SuppressWarnings("unchecked") + T o = (T)q.poll(); + + if ((Long)ts < scheduler.now(unit) - time) { + continue; } + + a.onNext(o); } - return false; } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java index 3a9fd5275d..99fc62098a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java @@ -44,7 +44,10 @@ public void subscribeActual(Observer t) { } static final class DebounceTimedObserver + extends AtomicReference implements Observer, Disposable, Runnable { + private static final long serialVersionUID = 786994795061867455L; + final Observer actual; final long timeout; final TimeUnit unit; @@ -52,18 +55,6 @@ static final class DebounceTimedObserver Disposable s; - final AtomicReference timer = new AtomicReference(); - - static final Disposable NEW_TIMER = new Disposable() { - @Override - public void dispose() { } - - @Override - public boolean isDisposed() { - return true; - } - }; - volatile boolean gate; boolean done; @@ -85,27 +76,16 @@ public void onSubscribe(Disposable s) { @Override public void onNext(T t) { - if (done) { - return; - } - - if (!gate) { + if (!gate && !done) { gate = true; actual.onNext(t); - // FIXME should this be a periodic blocking or a value-relative blocking? - Disposable d = timer.get(); + Disposable d = get(); if (d != null) { d.dispose(); } - - if (timer.compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(this, timeout, unit); - if (!timer.compareAndSet(NEW_TIMER, d)) { - d.dispose(); - } - } + DisposableHelper.replace(this, worker.schedule(this, timeout, unit)); } @@ -120,34 +100,33 @@ public void run() { public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); - return; + } else { + done = true; + DisposableHelper.dispose(this); + actual.onError(t); } - done = true; - DisposableHelper.dispose(timer); - actual.onError(t); } @Override public void onComplete() { - if (done) { - return; + if (!done) { + done = true; + DisposableHelper.dispose(this); + worker.dispose(); + actual.onComplete(); } - done = true; - DisposableHelper.dispose(timer); - worker.dispose(); - actual.onComplete(); } @Override public void dispose() { - DisposableHelper.dispose(timer); + DisposableHelper.dispose(this); worker.dispose(); s.dispose(); } @Override public boolean isDisposed() { - return timer.get() == DisposableHelper.DISPOSED; + return DisposableHelper.isDisposed(get()); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableToList.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableToList.java index 2b3931dedd..e76380345d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableToList.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableToList.java @@ -13,29 +13,24 @@ package io.reactivex.internal.operators.observable; -import java.util.*; +import java.util.Collection; import java.util.concurrent.Callable; -import io.reactivex.ObservableSource; -import io.reactivex.Observer; +import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.*; public final class ObservableToList> extends AbstractObservableWithUpstream { final Callable collectionSupplier; + @SuppressWarnings({ "unchecked", "rawtypes" }) public ObservableToList(ObservableSource source, final int defaultCapacityHint) { super(source); - this.collectionSupplier = new Callable() { - @Override - @SuppressWarnings("unchecked") - public U call() throws Exception { - return (U)new ArrayList(defaultCapacityHint); - } - }; + this.collectionSupplier = (Callable)Functions.createArrayList(defaultCapacityHint); } public ObservableToList(ObservableSource source, Callable collectionSupplier) { @@ -47,7 +42,7 @@ public ObservableToList(ObservableSource source, Callable collectionSuppli public void subscribeActual(Observer t) { U coll; try { - coll = collectionSupplier.call(); + coll = ObjectHelper.requireNonNull(collectionSupplier.call(), "The collectionSupplier returned a null Collection"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptyDisposable.error(e, t); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java index 9d1cc782fc..1d749909cf 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java @@ -316,36 +316,22 @@ static final class WindowOperation { static final class OperatorWindowBoundaryOpenObserver extends DisposableObserver { final WindowBoundaryMainObserver parent; - boolean done; - OperatorWindowBoundaryOpenObserver(WindowBoundaryMainObserver parent) { this.parent = parent; } @Override public void onNext(B t) { - if (done) { - return; - } parent.open(t); } @Override public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - done = true; parent.error(t); } @Override public void onComplete() { - if (done) { - return; - } - done = true; parent.onComplete(); } } diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index a60e06d802..7adc63c0e6 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -38,6 +38,7 @@ import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.Subject; import io.reactivex.subscribers.TestSubscriber; @@ -280,7 +281,16 @@ public void onComplete() { RxJavaPlugins.setErrorHandler(null); } } - + /** + * Synchronizes the execution of two runnables (as much as possible) + * to test race conditions. + *

The method blocks until both have run to completion. + * @param r1 the first runnable + * @param r2 the second runnable + */ + public static void race(final Runnable r1, final Runnable r2) { + race(r1, r2, Schedulers.single()); + } /** * Synchronizes the execution of two runnables (as much as possible) * to test race conditions. @@ -1998,4 +2008,22 @@ public void onComplete() { throw new RuntimeException(ex); } } + + /** + * Returns an expanded error list of the given test consumer. + * @param to the test consumer instance + * @return the list + */ + public static List errorList(TestObserver to) { + return compositeList(to.errors().get(0)); + } + + /** + * Returns an expanded error list of the given test consumer. + * @param to the test consumer instance + * @return the list + */ + public static List errorList(TestSubscriber to) { + return compositeList(to.errors().get(0)); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java index c4fc558895..e2532d82c8 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java @@ -794,7 +794,7 @@ public Long apply(Long t1, Integer t2) { } }).subscribe(ts); - if (!latch.await(SIZE + 1000, TimeUnit.MILLISECONDS)) { + if (!latch.await(SIZE + 2000, TimeUnit.MILLISECONDS)) { fail("timed out"); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableAllTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableAllTest.java index f8ee099fb1..9a1c150454 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableAllTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableAllTest.java @@ -14,16 +14,21 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Test; import io.reactivex.*; -import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; public class ObservableAllTest { @@ -293,4 +298,39 @@ public boolean test(String v) { // assertTrue(ex.getCause().getMessage().contains("Boo!")); } + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).all(Functions.alwaysTrue()).toObservable()); + } + + @Test + public void predicateThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + + observer.onNext(1); + observer.onNext(2); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .all(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + throw new TestException(); + } + }) + .toObservable() + .test() + .assertFailure(TestException.class); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index 5fe454da55..c1dc919a2a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -1215,6 +1215,12 @@ public void dispose() { TestHelper.checkDisposed(Observable.range(1, 5) .buffer(1, TimeUnit.DAYS, Schedulers.single(), 2, Functions.createArrayList(16), true)); + + TestHelper.checkDisposed(Observable.range(1, 5).buffer(1)); + + TestHelper.checkDisposed(Observable.range(1, 5).buffer(2, 1)); + + TestHelper.checkDisposed(Observable.range(1, 5).buffer(1, 2)); } @SuppressWarnings("unchecked") @@ -1225,4 +1231,165 @@ public void restartTimer() { .test() .assertResult(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5)); } + + @SuppressWarnings("unchecked") + @Test + public void bufferSupplierCrash2() { + Observable.range(1, 2) + .buffer(1, new Callable>() { + int calls; + @Override + public List call() throws Exception { + if (++calls == 2) { + throw new TestException(); + } + return new ArrayList(); + } + }) + .test() + .assertFailure(TestException.class, Arrays.asList(1)); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferSkipSupplierCrash2() { + Observable.range(1, 2) + .buffer(2, 1, new Callable>() { + int calls; + @Override + public List call() throws Exception { + if (++calls == 2) { + throw new TestException(); + } + return new ArrayList(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferSkipError() { + Observable.error(new TestException()) + .buffer(2, 1) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferSkipOverlap() { + Observable.range(1, 5) + .buffer(5, 1) + .test() + .assertResult( + Arrays.asList(1, 2, 3, 4, 5), + Arrays.asList(2, 3, 4, 5), + Arrays.asList(3, 4, 5), + Arrays.asList(4, 5), + Arrays.asList(5) + ); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferTimedExactError() { + Observable.error(new TestException()) + .buffer(1, TimeUnit.DAYS) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferTimedSkipError() { + Observable.error(new TestException()) + .buffer(1, 2, TimeUnit.DAYS) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferTimedOverlapError() { + Observable.error(new TestException()) + .buffer(2, 1, TimeUnit.DAYS) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferTimedExactEmpty() { + Observable.empty() + .buffer(1, TimeUnit.DAYS) + .test() + .assertResult(Collections.emptyList()); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferTimedSkipEmpty() { + Observable.empty() + .buffer(1, 2, TimeUnit.DAYS) + .test() + .assertResult(Collections.emptyList()); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferTimedOverlapEmpty() { + Observable.empty() + .buffer(2, 1, TimeUnit.DAYS) + .test() + .assertResult(Collections.emptyList()); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferTimedExactSupplierCrash() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject ps = PublishSubject.create(); + + TestObserver> to = ps + .buffer(1, TimeUnit.MILLISECONDS, scheduler, 1, new Callable>() { + int calls; + @Override + public List call() throws Exception { + if (++calls == 2) { + throw new TestException(); + } + return new ArrayList(); + } + }, true) + .test(); + + ps.onNext(1); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + ps.onNext(2); + + to + .assertFailure(TestException.class, Arrays.asList(1)); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferTimedExactBoundedError() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject ps = PublishSubject.create(); + + TestObserver> to = ps + .buffer(1, TimeUnit.MILLISECONDS, scheduler, 1, Functions.createArrayList(16), true) + .test(); + + ps.onError(new TestException()); + + to + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java index abc916fbba..9b3df7ab4b 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java @@ -16,19 +16,21 @@ import static org.junit.Assert.*; import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.junit.*; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.Observable; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; -import io.reactivex.subjects.PublishSubject; +import io.reactivex.subjects.*; public class ObservableConcatMapEagerTest { @@ -739,4 +741,213 @@ public void concatEagerIterable() { .test() .assertResult(1, 2); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).hide().concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.range(1, 2); + } + })); + } + + @Test + public void empty() { + Observable.empty().hide().concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.range(1, 2); + } + }) + .test() + .assertResult(); + } + + @Test + public void innerError() { + Observable.just(1).hide().concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.error(new TestException()); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerErrorMaxConcurrency() { + Observable.just(1).hide().concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.error(new TestException()); + } + }, 1, 128) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerCallableThrows() { + Observable.just(1).hide().concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.fromCallable(new Callable() { + @Override + public Integer call() throws Exception { + throw new TestException(); + } + }); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerOuterRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + TestObserver to = ps1.concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return ps2; + } + }).test(); + + final TestException ex1 = new TestException(); + final TestException ex2 = new TestException(); + + ps1.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex2); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertSubscribed().assertNoValues().assertNotComplete(); + + Throwable ex = to.errors().get(0); + + if (ex instanceof CompositeException) { + List es = TestHelper.errorList(to); + TestHelper.assertError(es, 0, TestException.class); + TestHelper.assertError(es, 1, TestException.class); + } else { + to.assertError(TestException.class); + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void nextCancelRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps1 = PublishSubject.create(); + + final TestObserver to = ps1.concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.never(); + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertEmpty(); + } + } + + @Test + public void mapperCancels() { + final TestObserver to = new TestObserver(); + + Observable.just(1).hide() + .concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + to.cancel(); + return Observable.never(); + } + }, 1, 128) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void innerErrorFused() { + Observable.just(1).hide().concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.range(1, 2).map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerErrorAfterPoll() { + final UnicastSubject us = UnicastSubject.create(); + us.onNext(1); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + us.onError(new TestException()); + } + }; + + Observable.just(1).hide() + .concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return us; + } + }, 1, 128) + .subscribe(to); + + to + .assertFailure(TestException.class, 1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java index 619a95119c..e9c6d417fb 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java @@ -199,4 +199,92 @@ public Integer call() throws NumberFormatException { .test() .assertFailure(NumberFormatException.class, 1); } + + @Test + public void asyncFlatten() { + Observable.range(1, 1000) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(1).subscribeOn(Schedulers.computation()); + } + }) + .take(500) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void asyncFlattenNone() { + Observable.range(1, 1000) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.empty().subscribeOn(Schedulers.computation()); + } + }) + .take(500) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void successError() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = Observable.range(1, 2) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + if (v == 2) { + return ps.singleElement(); + } + return Maybe.error(new TestException()); + } + }, true) + .test(); + + ps.onNext(1); + ps.onComplete(); + + to + .assertFailure(TestException.class, 1); + } + + @Test + public void completeError() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = Observable.range(1, 2) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + if (v == 2) { + return ps.singleElement(); + } + return Maybe.error(new TestException()); + } + }, true) + .test(); + + ps.onComplete(); + + to + .assertFailure(TestException.class); + } + + @Test + public void disposed() { + TestHelper.checkDisposed(PublishSubject.create().flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.empty(); + } + })); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java index 09847487c2..1dc7c6fbbf 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java @@ -186,4 +186,54 @@ public Integer call() throws NumberFormatException { .test() .assertFailure(NumberFormatException.class, 1); } + + @Test + public void asyncFlatten() { + Observable.range(1, 1000) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(1).subscribeOn(Schedulers.computation()); + } + }) + .take(500) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void successError() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = Observable.range(1, 2) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + if (v == 2) { + return ps.singleOrError(); + } + return Single.error(new TestException()); + } + }, true) + .test(); + + ps.onNext(1); + ps.onComplete(); + + to + .assertFailure(TestException.class, 1); + } + @Test + public void disposed() { + TestHelper.checkDisposed(PublishSubject.create().flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(1); + } + })); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java index 585db200e6..808f320d98 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java @@ -25,10 +25,15 @@ import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.HasUpstreamObservableSource; import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; +import io.reactivex.subjects.PublishSubject; public class ObservablePublishTest { @@ -392,4 +397,306 @@ public void testObserveOn() { } } } + + @Test + public void preNextConnect() { + for (int i = 0; i < 500; i++) { + + final ConnectableObservable co = Observable.empty().publish(); + + co.connect(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + co.test(); + } + }; + + TestHelper.race(r1, r1); + } + } + + @Test + public void connectRace() { + for (int i = 0; i < 500; i++) { + + final ConnectableObservable co = Observable.empty().publish(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + co.connect(); + } + }; + + TestHelper.race(r1, r1); + } + } + + @Test + public void selectorCrash() { + Observable.just(1).publish(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void source() { + Observable o = Observable.never(); + + assertSame(o, (((HasUpstreamObservableSource)o.publish()).source())); + } + + @Test + public void connectThrows() { + ConnectableObservable co = Observable.empty().publish(); + try { + co.connect(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + throw new TestException(); + } + }); + } catch (TestException ex) { + // expected + } + } + + @Test + public void addRemoveRace() { + for (int i = 0; i < 500; i++) { + + final ConnectableObservable co = Observable.empty().publish(); + + final TestObserver to = co.test(); + + final TestObserver to2 = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + co.subscribe(to2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void disposeOnArrival() { + ConnectableObservable co = Observable.empty().publish(); + + co.test(true).assertEmpty(); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.never().publish()); + + TestHelper.checkDisposed(Observable.never().publish(Functions.>identity())); + } + + @Test + public void empty() { + ConnectableObservable co = Observable.empty().publish(); + + co.connect(); + } + + @Test + public void take() { + ConnectableObservable co = Observable.range(1, 2).publish(); + + TestObserver to = co.take(1).test(); + + co.connect(); + + to.assertResult(1); + } + + @Test + public void just() { + final PublishSubject ps = PublishSubject.create(); + + ConnectableObservable co = ps.publish(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + ps.onComplete(); + } + }; + + co.subscribe(to); + co.connect(); + + ps.onNext(1); + + to.assertResult(1); + } + + @Test + public void nextCancelRace() { + for (int i = 0; i < 500; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final ConnectableObservable co = ps.publish(); + + final TestObserver to = co.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onNext(1); + observer.onComplete(); + observer.onNext(2); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .publish() + .autoConnect() + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void noErrorLoss() { + List errors = TestHelper.trackPluginErrors(); + try { + ConnectableObservable co = Observable.error(new TestException()).publish(); + + co.connect(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void subscribeDisconnectRace() { + for (int i = 0; i < 500; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final ConnectableObservable co = ps.publish(); + + final Disposable d = co.connect(); + final TestObserver to = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + d.dispose(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + co.subscribe(to); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void selectorDisconnectsIndependentSource() { + PublishSubject ps = PublishSubject.create(); + + ps.publish(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable v) throws Exception { + return Observable.range(1, 2); + } + }) + .test() + .assertResult(1, 2); + + assertFalse(ps.hasObservers()); + } + + @Test(timeout = 5000) + public void selectorLatecommer() { + Observable.range(1, 5) + .publish(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable v) throws Exception { + return v.concatWith(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void mainError() { + Observable.error(new TestException()) + .publish(Functions.>identity()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void selectorInnerError() { + PublishSubject ps = PublishSubject.create(); + + ps.publish(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable v) throws Exception { + return Observable.error(new TestException()); + } + }) + .test() + .assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 21fde06379..e92af3b2bb 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -28,6 +29,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.*; import io.reactivex.functions.*; +import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.*; import io.reactivex.subjects.ReplaySubject; @@ -591,4 +593,30 @@ public void onError(Throwable t) { public void onComplete() { } } + + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.just(1).publish().refCount()); + } + + @Test + public void noOpConnect() { + final int[] calls = { 0 }; + Observable o = new ConnectableObservable() { + @Override + public void connect(Consumer connection) { + calls[0]++; + } + + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.disposed()); + } + }.refCount(); + + o.test(); + o.test(); + + assertEquals(1, calls[0]); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java index 8748b7b1bd..81243f011c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java @@ -218,4 +218,43 @@ public void repeatLongPredicateInvalid() { } } + @Test + public void repeatUntilError() { + Observable.error(new TestException()) + .repeatUntil(new BooleanSupplier() { + @Override + public boolean getAsBoolean() throws Exception { + return true; + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void repeatUntilFalse() { + Observable.just(1) + .repeatUntil(new BooleanSupplier() { + @Override + public boolean getAsBoolean() throws Exception { + return true; + } + }) + .test() + .assertResult(1); + } + + @Test + public void repeatUntilSupplierCrash() { + Observable.just(1) + .repeatUntil(new BooleanSupplier() { + @Override + public boolean getAsBoolean() throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class, 1); + } + } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryWithPredicateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryWithPredicateTest.java index add6ef3ecf..f4aa357f9d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryWithPredicateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryWithPredicateTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.io.IOException; @@ -28,9 +29,11 @@ import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.*; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; public class ObservableRetryWithPredicateTest { @@ -357,4 +360,113 @@ public void accept(Long t) { }}); assertEquals(Arrays.asList(1L,1L,2L,3L), list); } + + @Test + public void predicateThrows() { + + TestObserver to = Observable.error(new TestException("Outer")) + .retry(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "Outer"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } + + @Test + public void dontRetry() { + Observable.error(new TestException("Outer")) + .retry(Functions.alwaysFalse()) + .test() + .assertFailureAndMessage(TestException.class, "Outer"); + } + + @Test + public void retryDisposeRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = ps.retry(Functions.alwaysTrue()).test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertEmpty(); + } + } + + @Test + public void bipredicateThrows() { + + TestObserver to = Observable.error(new TestException("Outer")) + .retry(new BiPredicate() { + @Override + public boolean test(Integer n, Throwable e) throws Exception { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "Outer"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } + + @Test + public void retryBiPredicateDisposeRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = ps.retry(new BiPredicate() { + @Override + public boolean test(Object t1, Object t2) throws Exception { + return true; + } + }).test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertEmpty(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipLastTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipLastTest.java index 0b4cca2744..5e02c8b74a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipLastTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipLastTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.Arrays; @@ -22,6 +23,7 @@ import org.mockito.InOrder; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.Schedulers; @@ -106,4 +108,16 @@ public void testSkipLastWithNegativeCount() { Observable.just("one").skipLast(-1); } + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).skipLast(1)); + } + + @Test + public void error() { + Observable.error(new TestException()) + .skipLast(1) + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimedTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimedTest.java index 1663130269..d7c2da5edf 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimedTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimedTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; @@ -22,6 +23,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.*; import io.reactivex.subjects.PublishSubject; @@ -231,4 +233,47 @@ public void takeLastTimeDelayErrorCustomScheduler() { .assertFailure(TestException.class, 1, 2); } + @Test + public void disposed() { + TestHelper.checkDisposed(PublishSubject.create().takeLast(1, TimeUnit.MINUTES)); + } + + @Test + public void observeOn() { + Observable.range(1, 1000) + .takeLast(1, TimeUnit.DAYS) + .take(500) + .observeOn(Schedulers.single(), true, 1) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void cancelCompleteRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = ps.takeLast(1, TimeUnit.DAYS).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTest.java index 2bfe4ce85a..e307ecd47b 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.*; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.*; @@ -23,6 +24,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subjects.PublishSubject; @@ -156,4 +158,35 @@ public void throttleFirstDefaultScheduler() { .awaitDone(5, TimeUnit.SECONDS) .assertResult(1); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).throttleFirst(1, TimeUnit.DAYS)); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onNext(1); + observer.onNext(2); + observer.onComplete(); + observer.onNext(3); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .throttleFirst(1, TimeUnit.DAYS) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableToListTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableToListTest.java index 56def715df..ee626aa2fc 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableToListTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableToListTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -24,6 +25,7 @@ import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; +import io.reactivex.exceptions.TestException; public class ObservableToListTest { @@ -182,4 +184,34 @@ public void capacityHint() { .test() .assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).toList().toObservable()); + } + + @SuppressWarnings("unchecked") + @Test + public void error() { + Observable.error(new TestException()) + .toList() + .toObservable() + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void collectionSupplierThrows() { + Observable.just(1) + .toList(new Callable>() { + @Override + public Collection call() throws Exception { + throw new TestException(); + } + }) + .toObservable() + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java index 87129eda12..16f1d42aa2 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java @@ -15,14 +15,18 @@ import static org.junit.Assert.*; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.junit.Test; import io.reactivex.*; -import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Action; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; public class ObservableUnsubscribeOnTest { @@ -185,4 +189,70 @@ public Thread getThread() { } } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).unsubscribeOn(Schedulers.single())); + } + + @Test + public void normal() { + final int[] calls = { 0 }; + + Observable.just(1) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + calls[0]++; + } + }) + .unsubscribeOn(Schedulers.single()) + .test() + .assertResult(1); + + assertEquals(0, calls[0]); + } + + @Test + public void error() { + final int[] calls = { 0 }; + + Observable.error(new TestException()) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + calls[0]++; + } + }) + .unsubscribeOn(Schedulers.single()) + .test() + .assertFailure(TestException.class); + + assertEquals(0, calls[0]); + } + + @Test + public void signalAfterDispose() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onNext(1); + observer.onNext(2); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .unsubscribeOn(Schedulers.single()) + .take(1) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java index 488f96d5c3..ce6985d190 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java @@ -429,4 +429,28 @@ public void restartTimer() { .test() .assertResult(1, 2, 3, 4, 5); } + + @Test + public void exactBoundaryError() { + Observable.error(new TestException()) + .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true) + .test() + .assertSubscribed() + .assertError(TestException.class) + .assertNotComplete(); + } + + @Test + public void restartTimerMany() { + Observable.intervalRange(1, 1000, 1, 1, TimeUnit.MILLISECONDS) + .window(1, TimeUnit.MILLISECONDS, Schedulers.single(), 2, true) + .flatMap(Functions.>identity()) + .take(500) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } }