diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index d26ef6faef..f676ab4786 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -986,12 +986,12 @@ public final Throwable blockingGet(long timeout, TimeUnit unit) { *
Scheduler:
*
{@code cache} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.4 - experimental * @return the new Completable instance - * @since 2.0.4 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Completable cache() { return RxJavaPlugins.onAssembly(new CompletableCache(this)); } @@ -1262,13 +1262,13 @@ public final Completable doAfterTerminate(final Action onAfterTerminate) { *

Scheduler:
*
{@code doFinally} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.1 - experimental * @param onFinally the action called when this Completable terminates or gets cancelled * @return the new Completable instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Completable doFinally(Action onFinally) { ObjectHelper.requireNonNull(onFinally, "onFinally is null"); return RxJavaPlugins.onAssembly(new CompletableDoFinally(this, onFinally)); @@ -1598,10 +1598,10 @@ public final Flowable startWith(Publisher other) { *

Scheduler:
*
{@code hide} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.5 - experimental * @return the new Completable instance - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Completable hide() { diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 60e4b64d4a..037c111832 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -7648,14 +7648,14 @@ public final Flowable distinctUntilChanged(BiPredicate *

This operator supports normal and conditional Subscribers as well as boundary-limited * synchronous or asynchronous queue-fusion.
* + *

History: 2.0.1 - experimental * @param onFinally the action called when this Flowable terminates or gets cancelled * @return the new Flowable instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @BackpressureSupport(BackpressureKind.PASS_THROUGH) @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Flowable doFinally(Action onFinally) { ObjectHelper.requireNonNull(onFinally, "onFinally is null"); return RxJavaPlugins.onAssembly(new FlowableDoFinally(this, onFinally)); @@ -7675,14 +7675,14 @@ public final Flowable doFinally(Action onFinally) { *

This operator supports normal and conditional Subscribers as well as boundary-limited * synchronous or asynchronous queue-fusion.
* + *

History: 2.0.1 - experimental * @param onAfterNext the Consumer that will be called after emitting an item from upstream to the downstream * @return the new Flowable instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @BackpressureSupport(BackpressureKind.PASS_THROUGH) @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Flowable doAfterNext(Consumer onAfterNext) { ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); return RxJavaPlugins.onAssembly(new FlowableDoAfterNext(this, onAfterNext)); @@ -10447,13 +10447,14 @@ public final Flowable onTerminateDetach() { *

Scheduler:
*
{@code parallel} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.5 - experimental * @return the new ParallelFlowable instance - * @since 2.0.5 - experimental + * @since 2.1 - beta */ @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue - @Experimental + @Beta public final ParallelFlowable parallel() { return ParallelFlowable.from(this); } @@ -10476,14 +10477,15 @@ public final ParallelFlowable parallel() { *

Scheduler:
*
{@code parallel} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.5 - experimental * @param parallelism the number of 'rails' to use * @return the new ParallelFlowable instance - * @since 2.0.5 - experimental + * @since 2.1 - beta */ @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue - @Experimental + @Beta public final ParallelFlowable parallel(int parallelism) { ObjectHelper.verifyPositive(parallelism, "parallelism"); return ParallelFlowable.from(this, parallelism); @@ -10508,15 +10510,16 @@ public final ParallelFlowable parallel(int parallelism) { *

Scheduler:
*
{@code parallel} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.5 - experimental * @param parallelism the number of 'rails' to use * @param prefetch the number of items each 'rail' should prefetch * @return the new ParallelFlowable instance - * @since 2.0.5 - experimental + * @since 2.1 - beta */ @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue - @Experimental + @Beta public final ParallelFlowable parallel(int parallelism, int prefetch) { ObjectHelper.verifyPositive(parallelism, "parallelism"); ObjectHelper.verifyPositive(prefetch, "prefetch"); @@ -11787,6 +11790,7 @@ public final Flowable sample(long period, TimeUnit unit) { *

{@code sample} operates by default on the {@code computation} {@link Scheduler}.
* * + *

History: 2.0.5 - experimental * @param period * the sampling rate * @param unit @@ -11800,12 +11804,11 @@ public final Flowable sample(long period, TimeUnit unit) { * @see ReactiveX operators documentation: Sample * @see RxJava wiki: Backpressure * @see #throttleLast(long, TimeUnit) - * @since 2.0.5 - experimental + * @since 2.1 */ @CheckReturnValue @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.COMPUTATION) - @Experimental public final Flowable sample(long period, TimeUnit unit, boolean emitLast) { return sample(period, unit, Schedulers.computation(), emitLast); } @@ -11856,6 +11859,7 @@ public final Flowable sample(long period, TimeUnit unit, Scheduler scheduler) *

You specify which {@link Scheduler} this operator will use
* * + *

History: 2.0.5 - experimental * @param period * the sampling rate * @param unit @@ -11871,12 +11875,11 @@ public final Flowable sample(long period, TimeUnit unit, Scheduler scheduler) * @see ReactiveX operators documentation: Sample * @see RxJava wiki: Backpressure * @see #throttleLast(long, TimeUnit, Scheduler) - * @since 2.0.5 - experimental + * @since 2.1 */ @CheckReturnValue @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.CUSTOM) - @Experimental public final Flowable sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); @@ -11928,6 +11931,7 @@ public final Flowable sample(Publisher sampler) { *

This version of {@code sample} does not operate by default on a particular {@link Scheduler}.
* * + *

History: 2.0.5 - experimental * @param the element type of the sampler Publisher * @param sampler * the Publisher to use for sampling the source Publisher @@ -11939,12 +11943,11 @@ public final Flowable sample(Publisher sampler) { * the {@code sampler} Publisher emits an item or completes * @see ReactiveX operators documentation: Sample * @see RxJava wiki: Backpressure - * @since 2.0.5 - experimental + * @since 2.1 */ @CheckReturnValue @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Flowable sample(Publisher sampler, boolean emitLast) { ObjectHelper.requireNonNull(sampler, "sampler is null"); return RxJavaPlugins.onAssembly(new FlowableSamplePublisher(this, sampler, emitLast)); @@ -12755,37 +12758,6 @@ public final Flowable startWithArray(T... items) { return concatArray(fromArray, this); } - /** - * Ensures that the event flow between the upstream and downstream follow - * the Reactive-Streams 1.0 specification by honoring the 3 additional rules - * (which are omitted in standard operators due to performance reasons). - *

- * In addition, if rule §2.12 (onSubscribe must be called at most once) is violated, - * the sequence is cancelled an onError(IllegalStateException) is emitted. - *
- *
Backpressure:
- *
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure - * behavior.
- *
Scheduler:
- *
{@code strict} does not operate by default on a particular {@link Scheduler}.
- *
- * @return the new Flowable instance - * @since 2.0.5 - experimental - * @deprecated 2.0.7, will be removed in 2.1.0; by default, the Publisher interface is always strict - */ - @BackpressureSupport(BackpressureKind.PASS_THROUGH) - @SchedulerSupport(SchedulerSupport.NONE) - @Experimental - @CheckReturnValue - @Deprecated - public final Flowable strict() { - return this; - } - /** * Subscribes to a Publisher and ignores {@code onNext} and {@code onComplete} emissions. *

@@ -12997,12 +12969,13 @@ public final void subscribe(Subscriber s) { *

Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.7 - experimental * @param s the FlowableSubscriber that will consume signals from this Flowable - * @since 2.0.7 - experimental + * @since 2.1 - beta */ @BackpressureSupport(BackpressureKind.SPECIAL) @SchedulerSupport(SchedulerSupport.NONE) - @Experimental + @Beta public final void subscribe(FlowableSubscriber s) { ObjectHelper.requireNonNull(s, "s is null"); try { diff --git a/src/main/java/io/reactivex/FlowableSubscriber.java b/src/main/java/io/reactivex/FlowableSubscriber.java index 8076c5c143..e483064610 100644 --- a/src/main/java/io/reactivex/FlowableSubscriber.java +++ b/src/main/java/io/reactivex/FlowableSubscriber.java @@ -20,10 +20,11 @@ * Represents a Reactive-Streams inspired Subscriber that is RxJava 2 only * and weakens rules §1.3 and §3.9 of the specification for gaining performance. * + *

History: 2.0.7 - experimental * @param the value type - * @since 2.0.7 - experimental + * @since 2.1 - beta */ -@Experimental +@Beta public interface FlowableSubscriber extends Subscriber { /** diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index dd79da3bf1..4385c420ac 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2375,13 +2375,13 @@ public final Maybe delaySubscription(long delay, TimeUnit unit, Scheduler sch *

Scheduler:
*
{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.1 - experimental * @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream * @return the new Maybe instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Maybe doAfterSuccess(Consumer onAfterSuccess) { ObjectHelper.requireNonNull(onAfterSuccess, "doAfterSuccess is null"); return RxJavaPlugins.onAssembly(new MaybeDoAfterSuccess(this, onAfterSuccess)); @@ -2428,13 +2428,13 @@ public final Maybe doAfterTerminate(Action onAfterTerminate) { *

Scheduler:
*
{@code doFinally} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.1 - experimental * @param onFinally the action called when this Maybe terminates or gets cancelled * @return the new Maybe instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Maybe doFinally(Action onFinally) { ObjectHelper.requireNonNull(onFinally, "onFinally is null"); return RxJavaPlugins.onAssembly(new MaybeDoFinally(this, onFinally)); @@ -2829,17 +2829,17 @@ public final Single flatMapSingle(final Function{@code flatMapSingleElement} does not operate by default on a particular {@link Scheduler}. * * + *

History: 2.0.2 - experimental * @param the result value type * @param mapper * a function that, when applied to the item emitted by the source Maybe, returns a * Single * @return the new Maybe instance * @see ReactiveX operators documentation: FlatMap - * @since 2.0.2 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Maybe flatMapSingleElement(final Function> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new MaybeFlatMapSingleElement(this, mapper)); diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 8b4c33988c..4ac2e5774a 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -6825,13 +6825,13 @@ public final Observable distinctUntilChanged(BiPredicateOperator-fusion: *

This operator supports boundary-limited synchronous or asynchronous queue-fusion.
* + *

History: 2.0.1 - experimental * @param onAfterNext the Consumer that will be called after emitting an item from upstream to the downstream * @return the new Observable instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Observable doAfterNext(Consumer onAfterNext) { ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); return RxJavaPlugins.onAssembly(new ObservableDoAfterNext(this, onAfterNext)); @@ -6874,13 +6874,13 @@ public final Observable doAfterTerminate(Action onFinally) { * Operator-fusion: *

This operator supports boundary-limited synchronous or asynchronous queue-fusion.
* + *

History: 2.0.1 - experimental * @param onFinally the action called when this Observable terminates or gets cancelled * @return the new Observable instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Observable doFinally(Action onFinally) { ObjectHelper.requireNonNull(onFinally, "onFinally is null"); return RxJavaPlugins.onAssembly(new ObservableDoFinally(this, onFinally)); @@ -9845,6 +9845,7 @@ public final Observable sample(long period, TimeUnit unit) { *

{@code sample} operates by default on the {@code computation} {@link Scheduler}.
* * + *

History: 2.0.5 - experimental * @param period * the sampling rate * @param unit @@ -9857,11 +9858,10 @@ public final Observable sample(long period, TimeUnit unit) { * if false, an unsampled last item is ignored. * @see ReactiveX operators documentation: Sample * @see #throttleLast(long, TimeUnit) - * @since 2.0.5 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.COMPUTATION) - @Experimental public final Observable sample(long period, TimeUnit unit, boolean emitLast) { return sample(period, unit, Schedulers.computation(), emitLast); } @@ -9906,6 +9906,7 @@ public final Observable sample(long period, TimeUnit unit, Scheduler schedule *

You specify which {@link Scheduler} this operator will use
* * + *

History: 2.0.5 - experimental * @param period * the sampling rate * @param unit @@ -9920,11 +9921,10 @@ public final Observable sample(long period, TimeUnit unit, Scheduler schedule * the specified time interval * @see ReactiveX operators documentation: Sample * @see #throttleLast(long, TimeUnit, Scheduler) - * @since 2.0.5 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) - @Experimental public final Observable sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); @@ -9968,6 +9968,7 @@ public final Observable sample(ObservableSource sampler) { *

This version of {@code sample} does not operate by default on a particular {@link Scheduler}.
* * + *

History: 2.0.5 - experimental * @param the element type of the sampler ObservableSource * @param sampler * the ObservableSource to use for sampling the source ObservableSource @@ -9978,11 +9979,10 @@ public final Observable sample(ObservableSource sampler) { * @return an Observable that emits the results of sampling the items emitted by this ObservableSource whenever * the {@code sampler} ObservableSource emits an item or completes * @see ReactiveX operators documentation: Sample - * @since 2.0.5 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Observable sample(ObservableSource sampler, boolean emitLast) { ObjectHelper.requireNonNull(sampler, "sampler is null"); return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable(this, sampler, emitLast)); diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index b0d6ce1528..00eb314006 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -239,13 +239,14 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial * }); * * + *

History: 2.0.1 - experimental * @param a Scheduler and a Subscription * @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns * the Completable that will be subscribed to and should trigger the execution of the scheduled Actions. * @return the Scheduler with the customized execution behavior + * @since 2.1 */ @SuppressWarnings("unchecked") - @Experimental @NonNull public S when(@NonNull Function>, Completable> combine) { return (S) new SchedulerWhen(combine, this); diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 4d2469f38e..543490c8b7 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1793,13 +1793,13 @@ public final Single delaySubscription(long time, TimeUnit unit, Scheduler *

Scheduler:
*
{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.1 - experimental * @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream * @return the new Single instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Single doAfterSuccess(Consumer onAfterSuccess) { ObjectHelper.requireNonNull(onAfterSuccess, "doAfterSuccess is null"); return RxJavaPlugins.onAssembly(new SingleDoAfterSuccess(this, onAfterSuccess)); @@ -1816,16 +1816,16 @@ public final Single doAfterSuccess(Consumer onAfterSuccess) { *

{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.
* * + *

History: 2.0.6 - experimental * @param onAfterTerminate * an {@link Action} to be invoked when the source Single finishes * @return a Single that emits the same items as the source Single, then invokes the * {@link Action} * @see ReactiveX operators documentation: Do - * @since 2.0.6 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Single doAfterTerminate(Action onAfterTerminate) { ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); return RxJavaPlugins.onAssembly(new SingleDoAfterTerminate(this, onAfterTerminate)); @@ -1842,13 +1842,13 @@ public final Single doAfterTerminate(Action onAfterTerminate) { *

Scheduler:
*
{@code doFinally} does not operate by default on a particular {@link Scheduler}.
* + *

History: 2.0.1 - experimental * @param onFinally the action called when this Single terminates or gets cancelled * @return the new Single instance - * @since 2.0.1 - experimental + * @since 2.1 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @Experimental public final Single doFinally(Action onFinally) { ObjectHelper.requireNonNull(onFinally, "onFinally is null"); return RxJavaPlugins.onAssembly(new SingleDoFinally(this, onFinally)); diff --git a/src/main/java/io/reactivex/annotations/BackpressureKind.java b/src/main/java/io/reactivex/annotations/BackpressureKind.java index 7e37de872d..6f04d811f3 100644 --- a/src/main/java/io/reactivex/annotations/BackpressureKind.java +++ b/src/main/java/io/reactivex/annotations/BackpressureKind.java @@ -15,6 +15,7 @@ /** * Enumeration for various kinds of backpressure support. + * @since 2.0 */ public enum BackpressureKind { /** diff --git a/src/main/java/io/reactivex/annotations/BackpressureSupport.java b/src/main/java/io/reactivex/annotations/BackpressureSupport.java index 9d346d350f..17b3351656 100644 --- a/src/main/java/io/reactivex/annotations/BackpressureSupport.java +++ b/src/main/java/io/reactivex/annotations/BackpressureSupport.java @@ -17,6 +17,7 @@ /** * Indicates the backpressure support kind of the associated operator or class. + * @since 2.0 */ @Retention(RetentionPolicy.RUNTIME) @Documented diff --git a/src/main/java/io/reactivex/annotations/CheckReturnValue.java b/src/main/java/io/reactivex/annotations/CheckReturnValue.java index 3446ca517c..005f079693 100644 --- a/src/main/java/io/reactivex/annotations/CheckReturnValue.java +++ b/src/main/java/io/reactivex/annotations/CheckReturnValue.java @@ -21,13 +21,12 @@ /** * Marks methods whose return values should be checked. - * - * @since 2.0.2 - experimental + *

History: 2.0.2 - experimental + * @since 2.1 */ @Retention(RetentionPolicy.RUNTIME) @Documented @Target(ElementType.METHOD) -@Experimental public @interface CheckReturnValue { } diff --git a/src/main/java/io/reactivex/annotations/SchedulerSupport.java b/src/main/java/io/reactivex/annotations/SchedulerSupport.java index acf89ad988..f2c7be4d68 100644 --- a/src/main/java/io/reactivex/annotations/SchedulerSupport.java +++ b/src/main/java/io/reactivex/annotations/SchedulerSupport.java @@ -24,6 +24,7 @@ * {@linkplain #NONE not using a scheduler} and {@linkplain #CUSTOM a manually-specified scheduler}. * Libraries providing their own values should namespace them with their base package name followed * by a colon ({@code :}) and then a human-readable name (e.g., {@code com.example:ui-thread}). + * @since 2.0 */ @Retention(RetentionPolicy.RUNTIME) @Documented diff --git a/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java b/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java index 13488f79b4..e0e4d8e336 100644 --- a/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java +++ b/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java @@ -19,9 +19,10 @@ * Represents an exception used to signal to the {@code RxJavaPlugins.onError()} that a * callback-based subscribe() method on a base reactive type didn't specify * an onError handler. - * @since 2.0.6 - experimental + *

History: 2.0.6 - experimental + * @since 2.1 - beta */ -@Experimental +@Beta public final class OnErrorNotImplementedException extends RuntimeException { private static final long serialVersionUID = -6298857009889503852L; diff --git a/src/main/java/io/reactivex/exceptions/ProtocolViolationException.java b/src/main/java/io/reactivex/exceptions/ProtocolViolationException.java index e5dda3c7df..00a3ee3bbc 100644 --- a/src/main/java/io/reactivex/exceptions/ProtocolViolationException.java +++ b/src/main/java/io/reactivex/exceptions/ProtocolViolationException.java @@ -13,14 +13,15 @@ package io.reactivex.exceptions; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.Beta; /** * Explicitly named exception to indicate a Reactive-Streams * protocol violation. - * @since 2.0.6 - experimental + *

History: 2.0.6 - experimental + * @since 2.1 - beta */ -@Experimental +@Beta public final class ProtocolViolationException extends IllegalStateException { private static final long serialVersionUID = 1644750035281290266L; diff --git a/src/main/java/io/reactivex/exceptions/UndeliverableException.java b/src/main/java/io/reactivex/exceptions/UndeliverableException.java index 23bcd76743..030ba755ba 100644 --- a/src/main/java/io/reactivex/exceptions/UndeliverableException.java +++ b/src/main/java/io/reactivex/exceptions/UndeliverableException.java @@ -13,13 +13,14 @@ package io.reactivex.exceptions; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.Beta; /** * Wrapper for Throwable errors that are sent to `RxJavaPlugins.onError`. - * @since 2.0.6 - experimental + *

History: 2.0.6 - experimental + * @since 2.1 - beta */ -@Experimental +@Beta public final class UndeliverableException extends IllegalStateException { private static final long serialVersionUID = 1644750035281290266L; diff --git a/src/main/java/io/reactivex/observers/BaseTestConsumer.java b/src/main/java/io/reactivex/observers/BaseTestConsumer.java index 85c6492ea3..e1ee36d7b1 100644 --- a/src/main/java/io/reactivex/observers/BaseTestConsumer.java +++ b/src/main/java/io/reactivex/observers/BaseTestConsumer.java @@ -17,7 +17,6 @@ import java.util.concurrent.*; import io.reactivex.Notification; -import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.CompositeException; import io.reactivex.functions.Predicate; @@ -338,11 +337,11 @@ public final U assertValue(T value) { * Assert that this TestObserver/TestSubscriber did not receive an onNext value which is equal to * the given value with respect to Objects.equals. * - * @since 2.0.5 - experimental + *

History: 2.0.5 - experimental * @param value the value to expect not being received - * @return this; + * @return this + * @since 2.1 */ - @Experimental @SuppressWarnings("unchecked") public final U assertNever(T value) { int s = values.size(); @@ -379,12 +378,12 @@ public final U assertValue(Predicate valuePredicate) { * Asserts that this TestObserver/TestSubscriber did not receive any onNext value for which * the provided predicate returns true. * - * @since 2.0.5 - experimental + *

History: 2.0.5 - experimental * @param valuePredicate the predicate that receives the onNext value * and should return true for the expected value. * @return this + * @since 2.1 */ - @Experimental @SuppressWarnings("unchecked") public final U assertNever(Predicate valuePredicate) { int s = values.size(); @@ -782,12 +781,12 @@ public final U assertEmpty() { /** * Set the tag displayed along with an assertion failure's * other state information. + *

History: 2.0.7 - experimental * @param tag the string to display (null won't print any tag) * @return this - * @since 2.0.7 - experimental + * @since 2.1 */ @SuppressWarnings("unchecked") - @Experimental public final U withTag(CharSequence tag) { this.tag = tag; return (U)this; @@ -796,9 +795,9 @@ public final U withTag(CharSequence tag) { /** * Enumeration of default wait strategies when waiting for a specific number of * items in {@link BaseTestConsumer#awaitCount(int, Runnable)}. - * @since 2.0.7 - experimental + *

History: 2.0.7 - experimental + * @since 2.1 */ - @Experimental public enum TestWaitStrategy implements Runnable { /** The wait loop will spin as fast as possible. */ SPIN { @@ -861,12 +860,12 @@ static void sleep(int millis) { * Await until the TestObserver/TestSubscriber receives the given * number of items or terminates by sleeping 10 milliseconds at a time * up to 5000 milliseconds of timeout. + *

History: 2.0.7 - experimental * @param atLeast the number of items expected at least * @return this * @see #awaitCount(int, Runnable, long) - * @since 2.0.7 - experimental + * @since 2.1 */ - @Experimental public final U awaitCount(int atLeast) { return awaitCount(atLeast, TestWaitStrategy.SLEEP_10MS, 5000); } @@ -875,6 +874,7 @@ public final U awaitCount(int atLeast) { * Await until the TestObserver/TestSubscriber receives the given * number of items or terminates by waiting according to the wait * strategy and up to 5000 milliseconds of timeout. + *

History: 2.0.7 - experimental * @param atLeast the number of items expected at least * @param waitStrategy a Runnable called when the current received count * hasn't reached the expected value and there was @@ -882,9 +882,8 @@ public final U awaitCount(int atLeast) { * for examples * @return this * @see #awaitCount(int, Runnable, long) - * @since 2.0.7 - experimental + * @since 2.1 */ - @Experimental public final U awaitCount(int atLeast, Runnable waitStrategy) { return awaitCount(atLeast, waitStrategy, 5000); } @@ -892,6 +891,7 @@ public final U awaitCount(int atLeast, Runnable waitStrategy) { /** * Await until the TestObserver/TestSubscriber receives the given * number of items or terminates. + *

History: 2.0.7 - experimental * @param atLeast the number of items expected at least * @param waitStrategy a Runnable called when the current received count * hasn't reached the expected value and there was @@ -900,10 +900,9 @@ public final U awaitCount(int atLeast, Runnable waitStrategy) { * @param timeoutMillis if positive, the await ends if the specified amount of * time has passed no matter how many items were received * @return this - * @since 2.0.7 - experimental + * @since 2.1 */ @SuppressWarnings("unchecked") - @Experimental public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis) { long start = System.currentTimeMillis(); for (;;) { @@ -925,24 +924,24 @@ public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis /** * @return true if one of the timeout-based await methods has timed out. + *

History: 2.0.7 - experimental * @see #clearTimeout() * @see #assertTimeout() * @see #assertNoTimeout() - * @since 2.0.7 - experimental + * @since 2.1 */ - @Experimental public final boolean isTimeout() { return timeout; } /** * Clears the timeout flag set by the await methods when they timed out. + *

History: 2.0.7 - experimental * @return this - * @since 2.0.7 - experimental + * @since 2.1 * @see #isTimeout() */ @SuppressWarnings("unchecked") - @Experimental public final U clearTimeout() { timeout = false; return (U)this; @@ -950,11 +949,11 @@ public final U clearTimeout() { /** * Asserts that some awaitX method has timed out. + *

History: 2.0.7 - experimental * @return this - * @since 2.0.7 - experimental + * @since 2.1 */ @SuppressWarnings("unchecked") - @Experimental public final U assertTimeout() { if (!timeout) { throw fail("No timeout?!"); @@ -965,11 +964,11 @@ public final U assertTimeout() { /** * Asserts that some awaitX method has not timed out. + *

History: 2.0.7 - experimental * @return this - * @since 2.0.7 - experimental + * @since 2.1 */ @SuppressWarnings("unchecked") - @Experimental public final U assertNoTimeout() { if (timeout) { throw fail("Timeout?!"); diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index e42e2c78ad..b1f6d60322 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -34,10 +34,11 @@ * Use {@code runOn()} to introduce where each 'rail' should run on thread-vise. * Use {@code sequential()} to merge the sources back into a single Flowable. * + *

History: 2.0.5 - experimental * @param the value type - * @since 2.0.5 - experimental + * @since 2.1 - beta */ -@Experimental +@Beta public abstract class ParallelFlowable { /** diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 61c7bc09d9..5ab0192342 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -12,39 +12,22 @@ */ package io.reactivex.plugins; -import io.reactivex.Completable; -import io.reactivex.CompletableObserver; -import io.reactivex.Flowable; -import io.reactivex.Maybe; -import io.reactivex.MaybeObserver; -import io.reactivex.Observable; -import io.reactivex.Observer; -import io.reactivex.Scheduler; -import io.reactivex.Single; -import io.reactivex.SingleObserver; -import io.reactivex.annotations.Experimental; -import io.reactivex.annotations.NonNull; -import io.reactivex.annotations.Nullable; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.*; + +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.annotations.*; import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; -import io.reactivex.functions.BiFunction; -import io.reactivex.functions.BooleanSupplier; -import io.reactivex.functions.Consumer; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.schedulers.ComputationScheduler; -import io.reactivex.internal.schedulers.IoScheduler; -import io.reactivex.internal.schedulers.NewThreadScheduler; -import io.reactivex.internal.schedulers.SingleScheduler; +import io.reactivex.internal.schedulers.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observables.ConnectableObservable; import io.reactivex.parallel.ParallelFlowable; import io.reactivex.schedulers.Schedulers; -import org.reactivestreams.Subscriber; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.Callable; -import java.util.concurrent.ThreadFactory; /** * Utility class to inject handlers to certain standard RxJava operations. */ @@ -161,10 +144,10 @@ public static boolean isLockdown() { * Enables or disables the blockingX operators to fail * with an IllegalStateException on a non-blocking * scheduler such as computation or single. + *

History: 2.0.5 - experimental * @param enable enable or disable the feature - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental public static void setFailOnNonBlockingScheduler(boolean enable) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); @@ -176,10 +159,10 @@ public static void setFailOnNonBlockingScheduler(boolean enable) { * Returns true if the blockingX operators fail * with an IllegalStateException on a non-blocking scheduler * such as computation or single. + *

History: 2.0.5 - experimental * @return true if the blockingX operators fail on a non-blocking scheduler - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental public static boolean isFailOnNonBlockingScheduler() { return failNonBlockingScheduler; } @@ -1101,10 +1084,11 @@ public static Completable onAssembly(@NonNull Completable source) { /** * Sets the specific hook function. + *

History: 2.0.6 - experimental * @param handler the hook function to set, null allowed - * @since 2.0.6 - experimental + * @since 2.1 - beta */ - @Experimental + @Beta @SuppressWarnings("rawtypes") public static void setOnParallelAssembly(@Nullable Function handler) { if (lockdown) { @@ -1115,10 +1099,11 @@ public static void setOnParallelAssembly(@Nullable FunctionHistory: 2.0.6 - experimental * @return the hook function, may be null - * @since 2.0.6 - experimental + * @since 2.1 - beta */ - @Experimental + @Beta @SuppressWarnings("rawtypes") @Nullable public static Function getOnParallelAssembly() { @@ -1127,12 +1112,13 @@ public static void setOnParallelAssembly(@Nullable FunctionHistory: 2.0.6 - experimental * @param the value type of the source * @param source the hook's input value * @return the value returned by the hook - * @since 2.0.6 - experimental + * @since 2.1 - beta */ - @Experimental + @Beta @SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static ParallelFlowable onAssembly(@NonNull ParallelFlowable source) { @@ -1148,11 +1134,11 @@ public static ParallelFlowable onAssembly(@NonNull ParallelFlowable so * such as awaiting a condition or signal * and should return true to indicate the operator * should not block but throw an IllegalArgumentException. + *

History: 2.0.5 - experimental * @return true if the blocking should be prevented * @see #setFailOnNonBlockingScheduler(boolean) - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental public static boolean onBeforeBlocking() { BooleanSupplier f = onBeforeBlocking; if (f != null) { @@ -1169,12 +1155,12 @@ public static boolean onBeforeBlocking() { * Set the handler that is called when an operator attempts a blocking * await; the handler should return true to prevent the blocking * and to signal an IllegalStateException instead. + *

History: 2.0.5 - experimental * @param handler the handler to set, null resets to the default handler * that always returns false * @see #onBeforeBlocking() - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); @@ -1185,10 +1171,10 @@ public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) { /** * Returns the current blocking handler or null if no custom handler * is set. + *

History: 2.0.5 - experimental * @return the current blocking handler or null if not specified - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental @Nullable public static BooleanSupplier getOnBeforeBlocking() { return onBeforeBlocking; @@ -1197,12 +1183,12 @@ public static BooleanSupplier getOnBeforeBlocking() { /** * Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()} * except using {@code threadFactory} for thread creation. + *

History: 2.0.5 - experimental * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any * system properties for configuring new thread creation. Cannot be null. * @return the created Scheduler instance - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental @NonNull public static Scheduler createComputationScheduler(@NonNull ThreadFactory threadFactory) { return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); @@ -1211,12 +1197,12 @@ public static Scheduler createComputationScheduler(@NonNull ThreadFactory thread /** * Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()} * except using {@code threadFactory} for thread creation. + *

History: 2.0.5 - experimental * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any * system properties for configuring new thread creation. Cannot be null. * @return the created Scheduler instance - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental @NonNull public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory) { return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); @@ -1225,12 +1211,12 @@ public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory) /** * Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()} * except using {@code threadFactory} for thread creation. + *

History: 2.0.5 - experimental * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any * system properties for configuring new thread creation. Cannot be null. * @return the created Scheduler instance - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental @NonNull public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFactory) { return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); @@ -1239,12 +1225,12 @@ public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFa /** * Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()} * except using {@code threadFactory} for thread creation. + *

History: 2.0.5 - experimental * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any * system properties for configuring new thread creation. Cannot be null. * @return the created Scheduler instance - * @since 2.0.5 - experimental + * @since 2.1 */ - @Experimental @NonNull public static Scheduler createSingleScheduler(@NonNull ThreadFactory threadFactory) { return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); diff --git a/src/main/java/io/reactivex/subjects/CompletableSubject.java b/src/main/java/io/reactivex/subjects/CompletableSubject.java index 2855bd0681..dad98fc11d 100644 --- a/src/main/java/io/reactivex/subjects/CompletableSubject.java +++ b/src/main/java/io/reactivex/subjects/CompletableSubject.java @@ -29,9 +29,9 @@ *

* The CompletableSubject doesn't store the Disposables coming through onSubscribe but * disposes them once the other onXXX methods were called (terminal state reached). - * @since 2.0.5 - experimental + *

History: 2.0.5 - experimental + * @since 2.1 */ -@Experimental public final class CompletableSubject extends Completable implements CompletableObserver { final AtomicReference observers; diff --git a/src/main/java/io/reactivex/subjects/MaybeSubject.java b/src/main/java/io/reactivex/subjects/MaybeSubject.java index 4c70398838..c1cda23955 100644 --- a/src/main/java/io/reactivex/subjects/MaybeSubject.java +++ b/src/main/java/io/reactivex/subjects/MaybeSubject.java @@ -29,10 +29,10 @@ *

* The MaybeSubject doesn't store the Disposables coming through onSubscribe but * disposes them once the other onXXX methods were called (terminal state reached). + *

History: 2.0.5 - experimental * @param the value type received and emitted - * @since 2.0.5 - experimental + * @since 2.1 */ -@Experimental public final class MaybeSubject extends Maybe implements MaybeObserver { final AtomicReference[]> observers; diff --git a/src/main/java/io/reactivex/subjects/SingleSubject.java b/src/main/java/io/reactivex/subjects/SingleSubject.java index 6e8b0e2006..4c486eac97 100644 --- a/src/main/java/io/reactivex/subjects/SingleSubject.java +++ b/src/main/java/io/reactivex/subjects/SingleSubject.java @@ -29,10 +29,10 @@ *

* The SingleSubject doesn't store the Disposables coming through onSubscribe but * disposes them once the other onXXX methods were called (terminal state reached). + *

History: 2.0.5 - experimental * @param the value type received and emitted - * @since 2.0.5 - experimental + * @since 2.1 */ -@Experimental public final class SingleSubject extends Single implements SingleObserver { final AtomicReference[]> observers; diff --git a/src/main/java/io/reactivex/subscribers/TestSubscriber.java b/src/main/java/io/reactivex/subscribers/TestSubscriber.java index e16af8feaf..8287efad2a 100644 --- a/src/main/java/io/reactivex/subscribers/TestSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/TestSubscriber.java @@ -17,7 +17,6 @@ import org.reactivestreams.*; import io.reactivex.FlowableSubscriber; -import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; import io.reactivex.internal.fuseable.QueueSubscription; @@ -410,11 +409,11 @@ public final TestSubscriber assertOf(Consumer> chec /** * Calls {@link #request(long)} and returns this. + *

History: 2.0.1 - experimental * @param n the request amount * @return this - * @since 2.0.1 - experimental + * @since 2.1 */ - @Experimental public final TestSubscriber requestMore(long n) { request(n); return this; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableStrictTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableStrictTest.java deleted file mode 100644 index 2126c6a113..0000000000 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableStrictTest.java +++ /dev/null @@ -1,308 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.flowable; - -import static org.junit.Assert.*; - -import java.util.*; -import java.util.concurrent.TimeUnit; - -import org.junit.Test; -import org.reactivestreams.*; - -import io.reactivex.*; -import io.reactivex.exceptions.TestException; -import io.reactivex.internal.subscribers.StrictSubscriber; -import io.reactivex.internal.subscriptions.BooleanSubscription; -import io.reactivex.processors.PublishProcessor; -import io.reactivex.schedulers.Schedulers; -import io.reactivex.subscribers.TestSubscriber; - -@Deprecated -public class FlowableStrictTest { - - @Test - public void empty() { - Flowable.empty() - .strict() - .test() - .assertResult(); - } - - @Test - public void just() { - Flowable.just(1) - .strict() - .test() - .assertResult(1); - } - - @Test - public void range() { - Flowable.range(1, 5) - .strict() - .test() - .assertResult(1, 2, 3, 4, 5); - } - - @Test - public void take() { - Flowable.range(1, 5) - .take(2) - .strict() - .test() - .assertResult(1, 2); - } - - @Test - public void backpressure() { - Flowable.range(1, 5) - .strict() - .test(0) - .assertEmpty() - .requestMore(1) - .assertValue(1) - .requestMore(2) - .assertValues(1, 2, 3) - .requestMore(2) - .assertResult(1, 2, 3, 4, 5); - } - - @Test - public void error() { - Flowable.error(new TestException()) - .strict() - .test() - .assertFailure(TestException.class); - } - - @Test - public void observeOn() { - Flowable.range(1, 5) - .hide() - .observeOn(Schedulers.single()) - .strict() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1, 2, 3, 4, 5); - } - - @Test - public void invalidRequest() { - for (int i = 0; i > -100; i--) { - final int j = i; - final List items = new ArrayList(); - - Flowable.range(1, 2) - .strict() - .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(j); - } - - @Override - public void onNext(Integer t) { - items.add(t); - } - - @Override - public void onError(Throwable t) { - items.add(t); - } - - @Override - public void onComplete() { - items.add("Done"); - } - }); - - assertTrue(items.toString(), items.size() == 1); - assertTrue(items.toString(), items.get(0) instanceof IllegalArgumentException); - assertTrue(items.toString(), items.get(0).toString().contains("§3.9")); - } - } - - @Test - public void doubleOnSubscribe() { - final BooleanSubscription bs1 = new BooleanSubscription(); - final BooleanSubscription bs2 = new BooleanSubscription(); - - final TestSubscriber ts = TestSubscriber.create(); - - Flowable.fromPublisher(new Publisher() { - @Override - public void subscribe(Subscriber p) { - p.onSubscribe(bs1); - p.onSubscribe(bs2); - } - }) - .strict() - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - ts.onSubscribe(s); - } - - @Override - public void onNext(Object t) { - ts.onNext(t); - } - - @Override - public void onError(Throwable t) { - ts.onError(t); - } - - @Override - public void onComplete() { - ts.onComplete(); - } - }); - - ts.assertFailure(IllegalStateException.class); - - assertTrue(bs1.isCancelled()); - assertTrue(bs2.isCancelled()); - - String es = ts.errors().get(0).toString(); - assertTrue(es, es.contains("§2.12")); - } - - @Test - public void noCancelOnComplete() { - final BooleanSubscription bs = new BooleanSubscription(); - - Flowable.fromPublisher(new Publisher() { - @Override - public void subscribe(Subscriber p) { - p.onSubscribe(bs); - p.onComplete(); - } - }) - .strict() - .subscribe(new Subscriber() { - - Subscription s; - - @Override - public void onSubscribe(Subscription s) { - this.s = s; - } - - @Override - public void onNext(Object t) { - // not called - } - - @Override - public void onError(Throwable t) { - // not called - } - - @Override - public void onComplete() { - s.cancel(); - } - }); - - assertFalse(bs.isCancelled()); - } - - @Test - public void noCancelOnError() { - final BooleanSubscription bs = new BooleanSubscription(); - - Flowable.fromPublisher(new Publisher() { - @Override - public void subscribe(Subscriber p) { - p.onSubscribe(bs); - p.onError(new TestException()); - } - }) - .strict() - .subscribe(new Subscriber() { - - Subscription s; - - @Override - public void onSubscribe(Subscription s) { - this.s = s; - } - - @Override - public void onNext(Object t) { - // not called - } - - @Override - public void onError(Throwable t) { - s.cancel(); - } - - @Override - public void onComplete() { - // not called - } - }); - - assertFalse(bs.isCancelled()); - } - - @Test - public void normal() { - TestSubscriber ts = new TestSubscriber(); - - Flowable.range(1, 5) - .subscribe(new StrictSubscriber(ts)); - - ts.assertResult(1, 2, 3, 4, 5); - } - - @Test - public void badRequestOnNextRace() { - for (int i = 0; i < 500; i++) { - TestSubscriber ts = new TestSubscriber(); - - final PublishProcessor pp = PublishProcessor.create(); - - final StrictSubscriber s = new StrictSubscriber(ts); - - s.onSubscribe(new BooleanSubscription()); - - Runnable r1 = new Runnable() { - @Override - public void run() { - pp.onNext(1); - } - }; - - Runnable r2 = new Runnable() { - @Override - public void run() { - s.request(0); - } - }; - - TestHelper.race(r1, r2); - - if (ts.valueCount() == 0) { - ts.assertFailure(IllegalArgumentException.class); - } else { - ts.assertValue(1).assertNoErrors().assertNotComplete(); - } - } - } -}