diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 7815cd73db..8fb991144a 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -6139,7 +6139,7 @@ public final > Flowable buffer(Callable cache() { - return FlowableCache.from(this); + return cacheWithInitialCapacity(16); } /** @@ -6201,7 +6201,7 @@ public final Flowable cache() { @SchedulerSupport(SchedulerSupport.NONE) public final Flowable cacheWithInitialCapacity(int initialCapacity) { ObjectHelper.verifyPositive(initialCapacity, "initialCapacity"); - return FlowableCache.from(this, initialCapacity); + return RxJavaPlugins.onAssembly(new FlowableCache(this, initialCapacity)); } /** @@ -6466,7 +6466,7 @@ public final Flowable concatMapDelayError(Function(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.IMMEDIATE)); + return RxJavaPlugins.onAssembly(new FlowableConcatMap(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); } @@ -9633,13 +9633,13 @@ public final Flowable onBackpressureBuffer(int capacity, Action onOverflow) { * @param capacity number of slots available in the buffer. * @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed. * @param overflowStrategy how should the {@code Publisher} react to buffer overflows. Null is not allowed. - * @return the source {@code Publisher} modified to buffer items up to the given capacity + * @return the source {@code Flowable} modified to buffer items up to the given capacity * @see ReactiveX operators documentation: backpressure operators * @since 2.0 */ @BackpressureSupport(BackpressureKind.SPECIAL) @SchedulerSupport(SchedulerSupport.NONE) - public final Publisher onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy) { + public final Flowable onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy) { ObjectHelper.requireNonNull(overflowStrategy, "strategy is null"); ObjectHelper.verifyPositive(capacity, "capacity"); return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy(this, capacity, onOverflow, overflowStrategy)); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java index 60a9424a55..e53080b667 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java @@ -18,9 +18,6 @@ import org.reactivestreams.*; import io.reactivex.Flowable; -import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; @@ -36,38 +33,16 @@ public final class FlowableCache extends AbstractFlowableWithUpstream { final CacheState state; final AtomicBoolean once; - /** - * Creates a cached Flowable with a default capacity hint of 16. - * @param the value type - * @param source the source Observable to cache - * @return the CachedObservable instance - */ - public static Flowable from(Flowable source) { - return from(source, 16); - } - - /** - * Creates a cached Flowable with the given capacity hint. - * @param the value type - * @param source the source Observable to cache - * @param capacityHint the hint for the internal buffer size - * @return the CachedObservable instance - */ - public static Flowable from(Flowable source, int capacityHint) { - ObjectHelper.verifyPositive(capacityHint, "capacityHint"); - CacheState state = new CacheState(source, capacityHint); - return RxJavaPlugins.onAssembly(new FlowableCache(source, state)); - } /** * Private constructor because state needs to be shared between the Observable body and * the onSubscribe function. * @param source the upstream source whose signals to cache - * @param state the cache state object performing the caching and replaying + * @param capacityHint the capacity hint */ - private FlowableCache(Flowable source, CacheState state) { + public FlowableCache(Flowable source, int capacityHint) { super(source); - this.state = state; + this.state = new CacheState(source, capacityHint); this.once = new AtomicBoolean(); } @@ -100,7 +75,7 @@ protected void subscribeActual(Subscriber t) { * @return true if the cache has Subscribers */ /* public */ boolean hasSubscribers() { - return state.subscribers.length != 0; + return state.subscribers.get().length != 0; } /** @@ -122,9 +97,13 @@ static final class CacheState extends LinkedArrayList implements Subscriber connection = new AtomicReference(); /** Guarded by connection (not this). */ - volatile ReplaySubscription[] subscribers; + final AtomicReference[]> subscribers; + /** The default empty array of subscribers. */ + @SuppressWarnings("rawtypes") + static final ReplaySubscription[] EMPTY = new ReplaySubscription[0]; /** The default empty array of subscribers. */ - static final ReplaySubscription[] EMPTY = new ReplaySubscription[0]; + @SuppressWarnings("rawtypes") + static final ReplaySubscription[] TERMINATED = new ReplaySubscription[0]; /** Set to true after connection. */ volatile boolean isConnected; @@ -134,10 +113,11 @@ static final class CacheState extends LinkedArrayList implements Subscriber source, int capacityHint) { super(capacityHint); this.source = source; - this.subscribers = EMPTY; + this.subscribers = new AtomicReference[]>(EMPTY); } /** * Adds a ReplaySubscription to the subscribers array atomically. @@ -146,23 +126,33 @@ static final class CacheState extends LinkedArrayList implements Subscriber p) { // guarding by connection to save on allocating another object // thus there are two distinct locks guarding the value-addition and child come-and-go - synchronized (connection) { - ReplaySubscription[] a = subscribers; + for (;;) { + ReplaySubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return; + } int n = a.length; - ReplaySubscription[] b = new ReplaySubscription[n + 1]; + @SuppressWarnings("unchecked") + ReplaySubscription[] b = new ReplaySubscription[n + 1]; System.arraycopy(a, 0, b, 0, n); b[n] = p; - subscribers = b; + if (subscribers.compareAndSet(a, b)) { + return; + } } } /** * Removes the ReplaySubscription (if present) from the subscribers array atomically. * @param p the target ReplaySubscription wrapping a downstream Subscriber with state */ + @SuppressWarnings("unchecked") public void removeChild(ReplaySubscription p) { - synchronized (connection) { - ReplaySubscription[] a = subscribers; + for (;;) { + ReplaySubscription[] a = subscribers.get(); int n = a.length; + if (n == 0) { + return; + } int j = -1; for (int i = 0; i < n; i++) { if (a[i].equals(p)) { @@ -173,14 +163,19 @@ public void removeChild(ReplaySubscription p) { if (j < 0) { return; } + + ReplaySubscription[] b; if (n == 1) { - subscribers = EMPTY; + b = EMPTY; + return; + } else { + b = new ReplaySubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (subscribers.compareAndSet(a, b)) { return; } - ReplaySubscription[] b = new ReplaySubscription[n - 1]; - System.arraycopy(a, 0, b, 0, j); - System.arraycopy(a, j + 1, b, j, n - j - 1); - subscribers = b; } } @@ -204,9 +199,12 @@ public void onNext(T t) { if (!sourceDone) { Object o = NotificationLite.next(t); add(o); - dispatch(); + for (ReplaySubscription rp : subscribers.get()) { + rp.replay(); + } } } + @SuppressWarnings("unchecked") @Override public void onError(Throwable e) { if (!sourceDone) { @@ -214,9 +212,14 @@ public void onError(Throwable e) { Object o = NotificationLite.error(e); add(o); SubscriptionHelper.cancel(connection); - dispatch(); + for (ReplaySubscription rp : subscribers.getAndSet(TERMINATED)) { + rp.replay(); + } + } else { + RxJavaPlugins.onError(e); } } + @SuppressWarnings("unchecked") @Override public void onComplete() { if (!sourceDone) { @@ -224,16 +227,9 @@ public void onComplete() { Object o = NotificationLite.complete(); add(o); SubscriptionHelper.cancel(connection); - dispatch(); - } - } - /** - * Signals all known children there is work to do. - */ - void dispatch() { - ReplaySubscription[] a = subscribers; - for (ReplaySubscription rp : a) { - rp.replay(); + for (ReplaySubscription rp : subscribers.getAndSet(TERMINATED)) { + rp.replay(); + } } } } @@ -243,7 +239,8 @@ void dispatch() { * * @param */ - static final class ReplaySubscription extends AtomicLong implements Subscription, Disposable { + static final class ReplaySubscription + extends AtomicInteger implements Subscription { private static final long serialVersionUID = -2557562030197141021L; private static final long CANCELLED = -1; @@ -252,6 +249,8 @@ static final class ReplaySubscription extends AtomicLong implements Subscript /** The cache state object. */ final CacheState state; + final AtomicLong requested; + /** * Contains the reference to the buffer segment in replay. * Accessed after reading state.size() and when emitting == true. @@ -267,178 +266,119 @@ static final class ReplaySubscription extends AtomicLong implements Subscript */ int index; - /** Indicates there is a replay going on; guarded by this. */ - boolean emitting; - /** Indicates there were some state changes/replay attempts; guarded by this. */ - boolean missed; - ReplaySubscription(Subscriber child, CacheState state) { this.child = child; this.state = state; + this.requested = new AtomicLong(); } @Override public void request(long n) { - if (!SubscriptionHelper.validate(n)) { - return; - } - for (;;) { - long r = get(); - if (r == CANCELLED) { - return; - } - long u = BackpressureHelper.addCap(r, n); - if (compareAndSet(r, u)) { - replay(); - return; - } - } - } - /** - * Updates the request count to reflect values have been produced. - * @param n the produced amount - * @return the current requested amount - */ - public long produced(long n) { - return addAndGet(-n); - } - - @Override - public boolean isDisposed() { - return get() == CANCELLED; - } - @Override - public void dispose() { - long r = get(); - if (r != CANCELLED) { - r = getAndSet(CANCELLED); - if (r != CANCELLED) { - state.removeChild(this); + if (SubscriptionHelper.validate(n)) { + for (;;) { + long r = requested.get(); + if (r == CANCELLED) { + return; + } + long u = BackpressureHelper.addCap(r, n); + if (requested.compareAndSet(r, u)) { + replay(); + return; + } } } } @Override public void cancel() { - dispose(); + if (requested.getAndSet(CANCELLED) != CANCELLED) { + state.removeChild(this); + } } /** * Continue replaying available values if there are requests for them. */ public void replay() { - // make sure there is only a single thread emitting - synchronized (this) { - if (emitting) { - missed = true; - return; - } - emitting = true; + if (getAndIncrement() != 0) { + return; } - boolean skipFinal = false; - try { - final Subscriber child = this.child; - for (;;) { + int missed = 1; + final Subscriber child = this.child; + AtomicLong rq = requested; - long r = get(); + for (;;) { - if (r < 0L) { - skipFinal = true; - return; - } + long r = rq.get(); - // read the size, if it is non-zero, we can safely read the head and - // read values up to the given absolute index - int s = state.size(); - if (s != 0) { - Object[] b = currentBuffer; + if (r < 0L) { + return; + } - // latch onto the very first buffer now that it is available. - if (b == null) { - b = state.head(); - currentBuffer = b; + // read the size, if it is non-zero, we can safely read the head and + // read values up to the given absolute index + int s = state.size(); + if (s != 0) { + Object[] b = currentBuffer; + + // latch onto the very first buffer now that it is available. + if (b == null) { + b = state.head(); + currentBuffer = b; + } + final int n = b.length - 1; + int j = index; + int k = currentIndexInBuffer; + int valuesProduced = 0; + + while (j < s && r > 0) { + if (rq.get() == CANCELLED) { + return; } - final int n = b.length - 1; - int j = index; - int k = currentIndexInBuffer; - // eagerly emit any terminal event - if (r == 0) { - Object o = b[k]; - if (NotificationLite.isComplete(o)) { - child.onComplete(); - skipFinal = true; - dispose(); - return; - } else - if (NotificationLite.isError(o)) { - child.onError(NotificationLite.getError(o)); - skipFinal = true; - dispose(); - return; - } - } else - if (r > 0) { - int valuesProduced = 0; - - while (j < s && r > 0) { - if (get() == CANCELLED) { - skipFinal = true; - return; - } - if (k == n) { - b = (Object[])b[n]; - k = 0; - } - Object o = b[k]; - - try { - if (NotificationLite.accept(o, child)) { - skipFinal = true; - dispose(); - return; - } - } catch (Throwable err) { - Exceptions.throwIfFatal(err); - skipFinal = true; - dispose(); - if (!NotificationLite.isError(o) && !NotificationLite.isComplete(o)) { - child.onError(err); - } - return; - } - - k++; - j++; - r--; - valuesProduced++; - } - - if (get() == CANCELLED) { - skipFinal = true; - return; - } - - index = j; - currentIndexInBuffer = k; - currentBuffer = b; - produced(valuesProduced); + if (k == n) { + b = (Object[])b[n]; + k = 0; } + Object o = b[k]; + + if (NotificationLite.accept(o, child)) { + return; + } + + k++; + j++; + r--; + valuesProduced++; } - synchronized (this) { - if (!missed) { - emitting = false; - skipFinal = true; + if (rq.get() == CANCELLED) { + return; + } + + if (r == 0) { + Object o = b[k]; + if (NotificationLite.isComplete(o)) { + child.onComplete(); + return; + } else + if (NotificationLite.isError(o)) { + child.onError(NotificationLite.getError(o)); return; } - missed = false; } - } - } finally { - if (!skipFinal) { - synchronized (this) { - emitting = false; + + if (valuesProduced != 0) { + BackpressureHelper.producedCancel(rq, valuesProduced); } + + index = j; + currentIndexInBuffer = k; + currentBuffer = b; + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java index 713565bfbb..bf8d7548e0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java @@ -89,7 +89,7 @@ abstract static class BaseConcatMapSubscriber volatile boolean cancelled; - final AtomicReference error; + final AtomicThrowable errors; volatile boolean active; @@ -102,7 +102,7 @@ abstract static class BaseConcatMapSubscriber this.prefetch = prefetch; this.limit = prefetch - (prefetch >> 2); this.inner = new ConcatMapInner(this); - this.error = new AtomicReference(); + this.errors = new AtomicThrowable(); } @Override @@ -198,14 +198,11 @@ void subscribeActual() { @Override public void onError(Throwable t) { - if (ExceptionHelper.addThrowable(error, t)) { + if (errors.addThrowable(t)) { inner.cancel(); if (getAndIncrement() == 0) { - t = ExceptionHelper.terminate(error); - if (t != ExceptionHelper.TERMINATED) { - actual.onError(t); - } + actual.onError(errors.terminate()); } } else { RxJavaPlugins.onError(t); @@ -219,23 +216,17 @@ public void innerNext(R value) { if (compareAndSet(1, 0)) { return; } - Throwable e = ExceptionHelper.terminate(error); - if (e != ExceptionHelper.TERMINATED) { - actual.onError(e); - } + actual.onError(errors.terminate()); } } @Override public void innerError(Throwable e) { - if (ExceptionHelper.addThrowable(error, e)) { + if (errors.addThrowable(e)) { s.cancel(); if (getAndIncrement() == 0) { - e = ExceptionHelper.terminate(error); - if (e != ExceptionHelper.TERMINATED) { - actual.onError(e); - } + actual.onError(errors.terminate()); } } else { RxJavaPlugins.onError(e); @@ -275,7 +266,8 @@ void drain() { } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); - actual.onError(e); + errors.addThrowable(e); + actual.onError(errors.terminate()); return; } @@ -295,7 +287,8 @@ void drain() { Exceptions.throwIfFatal(e); s.cancel(); - actual.onError(e); + errors.addThrowable(e); + actual.onError(errors.terminate()); return; } @@ -321,7 +314,8 @@ void drain() { } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); - actual.onError(e); + errors.addThrowable(e); + actual.onError(errors.terminate()); return; } @@ -334,10 +328,7 @@ void drain() { if (get() == 0 && compareAndSet(0, 1)) { actual.onNext(vr); if (!compareAndSet(1, 0)) { - Throwable e = ExceptionHelper.terminate(error); - if (e != ExceptionHelper.TERMINATED) { - actual.onError(e); - } + actual.onError(errors.terminate()); return; } } @@ -412,7 +403,7 @@ void subscribeActual() { @Override public void onError(Throwable t) { - if (ExceptionHelper.addThrowable(error, t)) { + if (errors.addThrowable(t)) { done = true; drain(); } else { @@ -428,7 +419,7 @@ public void innerNext(R value) { @Override public void innerError(Throwable e) { - if (ExceptionHelper.addThrowable(error, e)) { + if (errors.addThrowable(e)) { if (!veryEnd) { s.cancel(); done = true; @@ -469,12 +460,9 @@ void drain() { boolean d = done; if (d && !veryEnd) { - Throwable ex = error.get(); + Throwable ex = errors.get(); if (ex != null) { - ex = ExceptionHelper.terminate(error); - if (ex != ExceptionHelper.TERMINATED) { - actual.onError(ex); - } + actual.onError(errors.terminate()); return; } } @@ -486,15 +474,16 @@ void drain() { } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); - actual.onError(e); + errors.addThrowable(e); + actual.onError(errors.terminate()); return; } boolean empty = v == null; if (d && empty) { - Throwable ex = ExceptionHelper.terminate(error); - if (ex != null && ex != ExceptionHelper.TERMINATED) { + Throwable ex = errors.terminate(); + if (ex != null) { actual.onError(ex); } else { actual.onComplete(); @@ -511,7 +500,8 @@ void drain() { Exceptions.throwIfFatal(e); s.cancel(); - actual.onError(e); + errors.addThrowable(e); + actual.onError(errors.terminate()); return; } @@ -536,7 +526,8 @@ void drain() { } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); - actual.onError(e); + errors.addThrowable(e); + actual.onError(errors.terminate()); return; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java index a99191b011..8cf8cbf61b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java @@ -46,7 +46,7 @@ static final class SubscriptionLambdaSubscriber implements Subscriber, Sub Subscription s; - public SubscriptionLambdaSubscriber(Subscriber actual, + SubscriptionLambdaSubscriber(Subscriber actual, Consumer onSubscribe, LongConsumer onRequest, Action onCancel) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java index 0c96788149..8176e4476b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java @@ -162,6 +162,9 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { + if (done) { + return; + } if (fusionMode != ASYNC && !queue.offer(t)) { onError(new MissingBackpressureException("Queue is full?!")); return; @@ -171,7 +174,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (ExceptionHelper.addThrowable(error, t)) { + if (!done && ExceptionHelper.addThrowable(error, t)) { done = true; drain(); } else { @@ -181,6 +184,9 @@ public void onError(Throwable t) { @Override public void onComplete() { + if (done) { + return; + } done = true; drain(); } @@ -262,9 +268,11 @@ void drain() { } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.cancel(); - onError(ex); it = null; - continue; + ExceptionHelper.addThrowable(error, ex); + ex = ExceptionHelper.terminate(error); + a.onError(ex); + return; } if (!b) { @@ -292,9 +300,12 @@ void drain() { v = it.next(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + current = null; s.cancel(); - onError(ex); - continue; + ExceptionHelper.addThrowable(error, ex); + ex = ExceptionHelper.terminate(error); + a.onError(ex); + return; } a.onNext(v); @@ -311,9 +322,12 @@ void drain() { b = it.hasNext(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + current = null; s.cancel(); - onError(ex); - continue; + ExceptionHelper.addThrowable(error, ex); + ex = ExceptionHelper.terminate(error); + a.onError(ex); + return; } if (!b) { @@ -326,16 +340,7 @@ void drain() { if (e == r) { boolean d = done; - boolean empty; - - try { - empty = q.isEmpty() && it == null; - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.cancel(); - onError(ex); - empty = true; - } + boolean empty = q.isEmpty() && it == null; if (checkTerminated(d, empty, a, q)) { return; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java index 7d28bf2598..e073a5b25a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java @@ -92,11 +92,10 @@ public void run() { actual.onNext(c); if (c == end) { - try { + if (resource.get() != DisposableHelper.DISPOSED) { actual.onComplete(); - } finally { - DisposableHelper.dispose(resource); } + DisposableHelper.dispose(resource); return; } @@ -106,11 +105,8 @@ public void run() { decrementAndGet(); } } else { - try { - actual.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests")); - } finally { - DisposableHelper.dispose(resource); - } + actual.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests")); + DisposableHelper.dispose(resource); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java index db45ca8674..8ca0891e2d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java @@ -37,20 +37,9 @@ public FlowableLift(Publisher source, FlowableOperator operator() { - return operator; - } - @Override public void subscribeActual(Subscriber s) { try { - if (s == null) { - throw new NullPointerException("Operator " + operator + " received a null Subscriber"); - } Subscriber st = operator.apply(s); if (st == null) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java index fe43f1d560..2c63244574 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java @@ -264,7 +264,7 @@ void drain() { } } - if (e != 0L && r != Long.MAX_VALUE) { + if (e != 0L) { BackpressureHelper.produced(requested, e); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduce.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduce.java index 14f1222571..428d09ef31 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduce.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduce.java @@ -13,8 +13,6 @@ package io.reactivex.internal.operators.flowable; -import java.util.NoSuchElementException; - import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; @@ -108,7 +106,7 @@ public void onComplete() { if (v != null) { complete(v); } else { - actual.onError(new NoSuchElementException()); + actual.onComplete(); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java index 1daaabec7e..5d5d086ca2 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java @@ -132,7 +132,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { - RxJavaPlugins.onError(error); + RxJavaPlugins.onError(t); return; } error = t; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java index 004577a24f..12f8bd0d1e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java @@ -84,30 +84,30 @@ static final class WindowBoundaryMainSubscriber @Override public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.s, s)) { - return; - } - - this.s = s; + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; - actual.onSubscribe(this); + actual.onSubscribe(this); - if (cancelled) { - return; - } + if (cancelled) { + return; + } - OperatorWindowBoundaryOpenSubscriber os = new OperatorWindowBoundaryOpenSubscriber(this); + OperatorWindowBoundaryOpenSubscriber os = new OperatorWindowBoundaryOpenSubscriber(this); - if (boundary.compareAndSet(null, os)) { - windows.getAndIncrement(); - s.request(Long.MAX_VALUE); - open.subscribe(os); + if (boundary.compareAndSet(null, os)) { + windows.getAndIncrement(); + s.request(Long.MAX_VALUE); + open.subscribe(os); + } } - } @Override public void onNext(T t) { + if (done) { + return; + } if (fastEnter()) { for (UnicastProcessor w : ws) { w.onNext(t); @@ -174,10 +174,9 @@ void complete() { } void error(Throwable t) { - if (windows.decrementAndGet() == 0) { - s.cancel(); - resources.dispose(); - } + s.cancel(); + resources.dispose(); + DisposableHelper.dispose(boundary); actual.onError(t); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java index bfc22dc905..b90ae6d3cd 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java @@ -76,56 +76,58 @@ static final class WindowBoundaryMainSubscriber @Override public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.s, s)) { - return; - } - this.s = s; + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; - Subscriber> a = actual; - a.onSubscribe(this); + Subscriber> a = actual; + a.onSubscribe(this); - if (cancelled) { - return; - } + if (cancelled) { + return; + } - Publisher p; + Publisher p; - try { - p = ObjectHelper.requireNonNull(other.call(), "The first window publisher supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.cancel(); - a.onError(e); - return; - } + try { + p = ObjectHelper.requireNonNull(other.call(), "The first window publisher supplied is null"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + s.cancel(); + a.onError(e); + return; + } - UnicastProcessor w = UnicastProcessor.create(bufferSize); + UnicastProcessor w = UnicastProcessor.create(bufferSize); - long r = requested(); - if (r != 0L) { - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); + long r = requested(); + if (r != 0L) { + a.onNext(w); + if (r != Long.MAX_VALUE) { + produced(1); + } + } else { + s.cancel(); + a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests")); + return; } - } else { - s.cancel(); - a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests")); - return; - } - window = w; + window = w; - WindowBoundaryInnerSubscriber inner = new WindowBoundaryInnerSubscriber(this); + WindowBoundaryInnerSubscriber inner = new WindowBoundaryInnerSubscriber(this); - if (boundary.compareAndSet(null, inner)) { - windows.getAndIncrement(); - s.request(Long.MAX_VALUE); - p.subscribe(inner); + if (boundary.compareAndSet(null, inner)) { + windows.getAndIncrement(); + s.request(Long.MAX_VALUE); + p.subscribe(inner); + } } } @Override public void onNext(T t) { + if (done) { + return; + } if (fastEnter()) { UnicastProcessor w = window; @@ -146,7 +148,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { - RxJavaPlugins.onError(error); + RxJavaPlugins.onError(t); return; } error = t; diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java index fd2891fe71..891b08dbe6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java @@ -39,7 +39,7 @@ public Iterator iterator() { source.subscribe(it); return it; } - + static final class BlockingObservableIterator extends AtomicReference implements io.reactivex.Observer, Iterator, Disposable { @@ -56,7 +56,7 @@ static final class BlockingObservableIterator volatile boolean done; Throwable error; - public BlockingObservableIterator(int batchSize) { + BlockingObservableIterator(int batchSize) { this.queue = new SpscLinkedArrayQueue(batchSize); this.lock = new ReentrantLock(); this.condition = lock.newCondition(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableGroupBy.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableGroupBy.java index d63a948d62..cc1b8be52d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableGroupBy.java @@ -220,6 +220,8 @@ static final class State extends AtomicInteger implements Disposable, Obse final AtomicBoolean cancelled = new AtomicBoolean(); + final AtomicBoolean once = new AtomicBoolean(); + final AtomicReference> actual = new AtomicReference>(); State(int bufferSize, GroupByObserver parent, K key, boolean delayError) { @@ -233,6 +235,7 @@ static final class State extends AtomicInteger implements Disposable, Obse public void dispose() { if (cancelled.compareAndSet(false, true)) { if (getAndIncrement() == 0) { + actual.lazySet(null); parent.cancel(key); } } @@ -245,9 +248,14 @@ public boolean isDisposed() { @Override public void subscribe(Observer s) { - if (actual.compareAndSet(null, s)) { + if (once.compareAndSet(false, true)) { s.onSubscribe(this); - drain(); + actual.lazySet(s); + if (cancelled.get()) { + actual.lazySet(null); + } else { + drain(); + } } else { EmptyDisposable.error(new IllegalStateException("Only one Observer allowed!"), s); } @@ -280,10 +288,6 @@ void drain() { Observer a = actual.get(); for (;;) { if (a != null) { - if (checkTerminated(done, q.isEmpty(), a, delayError)) { - return; - } - for (;;) { boolean d = done; T v = q.poll(); @@ -315,6 +319,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, boolean if (cancelled.get()) { queue.clear(); parent.cancel(key); + actual.lazySet(null); return true; } @@ -322,6 +327,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, boolean if (delayError) { if (empty) { Throwable e = error; + actual.lazySet(null); if (e != null) { a.onError(e); } else { @@ -333,10 +339,12 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, boolean Throwable e = error; if (e != null) { queue.clear(); + actual.lazySet(null); a.onError(e); return true; } else if (empty) { + actual.lazySet(null); a.onComplete(); return true; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java index d4706df474..099284309e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java @@ -80,13 +80,7 @@ public void onComplete() { } boolean setOther(Disposable o) { - if (other.get() == null) { - if (other.compareAndSet(null, o)) { - return true; - } - o.dispose(); - } - return false; + return DisposableHelper.setOnce(other, o); } @Override @@ -101,12 +95,12 @@ public boolean isDisposed() { } public void error(Throwable e) { - dispose(); + s.dispose(); actual.onError(e); } public void complete() { - dispose(); + s.dispose(); actual.onComplete(); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java index d15a412558..987c70abef 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java @@ -96,7 +96,8 @@ public void onSubscribe(Disposable s) { if (boundary.compareAndSet(null, os)) { windows.getAndIncrement(); open.subscribe(os); - } } + } + } } @Override diff --git a/src/main/java/io/reactivex/internal/util/BackpressureHelper.java b/src/main/java/io/reactivex/internal/util/BackpressureHelper.java index 5556325574..ba035a3093 100644 --- a/src/main/java/io/reactivex/internal/util/BackpressureHelper.java +++ b/src/main/java/io/reactivex/internal/util/BackpressureHelper.java @@ -121,4 +121,31 @@ public static long produced(AtomicLong requested, long n) { } } } + + /** + * Atomically subtract the given number (positive, not validated) from the target field if + * it doesn't contain Long.MIN_VALUE (indicating some cancelled state). + * @param requested the target field holding the current requested amount + * @param n the produced element count, positive (not validated) + * @return the new amount + */ + public static long producedCancel(AtomicLong requested, long n) { + for (;;) { + long current = requested.get(); + if (current == Long.MIN_VALUE) { + return Long.MIN_VALUE; + } + if (current == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + long update = current - n; + if (update < 0L) { + RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update)); + update = 0L; + } + if (requested.compareAndSet(current, update)) { + return update; + } + } + } } diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 0646c65037..06aaabda2a 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -611,17 +611,17 @@ public void onSubscribe(Subscription s) { s.cancel(); } - + @Override public void onNext(Object t) { ts.onNext(t); } - + @Override public void onError(Throwable t) { ts.onError(t); } - + @Override public void onComplete() { ts.onComplete(); diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index 2d782b5dc2..dc816e2ec3 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -271,10 +271,7 @@ public Integer apply(Integer t1, Integer t2) { verify(w).onNext(10); } - /** - * A reduce should fail with an NoSuchElementException if done on an empty Observable. - */ - @Test(expected = NoSuchElementException.class) + @Test public void testReduceWithEmptyObservable() { Flowable observable = Flowable.range(1, 0); observable.reduce(new BiFunction() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java index da1ac51efd..fcdbc8a713 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java @@ -34,7 +34,7 @@ public class FlowableCacheTest { @Test public void testColdReplayNoBackpressure() { - FlowableCache source = (FlowableCache)FlowableCache.from(Flowable.range(0, 1000)); + FlowableCache source = new FlowableCache(Flowable.range(0, 1000), 16); assertFalse("Source is connected!", source.isConnected()); @@ -56,7 +56,7 @@ public void testColdReplayNoBackpressure() { } @Test public void testColdReplayBackpressure() { - FlowableCache source = (FlowableCache)FlowableCache.from(Flowable.range(0, 1000)); + FlowableCache source = new FlowableCache(Flowable.range(0, 1000), 16); assertFalse("Source is connected!", source.isConnected()); @@ -66,7 +66,7 @@ public void testColdReplayBackpressure() { source.subscribe(ts); assertTrue("Source is not connected!", source.isConnected()); - assertTrue("Subscribers not retained!", source.hasSubscribers()); + assertFalse("Subscribers retained!", source.hasSubscribers()); ts.assertNoErrors(); ts.assertNotComplete(); @@ -145,7 +145,7 @@ public void testUnsubscribeSource() throws Exception { public void testTake() { TestSubscriber ts = new TestSubscriber(); - FlowableCache cached = (FlowableCache)FlowableCache.from(Flowable.range(1, 100)); + FlowableCache cached = new FlowableCache(Flowable.range(1, 100), 16); cached.take(10).subscribe(ts); ts.assertNoErrors(); @@ -161,7 +161,7 @@ public void testAsync() { for (int i = 0; i < 100; i++) { TestSubscriber ts1 = new TestSubscriber(); - FlowableCache cached = (FlowableCache)FlowableCache.from(source); + FlowableCache cached = new FlowableCache(source, 16); cached.observeOn(Schedulers.computation()).subscribe(ts1); @@ -184,7 +184,7 @@ public void testAsyncComeAndGo() { Flowable source = Flowable.interval(1, 1, TimeUnit.MILLISECONDS) .take(1000) .subscribeOn(Schedulers.io()); - FlowableCache cached = (FlowableCache)FlowableCache.from(source); + FlowableCache cached = new FlowableCache(source, 16); Flowable output = cached.observeOn(Schedulers.computation()); @@ -262,6 +262,7 @@ public void testValuesAndThenError() { } @Test + @Ignore("RS subscribers should not throw") public void unsafeChildThrows() { final AtomicInteger count = new AtomicInteger(); @@ -375,4 +376,48 @@ public void disposeOnArrival() { .test(0L, true) .assertEmpty(); } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.cache(); + } + }, false, 1, 1, 1); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.never().cache()); + } + + @Test + public void take1() { + Flowable cache = Flowable.just(1, 2) + .cache(); + + cache.test(); + + cache + .take(1) + .test() + .assertResult(1); + } + + @Test + public void empty() { + Flowable.empty() + .cache() + .test(0L) + .assertResult(); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .cache() + .test(0L) + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java index 8121964373..30c8c5dadf 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java @@ -1027,4 +1027,37 @@ public Integer apply(Integer v) throws Exception { .assertFailure(TestException.class); } + @Test + public void fuseAndTake() { + UnicastProcessor us = UnicastProcessor.create(); + + us.onNext(1); + us.onComplete(); + + us.concatMapEager(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return Flowable.just(1); + } + }) + .take(1) + .test() + .assertResult(1); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable o) throws Exception { + return o.concatMapEager(new Function>() { + @Override + public Flowable apply(Object v) throws Exception { + return Flowable.just(v); + } + }); + } + }); + } + } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java index 0fca903cb0..4058ce72d1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.lang.reflect.Method; @@ -26,12 +27,12 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.Flowable; import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.*; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; @@ -1230,7 +1231,7 @@ public Flowable apply(Object v) throws Exception { @Test public void concatMapJustSource() { - Flowable.just(0) + Flowable.just(0).hide() .concatMap(new Function>() { @Override public Flowable apply(Object v) throws Exception { @@ -1239,7 +1240,306 @@ public Flowable apply(Object v) throws Exception { }, 16) .test() .assertResult(1); + } + + @Test + public void concatMapJustSourceDelayError() { + Flowable.just(0).hide() + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Object v) throws Exception { + return Flowable.just(1); + } + }, 16, false) + .test() + .assertResult(1); + } + + @Test + public void concatMapScalarBackpressured() { + Flowable.just(1).hide() + .concatMap(Functions.justFunction(Flowable.just(2))) + .test(1L) + .assertResult(2); + } + + @Test + public void concatMapScalarBackpressuredDelayError() { + Flowable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Flowable.just(2))) + .test(1L) + .assertResult(2); + } + + @Test + public void concatMapEmpty() { + Flowable.just(1).hide() + .concatMap(Functions.justFunction(Flowable.empty())) + .test() + .assertResult(); + } + + @Test + public void concatMapEmptyDelayError() { + Flowable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Flowable.empty())) + .test() + .assertResult(); + } + + @Test + public void ignoreBackpressure() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + for (int i = 0; i < 10; i++) { + s.onNext(i); + } + } + } + .concatMap(Functions.justFunction(Flowable.just(2)), 8) + .test(0L) + .assertFailure(IllegalStateException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.concatMap(Functions.justFunction(Flowable.just(2))); + } + }); + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.concatMapDelayError(Functions.justFunction(Flowable.just(2))); + } + }); + } + + @Test + public void immediateInnerNextOuterError() { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + pp.onError(new TestException("First")); + } + } + }; + + pp.concatMap(Functions.justFunction(Flowable.just(1))) + .subscribe(ts); + + pp.onNext(1); + + assertFalse(pp.hasSubscribers()); + + ts.assertFailureAndMessage(TestException.class, "First", 1); + } + + @Test + public void immediateInnerNextOuterError2() { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + pp.onError(new TestException("First")); + } + } + }; + + pp.concatMap(Functions.justFunction(Flowable.just(1).hide())) + .subscribe(ts); + + pp.onNext(1); + + assertFalse(pp.hasSubscribers()); + + ts.assertFailureAndMessage(TestException.class, "First", 1); + } + + @Test + public void concatMapInnerError() { + Flowable.just(1).hide() + .concatMap(Functions.justFunction(Flowable.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + @Test + public void concatMapInnerErrorDelayError() { + Flowable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.concatMap(Functions.justFunction(Flowable.just(1).hide())); + } + }, true, 1, 1, 1); } + @Test + public void badInnerSource() { + @SuppressWarnings("rawtypes") + final Subscriber[] ts0 = { null }; + TestSubscriber ts = Flowable.just(1).hide().concatMap(Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + ts0[0] = s; + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("First")); + } + })) + .test(); + + ts.assertFailureAndMessage(TestException.class, "First"); + + List errors = TestHelper.trackPluginErrors(); + try { + ts0[0].onError(new TestException("Second")); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badInnerSourceDelayError() { + @SuppressWarnings("rawtypes") + final Subscriber[] ts0 = { null }; + TestSubscriber ts = Flowable.just(1).hide().concatMapDelayError(Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + ts0[0] = s; + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("First")); + } + })) + .test(); + + ts.assertFailureAndMessage(TestException.class, "First"); + + List errors = TestHelper.trackPluginErrors(); + try { + ts0[0].onError(new TestException("Second")); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badSourceDelayError() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.concatMap(Functions.justFunction(Flowable.just(1).hide())); + } + }, true, 1, 1, 1); + } + + @Test + public void fusedCrash() { + Flowable.range(1, 2) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { throw new TestException(); } + }) + .concatMap(Functions.justFunction(Flowable.just(1))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedCrashDelayError() { + Flowable.range(1, 2) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { throw new TestException(); } + }) + .concatMapDelayError(Functions.justFunction(Flowable.just(1))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void callableCrash() { + Flowable.just(1).hide() + .concatMap(Functions.justFunction(Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + }))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void callableCrashDelayError() { + Flowable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + }))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.range(1, 2) + .concatMap(Functions.justFunction(Flowable.just(1)))); + + TestHelper.checkDisposed(Flowable.range(1, 2) + .concatMapDelayError(Functions.justFunction(Flowable.just(1)))); + } + + @Test + public void notVeryEnd() { + Flowable.range(1, 2) + .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), 16, false) + .test() + .assertFailure(TestException.class); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .concatMapDelayError(Functions.justFunction(Flowable.just(2)), 16, false) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperThrows() { + Flowable.range(1, 2) + .concatMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOtherTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOtherTest.java index 0f2f9b7c13..0642c3dcfc 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOtherTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOtherTest.java @@ -17,9 +17,9 @@ import org.junit.*; import org.reactivestreams.Subscription; -import io.reactivex.Flowable; +import io.reactivex.*; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Consumer; +import io.reactivex.functions.*; import io.reactivex.processors.PublishProcessor; import io.reactivex.subscribers.TestSubscriber; @@ -309,4 +309,14 @@ public void accept(Subscription s) { public void otherNull() { Flowable.just(1).delaySubscription((Flowable)null); } + + @Test + public void badSourceOther() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return Flowable.just(1).delaySubscription(o); + } + }, false, 1, 1, 1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnEachTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnEachTest.java index 414669de1d..1430e986b5 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnEachTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnEachTest.java @@ -173,11 +173,11 @@ public void accept(List booleans) { @Ignore("crashing publisher can't propagate to a subscriber") public void testFatalError() { // try { -// Observable.just(1, 2, 3) -// .flatMap(new Function>() { +// Flowable.just(1, 2, 3) +// .flatMap(new Function>() { // @Override -// public Observable apply(Integer integer) { -// return Observable.create(new Publisher() { +// public Flowable apply(Integer integer) { +// return Flowable.create(new Publisher() { // @Override // public void subscribe(Subscriber o) { // throw new NullPointerException("Test NPE"); @@ -717,4 +717,19 @@ public void run() throws Exception { assertEquals(5, call[0]); assertEquals(1, call[1]); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.just(1).doOnEach(new TestSubscriber())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable o) throws Exception { + return o.doOnEach(new TestSubscriber()); + } + }); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java index c5f178744d..8fab6c56d2 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java @@ -195,6 +195,20 @@ public Publisher apply(Flowable o) throws Exception { return o.elementAt(0).toFlowable(); } }); + + TestHelper.checkDoubleOnSubscribeFlowableToMaybe(new Function, Maybe>() { + @Override + public Maybe apply(Flowable o) throws Exception { + return o.elementAt(0); + } + }); + + TestHelper.checkDoubleOnSubscribeFlowableToSingle(new Function, Single>() { + @Override + public Single apply(Flowable o) throws Exception { + return o.elementAt(0, 1); + } + }); } @Test @@ -215,6 +229,20 @@ public void errorFlowable() { .assertFailure(TestException.class); } + + @Test + public void error() { + Flowable.error(new TestException()) + .elementAt(1, 10) + .test() + .assertFailure(TestException.class); + + Flowable.error(new TestException()) + .elementAt(1) + .test() + .assertFailure(TestException.class); + } + @Test public void badSource() { List errors = TestHelper.trackPluginErrors(); @@ -239,12 +267,43 @@ protected void subscribeActual(Subscriber subscriber) { } finally { RxJavaPlugins.reset(); } + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.elementAt(0); + } + }, false, null, 1); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.elementAt(0, 1); + } + }, false, null, 1, 1); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.elementAt(0).toFlowable(); + } + }, false, null, 1); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.elementAt(0, 1).toFlowable(); + } + }, false, null, 1, 1); } @Test public void dispose() { TestHelper.checkDisposed(PublishProcessor.create().elementAt(0).toFlowable()); TestHelper.checkDisposed(PublishProcessor.create().elementAt(0, 1).toFlowable()); + + TestHelper.checkDisposed(PublishProcessor.create().elementAt(0)); + TestHelper.checkDisposed(PublishProcessor.create().elementAt(0, 1)); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java index 8b79c48f11..d1aa479412 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java @@ -19,11 +19,15 @@ import java.util.concurrent.*; import org.junit.Test; +import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.Flowable; +import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -360,4 +364,164 @@ public MaybeSource apply(Integer v) throws Exception { to .assertFailure(TestException.class); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable f) throws Exception { + return f.flatMapMaybe(Functions.justFunction(Maybe.just(2))); + } + }); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + observer.onError(new TestException("First")); + observer.onError(new TestException("Second")); + } + } + .flatMapMaybe(Functions.justFunction(Maybe.just(2))) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badInnerSource() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.just(1) + .flatMapMaybe(Functions.justFunction(new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + observer.onError(new TestException("Second")); + } + })) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void emissionQueueTrigger() { + final PublishProcessor ps1 = PublishProcessor.create(); + final PublishProcessor ps2 = PublishProcessor.create(); + + TestSubscriber to = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps2.onNext(2); + ps2.onComplete(); + } + } + }; + + Flowable.just(ps1, ps2) + .flatMapMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(PublishProcessor v) throws Exception { + return v.singleElement(); + } + }) + .subscribe(to); + + ps1.onNext(1); + ps1.onComplete(); + + to.assertResult(1, 2); + } + + @Test + public void emissionQueueTrigger2() { + final PublishProcessor ps1 = PublishProcessor.create(); + final PublishProcessor ps2 = PublishProcessor.create(); + final PublishProcessor ps3 = PublishProcessor.create(); + + TestSubscriber to = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps2.onNext(2); + ps2.onComplete(); + } + } + }; + + Flowable.just(ps1, ps2, ps3) + .flatMapMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(PublishProcessor v) throws Exception { + return v.singleElement(); + } + }) + .subscribe(to); + + ps1.onNext(1); + ps1.onComplete(); + + ps3.onComplete(); + + to.assertResult(1, 2); + } + + @Test + public void disposeInner() { + final TestSubscriber to = new TestSubscriber(); + + Flowable.just(1).flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + + assertFalse(((Disposable)observer).isDisposed()); + + to.dispose(); + + assertTrue(((Disposable)observer).isDisposed()); + } + }; + } + }) + .subscribe(to); + + to + .assertEmpty(); + } + + @Test + public void innerSuccessCompletesAfterMain() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = Flowable.just(1).flatMapMaybe(Functions.justFunction(ps.singleElement())) + .test(); + + ps.onNext(2); + ps.onComplete(); + + to + .assertResult(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java index b8a2c664d0..ea29c72651 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java @@ -19,10 +19,15 @@ import java.util.concurrent.*; import org.junit.Test; +import org.reactivestreams.Subscriber; import io.reactivex.*; +import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -309,4 +314,130 @@ public SingleSource apply(Integer v) throws Exception { } })); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable f) throws Exception { + return f.flatMapSingle(Functions.justFunction(Single.just(2))); + } + }); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + observer.onError(new TestException("First")); + observer.onError(new TestException("Second")); + } + } + .flatMapSingle(Functions.justFunction(Single.just(2))) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badInnerSource() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.just(1) + .flatMapSingle(Functions.justFunction(new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + observer.onError(new TestException("Second")); + } + })) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void emissionQueueTrigger() { + final PublishProcessor ps1 = PublishProcessor.create(); + final PublishProcessor ps2 = PublishProcessor.create(); + + TestSubscriber to = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps2.onNext(2); + ps2.onComplete(); + } + } + }; + + Flowable.just(ps1, ps2) + .flatMapSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(PublishProcessor v) throws Exception { + return v.singleOrError(); + } + }) + .subscribe(to); + + ps1.onNext(1); + ps1.onComplete(); + + to.assertResult(1, 2); + } + + @Test + public void disposeInner() { + final TestSubscriber to = new TestSubscriber(); + + Flowable.just(1).flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + observer.onSubscribe(Disposables.empty()); + + assertFalse(((Disposable)observer).isDisposed()); + + to.dispose(); + + assertTrue(((Disposable)observer).isDisposed()); + } + }; + } + }) + .subscribe(to); + + to + .assertEmpty(); + } + + @Test + public void innerSuccessCompletesAfterMain() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = Flowable.just(1).flatMapSingle(Functions.justFunction(ps.singleOrError())) + .test(); + + ps.onNext(2); + ps.onComplete(); + + to + .assertResult(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java index d56e4796d8..ee6fe599b8 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java @@ -568,4 +568,19 @@ public Iterable apply(Object v) throws Exception { } })); } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.flatMapIterable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Arrays.asList(10, 20); + } + }); + } + }, false, 1, 1, 10, 20); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromArrayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromArrayTest.java index cbe249cae5..2f59669a9c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromArrayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromArrayTest.java @@ -18,7 +18,9 @@ import org.junit.*; -import io.reactivex.Flowable; +import io.reactivex.*; +import io.reactivex.functions.Predicate; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.ScalarCallable; import io.reactivex.subscribers.TestSubscriber; @@ -65,6 +67,31 @@ public void backpressure() { ts.assertComplete(); } + @Test + public void conditionalBackpressure() { + TestSubscriber ts = TestSubscriber.create(0); + + create(1000) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertNoErrors(); + ts.assertNoValues(); + ts.assertNotComplete(); + + ts.request(10); + + ts.assertNoErrors(); + ts.assertValueCount(10); + ts.assertNotComplete(); + + ts.request(1000); + + ts.assertNoErrors(); + ts.assertValueCount(1000); + ts.assertComplete(); + } + @Test public void empty() { Assert.assertSame(Flowable.empty(), Flowable.fromArray(new Object[0])); @@ -82,4 +109,86 @@ public void just10Arguments() { .test() .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.just(1, 2, 3)); + } + + @Test + public void conditionalOneIsNull() { + Flowable.fromArray(new Integer[] { null, 1 }) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void conditionalOneIsNullSlowPath() { + Flowable.fromArray(new Integer[] { null, 1 }) + .filter(Functions.alwaysTrue()) + .test(2L) + .assertFailure(NullPointerException.class); + } + + @Test + public void conditionalOneByOne() { + Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 }) + .filter(Functions.alwaysTrue()) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void conditionalFiltered() { + Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 }) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .test() + .assertResult(2, 4); + } + + @Test + public void conditionalSlowPathCancel() { + Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 }) + .filter(Functions.alwaysTrue()) + .subscribeWith(new TestSubscriber(5L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + cancel(); + onComplete(); + } + } + }) + .assertResult(1); + } + + @Test + public void conditionalSlowPathSkipCancel() { + Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 }) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v < 2; + } + }) + .subscribeWith(new TestSubscriber(5L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + cancel(); + onComplete(); + } + } + }) + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java index dc404b22d5..6481a7fa0a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java @@ -1723,4 +1723,13 @@ public void onNext(Integer t) { to.assertSubscribed().assertValue(1).assertNoErrors().assertNotComplete(); } + @Test + public void delayErrorSimpleComplete() { + Flowable.just(1) + .groupBy(Functions.justFunction(1), true) + .flatMap(Functions.>identity()) + .test() + .assertResult(1); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java index be4247240d..58f73e0a58 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java @@ -23,6 +23,7 @@ import org.junit.Test; import io.reactivex.*; +import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.schedulers.Schedulers; public class FlowableIntervalRangeTest { @@ -78,4 +79,34 @@ public void longOverflow() { public void dispose() { TestHelper.checkDisposed(Flowable.intervalRange(1, 2, 1, 1, TimeUnit.MILLISECONDS)); } + + @Test + public void backpressureBounded() { + Flowable.intervalRange(1, 2, 1, 1, TimeUnit.MILLISECONDS) + .test(2L) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1L, 2L); + } + + @Test + public void backpressureOverflow() { + Flowable.intervalRange(1, 3, 1, 1, TimeUnit.MILLISECONDS) + .test(2L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(MissingBackpressureException.class, 1L, 2L); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.intervalRange(1, 3, 1, 1, TimeUnit.MILLISECONDS)); + } + + @Test + public void take() { + Flowable.intervalRange(1, 2, 1, 1, TimeUnit.MILLISECONDS) + .take(1) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1L); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapNotificationTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapNotificationTest.java index dee4429535..b7e43f8367 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapNotificationTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapNotificationTest.java @@ -16,9 +16,13 @@ import java.util.concurrent.Callable; import org.junit.Test; +import org.reactivestreams.Subscriber; -import io.reactivex.Flowable; +import io.reactivex.*; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.flowable.FlowableMapNotification.MapNotificationSubscriber; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.processors.PublishProcessor; import io.reactivex.subscribers.TestSubscriber; @@ -142,4 +146,35 @@ public Integer call() { ts.assertComplete(); } + + @Test + public void dispose() { + TestHelper.checkDisposed(new Flowable() { + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + protected void subscribeActual(Subscriber observer) { + MapNotificationSubscriber mn = new MapNotificationSubscriber( + observer, + Functions.justFunction(Flowable.just(1)), + Functions.justFunction(Flowable.just(2)), + Functions.justCallable(Flowable.just(3)) + ); + mn.onSubscribe(new BooleanSubscription()); + } + }); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable o) throws Exception { + return o.flatMap( + Functions.justFunction(Flowable.just(1)), + Functions.justFunction(Flowable.just(2)), + Functions.justCallable(Flowable.just(3)) + ); + } + }); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapTest.java index feb5865bb5..5c019303a5 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapTest.java @@ -729,4 +729,14 @@ public void fusedReject() { .assertResult(1, 2, 3, 4, 5); } + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.map(Functions.identity()); + } + }, false, 1, 1, 1); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategyTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategyTest.java index 67d4cc5175..fedea65972 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategyTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategyTest.java @@ -13,21 +13,21 @@ package io.reactivex.internal.operators.flowable; -import io.reactivex.Flowable; -import io.reactivex.functions.Action; -import io.reactivex.internal.subscriptions.BooleanSubscription; -import io.reactivex.subscribers.DefaultSubscriber; -import io.reactivex.subscribers.TestSubscriber; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import static io.reactivex.BackpressureOverflowStrategy.*; +import static io.reactivex.internal.functions.Functions.EMPTY_ACTION; +import static org.junit.Assert.assertEquals; import java.util.concurrent.atomic.AtomicInteger; -import static io.reactivex.BackpressureOverflowStrategy.DROP_LATEST; -import static io.reactivex.BackpressureOverflowStrategy.DROP_OLDEST; -import static io.reactivex.internal.functions.Functions.EMPTY_ACTION; -import static org.junit.Assert.assertEquals; +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.subscribers.*; public class FlowableOnBackpressureBufferStrategyTest { @@ -127,4 +127,82 @@ public void backpressureBufferZeroCapacity() throws InterruptedException { Flowable.empty().onBackpressureBuffer(0, EMPTY_ACTION , DROP_OLDEST); } + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.just(1) + .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)); + } + + @Test + public void error() { + Flowable + .error(new TestException()) + .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR) + .test() + .assertFailure(TestException.class); + } + + @Test + public void overflowError() { + Flowable.range(1, 20) + .onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR) + .test(0L) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR); + } + }, false, 1, 1, 1); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable f) throws Exception { + return f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR); + } + }); + } + + @Test + public void overflowCrashes() { + Flowable.range(1, 20) + .onBackpressureBuffer(8, new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }, BackpressureOverflowStrategy.DROP_OLDEST) + .test(0L) + .assertFailure(TestException.class); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.just(1) + .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)); + } + + @Test + public void empty() { + Flowable.empty() + .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR) + .test(0L) + .assertResult(); + } + + @Test + public void justTake() { + Flowable.just(1) + .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR) + .take(1) + .test() + .assertResult(1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaObservableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaFlowableTest.java similarity index 99% rename from src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaObservableTest.java rename to src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaFlowableTest.java index b4fa2cba49..fae01b8647 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaFlowableTest.java @@ -27,7 +27,7 @@ import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.*; -public class FlowableOnErrorResumeNextViaObservableTest { +public class FlowableOnErrorResumeNextViaFlowableTest { @Test public void testResumeNext() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java index 4491c11d2c..b3742e3248 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import org.junit.*; @@ -25,6 +26,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; @@ -76,7 +78,7 @@ public Flowable apply(Throwable t1) { public void testResumeNextWithAsyncExecution() { final AtomicReference receivedException = new AtomicReference(); Subscription s = mock(Subscription.class); - TestObservable w = new TestObservable(s, "one"); + TestFlowable w = new TestFlowable(s, "one"); Function> resume = new Function>() { @Override @@ -114,7 +116,7 @@ public Flowable apply(Throwable t1) { @Test public void testFunctionThrowsError() { Subscription s = mock(Subscription.class); - TestObservable w = new TestObservable(s, "one"); + TestFlowable w = new TestFlowable(s, "one"); Function> resume = new Function>() { @Override @@ -237,7 +239,7 @@ public void testMapResumeAsyncNext() { Flowable w = Flowable.just("one", "fail", "two", "three", "fail"); // Introduce map function that fails intermittently (Map does not prevent this when the observer is a - // rx.operator incl onErrorResumeNextViaObservable) + // rx.operator incl onErrorResumeNextViaFlowable) w = w.map(new Function() { @Override public String apply(String s) { @@ -274,27 +276,27 @@ public Flowable apply(Throwable t1) { verify(observer, times(1)).onNext("threeResume"); } - private static class TestObservable implements Publisher { + private static class TestFlowable implements Publisher { final String[] values; Thread t; - TestObservable(Subscription s, String... values) { + TestFlowable(Subscription s, String... values) { this.values = values; } @Override public void subscribe(final Subscriber observer) { - System.out.println("TestObservable subscribed to ..."); + System.out.println("TestFlowable subscribed to ..."); observer.onSubscribe(new BooleanSubscription()); t = new Thread(new Runnable() { @Override public void run() { try { - System.out.println("running TestObservable thread"); + System.out.println("running TestFlowable thread"); for (String s : values) { - System.out.println("TestObservable onNext: " + s); + System.out.println("TestFlowable onNext: " + s); observer.onNext(s); } throw new RuntimeException("Forced Failure"); @@ -304,9 +306,9 @@ public void run() { } }); - System.out.println("starting TestObservable thread"); + System.out.println("starting TestFlowable thread"); t.start(); - System.out.println("done starting TestObservable thread"); + System.out.println("done starting TestFlowable thread"); } } @@ -376,4 +378,15 @@ public Flowable apply(Throwable v) { ts.assertComplete(); } + @Test + public void badOtherSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return Flowable.error(new IOException()) + .onErrorResumeNext(Functions.justFunction(o)); + } + }, false, 1, 1, 1); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturnTest.java index 849f17f187..b9b5e8a39e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturnTest.java @@ -34,7 +34,7 @@ public class FlowableOnErrorReturnTest { @Test public void testResumeNext() { - TestObservable f = new TestObservable("one"); + TestFlowable f = new TestFlowable("one"); Flowable w = Flowable.unsafeCreate(f); final AtomicReference capturedException = new AtomicReference(); @@ -70,7 +70,7 @@ public String apply(Throwable e) { */ @Test public void testFunctionThrowsError() { - TestObservable f = new TestObservable("one"); + TestFlowable f = new TestFlowable("one"); Flowable w = Flowable.unsafeCreate(f); final AtomicReference capturedException = new AtomicReference(); @@ -109,7 +109,7 @@ public void testMapResumeAsyncNext() { Flowable w = Flowable.just("one", "fail", "two", "three", "fail"); // Introduce map function that fails intermittently (Map does not prevent this when the observer is a - // rx.operator incl onErrorResumeNextViaObservable) + // rx.operator incl onErrorResumeNextViaFlowable) w = w.map(new Function() { @Override public String apply(String s) { @@ -179,27 +179,27 @@ public Integer apply(Integer t1) { ts.assertNoErrors(); } - private static class TestObservable implements Publisher { + private static class TestFlowable implements Publisher { final String[] values; Thread t; - TestObservable(String... values) { + TestFlowable(String... values) { this.values = values; } @Override public void subscribe(final Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); - System.out.println("TestObservable subscribed to ..."); + System.out.println("TestFlowable subscribed to ..."); t = new Thread(new Runnable() { @Override public void run() { try { - System.out.println("running TestObservable thread"); + System.out.println("running TestFlowable thread"); for (String s : values) { - System.out.println("TestObservable onNext: " + s); + System.out.println("TestFlowable onNext: " + s); subscriber.onNext(s); } throw new RuntimeException("Forced Failure"); @@ -209,9 +209,9 @@ public void run() { } }); - System.out.println("starting TestObservable thread"); + System.out.println("starting TestFlowable thread"); t.start(); - System.out.println("done starting TestObservable thread"); + System.out.println("done starting TestFlowable thread"); } } @@ -254,4 +254,19 @@ public void returnItem() { .assertResult(1); } + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.just(1).onErrorReturnItem(1)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable f) throws Exception { + return f.onErrorReturnItem(1); + } + }); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaObservableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaFlowableTest.java similarity index 99% rename from src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaObservableTest.java rename to src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaFlowableTest.java index cbe142275b..7b32e5eb3d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaFlowableTest.java @@ -28,7 +28,7 @@ import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; -public class FlowableOnExceptionResumeNextViaObservableTest { +public class FlowableOnExceptionResumeNextViaFlowableTest { @Test public void testResumeNextWithException() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java index 1ca1fc06d4..3a423246bd 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java @@ -24,7 +24,8 @@ import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.functions.Consumer; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.subscribers.*; @@ -323,4 +324,216 @@ public void noOverflow() { Flowable.rangeLong(Long.MIN_VALUE, 2); Flowable.rangeLong(Long.MIN_VALUE, Long.MAX_VALUE); } + + @Test + public void conditionalNormal() { + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.rangeLong(1L, 5L)); + + TestHelper.assertBadRequestReported(Flowable.rangeLong(1L, 5L).filter(Functions.alwaysTrue())); + } + + @Test + public void conditionalNormalSlowpath() { + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .test(5) + .assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void conditionalSlowPathTakeExact() { + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .take(5) + .test() + .assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void slowPathTakeExact() { + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .take(5) + .test() + .assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void conditionalSlowPathRebatch() { + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .rebatchRequests(1) + .test() + .assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void slowPathRebatch() { + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .rebatchRequests(1) + .test() + .assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void slowPathCancel() { + TestSubscriber ts = new TestSubscriber(2L) { + @Override + public void onNext(Long t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.rangeLong(1L, 5L) + .subscribe(ts); + + ts.assertResult(1L); + } + + @Test + public void fastPathCancel() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Long t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.rangeLong(1L, 5L) + .subscribe(ts); + + ts.assertResult(1L); + } + + @Test + public void conditionalSlowPathCancel() { + TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Long t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertResult(1L); + } + + @Test + public void conditionalFastPathCancel() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Long t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertResult(1L); + } + + @Test + public void conditionalRequestOneByOne() { + TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Long t) { + super.onNext(t); + request(1); + } + }; + + Flowable.rangeLong(1L, 5L) + .filter(new Predicate() { + @Override + public boolean test(Long v) throws Exception { + return v % 2 == 0; + } + }) + .subscribe(ts); + + ts.assertResult(2L, 4L); + } + + @Test + public void conditionalRequestOneByOne2() { + TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Long t) { + super.onNext(t); + request(1); + } + }; + + Flowable.rangeLong(1L, 5L) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void fastPathCancelExact() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Long t) { + super.onNext(t); + if (t == 5L) { + cancel(); + onComplete(); + } + } + }; + + Flowable.rangeLong(1L, 5L) + .subscribe(ts); + + ts.assertResult(1L, 2L, 3L, 4L, 5L); + } + + @Test + public void conditionalFastPathCancelExact() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Long t) { + super.onNext(t); + if (t == 5L) { + cancel(); + onComplete(); + } + } + }; + + Flowable.rangeLong(1L, 5L) + .filter(new Predicate() { + @Override + public boolean test(Long v) throws Exception { + return v % 2 == 0; + } + }) + .subscribe(ts); + + ts.assertResult(2L, 4L); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java index 42cee9d7c6..76649a73db 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.flowable; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -25,6 +25,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.fuseable.HasUpstreamPublisher; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.TestSubscriber; @@ -294,4 +295,93 @@ public void testBackpressureWithNoInitialValueObservable() throws InterruptedExc Integer r = reduced.blockingFirst(); assertEquals(21, r.intValue()); } + + @Test + public void source() { + Flowable source = Flowable.just(1); + + assertSame(source, (((HasUpstreamPublisher)source.reduce(sum))).source()); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.range(1, 2).reduce(sum)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowableToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Flowable f) throws Exception { + return f.reduce(sum); + } + }); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .reduce(sum) + .test() + .assertFailure(TestException.class); + } + + @Test + public void errorFlowable() { + Flowable.error(new TestException()) + .reduce(sum) + .toFlowable() + .test() + .assertFailure(TestException.class); + } + + @Test + public void empty() { + Flowable.empty() + .reduce(sum) + .test() + .assertResult(); + } + + @Test + public void emptyFlowable() { + Flowable.empty() + .reduce(sum) + .toFlowable() + .test() + .assertResult(); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.reduce(sum); + } + }, false, 1, 1, 1); + } + + @Test + public void badSourceFlowable() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.reduce(sum).toFlowable(); + } + }, false, 1, 1, 1); + } + + @Test + public void reducerThrows() { + Flowable.just(1, 2) + .reduce(new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableScalarXMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableScalarXMapTest.java index 0a5b4c43fe..1eb6760cba 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableScalarXMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableScalarXMapTest.java @@ -238,11 +238,11 @@ public void run() { @Test public void cancelled() { ScalarSubscription scalar = new ScalarSubscription(new TestSubscriber(), 1); - + assertFalse(scalar.isCancelled()); - + scalar.cancel(); - + assertTrue(scalar.isCancelled()); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java index 75f6bcbca6..00da635808 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java @@ -25,6 +25,7 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.Flowable; import io.reactivex.functions.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.DefaultSubscriber; @@ -725,4 +726,38 @@ public void singleElementOperatorDoNotSwallowExceptionWhenDone() { RxJavaPlugins.reset(); } } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.singleOrError(); + } + }, false, 1, 1, 1); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.singleElement(); + } + }, false, 1, 1, 1); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Flowable o) throws Exception { + return o.singleOrError(); + } + }); + + TestHelper.checkDoubleOnSubscribeFlowableToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Flowable o) throws Exception { + return o.singleElement(); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimedTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimedTest.java index f238963b08..8f4a7c29b1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimedTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimedTest.java @@ -227,4 +227,14 @@ public void errorDelayed() { .assertFailure(TestException.class); } + @Test + public void take() { + Flowable.just(1) + .skipLast(0, TimeUnit.SECONDS) + .take(1) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java index 563fb47615..dd612cb3f1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java @@ -22,7 +22,6 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.FlowableOperator; import io.reactivex.disposables.Disposable; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.schedulers.*; @@ -51,7 +50,7 @@ public void subscribe( latch.await(); } catch (InterruptedException e) { // this means we were unsubscribed (Scheduler shut down and interrupts) - // ... but we'll pretend we are like many Observables that ignore interrupts + // ... but we'll pretend we are like many Flowables that ignore interrupts } subscriber.onComplete(); @@ -281,4 +280,9 @@ public void cancelBeforeActualSubscribe() { .assertNotTerminated(); } + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.just(1).subscribeOn(Schedulers.single())); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastOneTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastOneTest.java index a622e97885..f3a8a31d76 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastOneTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastOneTest.java @@ -20,6 +20,7 @@ import org.junit.Test; +import io.reactivex.*; import io.reactivex.Flowable; import io.reactivex.functions.*; import io.reactivex.subscribers.*; @@ -137,4 +138,19 @@ public void onNext(T t) { } + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.just(1).takeLast(1)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable f) throws Exception { + return f.takeLast(1); + } + }); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithObservableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java similarity index 69% rename from src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithObservableTest.java rename to src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java index f1efbae669..35355de5cb 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java @@ -14,24 +14,27 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; -import java.util.concurrent.Callable; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.exceptions.TestException; -import io.reactivex.processors.PublishProcessor; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; +import io.reactivex.processors.*; import io.reactivex.subscribers.*; -public class FlowableWindowWithObservableTest { +public class FlowableWindowWithFlowableTest { @Test - public void testWindowViaObservableNormal1() { + public void testWindowViaFlowableNormal1() { PublishProcessor source = PublishProcessor.create(); PublishProcessor boundary = PublishProcessor.create(); @@ -89,7 +92,7 @@ public void onComplete() { } @Test - public void testWindowViaObservableBoundaryCompletes() { + public void testWindowViaFlowableBoundaryCompletes() { PublishProcessor source = PublishProcessor.create(); PublishProcessor boundary = PublishProcessor.create(); @@ -145,7 +148,7 @@ public void onComplete() { } @Test - public void testWindowViaObservableBoundaryThrows() { + public void testWindowViaFlowableBoundaryThrows() { PublishProcessor source = PublishProcessor.create(); PublishProcessor boundary = PublishProcessor.create(); @@ -195,7 +198,7 @@ public void onComplete() { } @Test - public void testWindowViaObservableSourceThrows() { + public void testWindowViaFlowableSourceThrows() { PublishProcessor source = PublishProcessor.create(); PublishProcessor boundary = PublishProcessor.create(); @@ -280,7 +283,7 @@ public Flowable call() { } @Test - public void testWindowViaObservableNoUnsubscribe() { + public void testWindowViaFlowableNoUnsubscribe() { Flowable source = Flowable.range(1, 10); Callable> boundary = new Callable>() { @Override @@ -452,4 +455,164 @@ public Flowable call() { assertFalse(source.hasSubscribers()); assertFalse(boundary.hasSubscribers()); } + + @Test + public void boundaryDispose() { + TestHelper.checkDisposed(Flowable.never().window(Flowable.never())); + } + + @Test + public void boundaryDispose2() { + TestHelper.checkDisposed(Flowable.never().window(Functions.justCallable(Flowable.never()))); + } + + @Test + public void boundaryOnError() { + TestSubscriber to = Flowable.error(new TestException()) + .window(Flowable.never()) + .flatMap(Functions.>identity(), true) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class); + } + + @Test + public void mainError() { + Flowable.error(new TestException()) + .window(Functions.justCallable(Flowable.never())) + .test() + .assertError(TestException.class); + } + + @Test + public void innerBadSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return Flowable.just(1).window(o).flatMap(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Exception { + return v; + } + }); + } + }, false, 1, 1, (Object[])null); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return Flowable.just(1).window(Functions.justCallable(o)).flatMap(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Exception { + return v; + } + }); + } + }, false, 1, 1, (Object[])null); + } + + @Test + public void reentrant() { + final FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber to = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onNext(2); + ps.onComplete(); + } + } + }; + + ps.window(BehaviorProcessor.createDefault(1)) + .flatMap(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Exception { + return v; + } + }) + .subscribe(to); + + ps.onNext(1); + + to + .awaitDone(1, TimeUnit.SECONDS) + .assertResult(1, 2); + } + + @Test + public void reentrantCallable() { + final FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber to = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onNext(2); + ps.onComplete(); + } + } + }; + + ps.window(new Callable>() { + boolean once; + @Override + public Flowable call() throws Exception { + if (!once) { + once = true; + return BehaviorProcessor.createDefault(1); + } + return Flowable.never(); + } + }) + .flatMap(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Exception { + return v; + } + }) + .subscribe(to); + + ps.onNext(1); + + to + .awaitDone(1, TimeUnit.SECONDS) + .assertResult(1, 2); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.window(Flowable.never()).flatMap(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Exception { + return v; + } + }); + } + }, false, 1, 1, 1); + } + + @Test + public void badSourceCallable() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.window(Functions.justCallable(Flowable.never())).flatMap(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Exception { + return v; + } + }); + } + }, false, 1, 1, 1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndObservableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java similarity index 65% rename from src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndObservableTest.java rename to src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java index 070a3910c7..c5d98e0b42 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java @@ -22,13 +22,15 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; -import io.reactivex.processors.PublishProcessor; +import io.reactivex.processors.*; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subscribers.*; -public class FlowableWindowWithStartEndObservableTest { +public class FlowableWindowWithStartEndFlowableTest { private TestScheduler scheduler; private Scheduler.Worker innerScheduler; @@ -40,7 +42,7 @@ public void before() { } @Test - public void testObservableBasedOpenerAndCloser() { + public void testFlowableBasedOpenerAndCloser() { final List list = new ArrayList(); final List> lists = new ArrayList>(); @@ -91,7 +93,7 @@ public void subscribe(Subscriber observer) { } @Test - public void testObservableBasedCloser() { + public void testFlowableBasedCloser() { final List list = new ArrayList(); final List> lists = new ArrayList>(); @@ -169,8 +171,8 @@ public void run() { private Consumer> observeWindow(final List list, final List> lists) { return new Consumer>() { @Override - public void accept(Flowable stringObservable) { - stringObservable.subscribe(new DefaultSubscriber() { + public void accept(Flowable stringFlowable) { + stringFlowable.subscribe(new DefaultSubscriber() { @Override public void onComplete() { lists.add(new ArrayList(list)); @@ -256,4 +258,135 @@ public Flowable apply(Integer t) { // FIXME subject has subscribers because of the open window assertTrue(close.hasSubscribers()); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.just(1).window(Flowable.just(2), Functions.justFunction(Flowable.never()))); + } + + @Test + public void reentrant() { + final FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber to = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onNext(2); + ps.onComplete(); + } + } + }; + + ps.window(BehaviorProcessor.createDefault(1), Functions.justFunction(Flowable.never())) + .flatMap(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Exception { + return v; + } + }) + .subscribe(to); + + ps.onNext(1); + + to + .awaitDone(1, TimeUnit.SECONDS) + .assertResult(1, 2); + } + + @Test + public void badSourceCallable() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.window(Flowable.just(1), Functions.justFunction(Flowable.never())); + } + }, false, 1, 1, (Object[])null); + } + + @Test + public void boundarySelectorNormal() { + PublishProcessor source = PublishProcessor.create(); + PublishProcessor start = PublishProcessor.create(); + final PublishProcessor end = PublishProcessor.create(); + + TestSubscriber to = source.window(start, new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return end; + } + }) + .flatMap(Functions.>identity()) + .test(); + + start.onNext(0); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + source.onNext(4); + + start.onNext(1); + + source.onNext(5); + source.onNext(6); + + end.onNext(1); + + start.onNext(2); + + TestHelper.emit(source, 7, 8); + + to.assertResult(1, 2, 3, 4, 5, 5, 6, 6, 7, 8); + } + + @Test + public void startError() { + PublishProcessor source = PublishProcessor.create(); + PublishProcessor start = PublishProcessor.create(); + final PublishProcessor end = PublishProcessor.create(); + + TestSubscriber to = source.window(start, new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return end; + } + }) + .flatMap(Functions.>identity()) + .test(); + + start.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse("Source has observers!", source.hasSubscribers()); + assertFalse("Start has observers!", start.hasSubscribers()); + assertFalse("End has observers!", end.hasSubscribers()); + } + + @Test + public void endError() { + PublishProcessor source = PublishProcessor.create(); + PublishProcessor start = PublishProcessor.create(); + final PublishProcessor end = PublishProcessor.create(); + + TestSubscriber to = source.window(start, new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return end; + } + }) + .flatMap(Functions.>identity()) + .test(); + + start.onNext(1); + end.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse("Source has observers!", source.hasSubscribers()); + assertFalse("Start has observers!", start.hasSubscribers()); + assertFalse("End has observers!", end.hasSubscribers()); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java index 9b3df7ab4b..6957444d46 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java @@ -950,4 +950,37 @@ public ObservableSource apply(Integer v) throws Exception { to .assertFailure(TestException.class, 1); } + + @Test + public void fuseAndTake() { + UnicastSubject us = UnicastSubject.create(); + + us.onNext(1); + us.onComplete(); + + us.concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.just(1); + } + }) + .take(1) + .test() + .assertResult(1); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.concatMapEager(new Function>() { + @Override + public ObservableSource apply(Object v) throws Exception { + return Observable.just(v); + } + }); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOtherTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOtherTest.java index e025813c58..0e63935315 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOtherTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOtherTest.java @@ -17,10 +17,10 @@ import org.junit.*; -import io.reactivex.Observable; +import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Consumer; +import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; import io.reactivex.subjects.PublishSubject; @@ -190,4 +190,14 @@ public void accept(Disposable d) { ts.assertNotComplete(); ts.assertError(TestException.class); } + + @Test + public void badSourceOther() { + TestHelper.checkBadSourceObservable(new Function, Object>() { + @Override + public Object apply(Observable o) throws Exception { + return Observable.just(1).delaySubscription(o); + } + }, false, 1, 1, 1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnEachTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnEachTest.java index b1e7f5d0ea..e287294a73 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnEachTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnEachTest.java @@ -691,4 +691,19 @@ public void run() throws Exception { assertEquals(5, call[0]); assertEquals(1, call[1]); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).doOnEach(new TestObserver())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.doOnEach(new TestObserver()); + } + }); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java index dfeae59605..2df9573a05 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java @@ -295,13 +295,13 @@ public MaybeSource apply(Integer v) throws Exception { @Test public void innerSuccessCompletesAfterMain() { PublishSubject ps = PublishSubject.create(); - + TestObserver to = Observable.just(1).flatMapMaybe(Functions.justFunction(ps.singleElement())) .test(); - + ps.onNext(2); ps.onComplete(); - + to .assertResult(2); } @@ -331,7 +331,7 @@ protected void subscribeActual(Observer observer) { .flatMapMaybe(Functions.justFunction(Maybe.just(2))) .test() .assertFailureAndMessage(TestException.class, "First"); - + TestHelper.assertError(errors, 0, TestException.class, "Second"); } finally { RxJavaPlugins.reset(); @@ -353,7 +353,7 @@ protected void subscribeActual(MaybeObserver observer) { })) .test() .assertFailureAndMessage(TestException.class, "First"); - + TestHelper.assertError(errors, 0, TestException.class, "Second"); } finally { RxJavaPlugins.reset(); @@ -364,7 +364,7 @@ protected void subscribeActual(MaybeObserver observer) { public void emissionQueueTrigger() { final PublishSubject ps1 = PublishSubject.create(); final PublishSubject ps2 = PublishSubject.create(); - + TestObserver to = new TestObserver() { @Override public void onNext(Integer t) { @@ -375,7 +375,7 @@ public void onNext(Integer t) { } } }; - + Observable.just(ps1, ps2) .flatMapMaybe(new Function, MaybeSource>() { @Override @@ -396,7 +396,7 @@ public void emissionQueueTrigger2() { final PublishSubject ps1 = PublishSubject.create(); final PublishSubject ps2 = PublishSubject.create(); final PublishSubject ps3 = PublishSubject.create(); - + TestObserver to = new TestObserver() { @Override public void onNext(Integer t) { @@ -407,7 +407,7 @@ public void onNext(Integer t) { } } }; - + Observable.just(ps1, ps2, ps3) .flatMapMaybe(new Function, MaybeSource>() { @Override @@ -419,7 +419,7 @@ public MaybeSource apply(PublishSubject v) throws Exception { ps1.onNext(1); ps1.onComplete(); - + ps3.onComplete(); to.assertResult(1, 2); @@ -447,7 +447,7 @@ protected void subscribeActual(MaybeObserver observer) { } }) .subscribe(to); - + to .assertEmpty(); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java index c502f0539b..f1be448b12 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java @@ -245,13 +245,13 @@ public SingleSource apply(Integer v) throws Exception { @Test public void innerSuccessCompletesAfterMain() { PublishSubject ps = PublishSubject.create(); - + TestObserver to = Observable.just(1).flatMapSingle(Functions.justFunction(ps.singleOrError())) .test(); - + ps.onNext(2); ps.onComplete(); - + to .assertResult(2); } @@ -281,7 +281,7 @@ protected void subscribeActual(Observer observer) { .flatMapSingle(Functions.justFunction(Single.just(2))) .test() .assertFailureAndMessage(TestException.class, "First"); - + TestHelper.assertError(errors, 0, TestException.class, "Second"); } finally { RxJavaPlugins.reset(); @@ -303,7 +303,7 @@ protected void subscribeActual(SingleObserver observer) { })) .test() .assertFailureAndMessage(TestException.class, "First"); - + TestHelper.assertError(errors, 0, TestException.class, "Second"); } finally { RxJavaPlugins.reset(); @@ -314,7 +314,7 @@ protected void subscribeActual(SingleObserver observer) { public void emissionQueueTrigger() { final PublishSubject ps1 = PublishSubject.create(); final PublishSubject ps2 = PublishSubject.create(); - + TestObserver to = new TestObserver() { @Override public void onNext(Integer t) { @@ -325,7 +325,7 @@ public void onNext(Integer t) { } } }; - + Observable.just(ps1, ps2) .flatMapSingle(new Function, SingleSource>() { @Override @@ -363,7 +363,7 @@ protected void subscribeActual(SingleObserver observer) { } }) .subscribe(to); - + to .assertEmpty(); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromTest.java index 592cf5eb68..23a69967bd 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromTest.java @@ -21,7 +21,8 @@ import io.reactivex.*; import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.ScalarCallable; +import io.reactivex.internal.fuseable.*; +import io.reactivex.observers.*; import io.reactivex.schedulers.Schedulers; public class ObservableFromTest { @@ -73,4 +74,15 @@ public ObservableSource apply(Flowable f) throws Exception { } }); } + + @Test + public void fusionRejected() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ASYNC); + + Observable.fromArray(1, 2, 3) + .subscribe(to); + + ObserverFusion.assertFusion(to, QueueDisposable.NONE) + .assertResult(1, 2, 3); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableGroupByTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableGroupByTest.java index 203548b4e5..12def5da4f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableGroupByTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableGroupByTest.java @@ -1532,4 +1532,13 @@ public void onNext(Integer t) { to.assertSubscribed().assertValue(1).assertNoErrors().assertNotComplete(); } + + @Test + public void delayErrorSimpleComplete() { + Observable.just(1) + .groupBy(Functions.justFunction(1), true) + .flatMap(Functions.>identity()) + .test() + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMapTest.java index f759700a63..ac263f2305 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMapTest.java @@ -386,4 +386,14 @@ public void fusedReject() { ObserverFusion.assertFusion(to, QueueDisposable.NONE) .assertResult(1, 2, 3, 4, 5); } + + @Test + public void badSource() { + TestHelper.checkBadSourceObservable(new Function, Object>() { + @Override + public Object apply(Observable o) throws Exception { + return o.map(Functions.identity()); + } + }, false, 1, 1, 1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRedoTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRedoTest.java index 506afe8677..e240aaa368 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRedoTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRedoTest.java @@ -25,7 +25,7 @@ public class ObservableRedoTest { @Test public void redoCancel() { final TestObserver to = new TestObserver(); - + Observable.just(1) .repeatWhen(new Function, ObservableSource>() { @Override diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java index d3ddfed813..24d76ccc5e 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java @@ -530,5 +530,29 @@ public Object apply(Observable o) throws Exception { return o.singleOrError(); } }, false, 1, 1, 1); + + TestHelper.checkBadSourceObservable(new Function, Object>() { + @Override + public Object apply(Observable o) throws Exception { + return o.singleElement(); + } + }, false, 1, 1, 1); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Observable o) throws Exception { + return o.singleOrError(); + } + }); + + TestHelper.checkDoubleOnSubscribeObservableToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Observable o) throws Exception { + return o.singleElement(); + } + }); } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipLastTimedTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipLastTimedTest.java index d9a92380fd..4e0973e020 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipLastTimedTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipLastTimedTest.java @@ -225,4 +225,14 @@ public void errorDelayed() { .test() .assertFailure(TestException.class); } + + @Test + public void take() { + Observable.just(1) + .skipLast(0, TimeUnit.SECONDS) + .take(1) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java b/src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java index e2c253e785..c544e8b3d3 100644 --- a/src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java +++ b/src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java @@ -81,7 +81,7 @@ public void spscBiOfferCapacity() { assertTrue(q.offer(3, 4)); assertTrue(q.offer(5, 6)); assertTrue(q.offer(7)); - + assertFalse(q.offer(8, 9)); assertFalse(q.offer(9, 10)); } @@ -93,12 +93,12 @@ public void spscLinkedNewBufferPeek() { assertTrue(q.offer(3, 4)); assertTrue(q.offer(5, 6)); assertTrue(q.offer(7, 8)); // this should trigger a new buffer - + for (int i = 0; i < 8; i++) { assertEquals(i + 1, q.peek().intValue()); assertEquals(i + 1, q.poll().intValue()); } - + assertNull(q.peek()); assertNull(q.poll()); } @@ -106,11 +106,11 @@ public void spscLinkedNewBufferPeek() { @Test public void mpscOfferPollRace() throws Exception { final MpscLinkedQueue q = new MpscLinkedQueue(); - + final AtomicInteger c = new AtomicInteger(3); - + Thread t1 = new Thread(new Runnable() { - int i = 0; + int i; @Override public void run() { c.decrementAndGet(); @@ -126,7 +126,7 @@ public void run() { Thread t2 = new Thread(new Runnable() { int i = 10000; @Override - public void run() { + public void run() { c.decrementAndGet(); while (c.get() != 0) { } @@ -140,7 +140,7 @@ public void run() { Runnable r3 = new Runnable() { int i = 20000; @Override - public void run() { + public void run() { c.decrementAndGet(); while (c.get() != 0) { } @@ -151,7 +151,7 @@ public void run() { }; r3.run(); - + t1.join(); t2.join(); } diff --git a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java index 2985266fc0..528d6e82ce 100644 --- a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java @@ -332,24 +332,24 @@ public void dispose() { @Override public void run() { calls[0]++; } }); - + TestHelper.checkDisposed(us); assertEquals(1, calls[0]); - + List errors = TestHelper.trackPluginErrors(); try { us.onError(new TestException()); - + TestHelper.assertError(errors, 0, TestException.class); } finally { RxJavaPlugins.reset(); } - + Disposable d = Disposables.empty(); - + us.onSubscribe(d); - + assertTrue(d.isDisposed()); } }