diff --git a/DESIGN.md b/DESIGN.md index 53f828186a..5480b39948 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -528,13 +528,13 @@ interface ScalarCallable extends java.util.Callable { `ScalarCallable` is also `Callable` and thus its value can be extracted practically anytime. For convenience (and for sense), `ScalarCallable` overrides and hides the superclass' `throws Exception` clause - throwing during assembly time is likely unreasonable for scalars. -Since Reactive-Streams doesn't allow `null`s in the value flow, we have the opportunity to define `ScalarCallable`s and `Callable`s returning `null` should be considered as an empty source - allowing operators to dispatch on the type `Callable` first then branch on the nullness of `call()`. +Since Reactive Streams doesn't allow `null`s in the value flow, we have the opportunity to define `ScalarCallable`s and `Callable`s returning `null` should be considered as an empty source - allowing operators to dispatch on the type `Callable` first then branch on the nullness of `call()`. Interoperating with other libraries, at this level is possible. Reactor-Core uses the same pattern and the two libraries can work with each other's `Publisher+Callable` types. Unfortunately, this means subscription-time only fusion as `ScalarCallable`s live locally in each library. ##### Micro-fusion -Micro-fusion goes a step deeper and tries to reuse internal structures, mostly queues, in operator pairs, saving on allocation and sometimes on atomic operations. It's property is that, in a way, subverts the standard Reactive-Streams protocol between subsequent operators that both support fusion. However, from the outside world's view, they still work according to the RS protocol. +Micro-fusion goes a step deeper and tries to reuse internal structures, mostly queues, in operator pairs, saving on allocation and sometimes on atomic operations. It's property is that, in a way, subverts the standard Reactive Streams protocol between subsequent operators that both support fusion. However, from the outside world's view, they still work according to the RS protocol. Currently, two main kinds of micro-fusion opportunities are available. diff --git a/README.md b/README.md index 982f0c393d..fd53e02083 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ It extends the [observer pattern](http://en.wikipedia.org/wiki/Observer_pattern) #### Version 2.x ([Javadoc](http://reactivex.io/RxJava/2.x/javadoc/)) -- single dependency: [Reactive-Streams](https://github.com/reactive-streams/reactive-streams-jvm) +- single dependency: [Reactive Streams](https://github.com/reactive-streams/reactive-streams-jvm) - continued support for Java 6+ & [Android](https://github.com/ReactiveX/RxAndroid) 2.3+ - performance gains through design changes learned through the 1.x cycle and through [Reactive-Streams-Commons](https://github.com/reactor/reactive-streams-commons) research project. - Java 8 lambda-friendly API @@ -72,7 +72,7 @@ Flowable.just("Hello world") RxJava 2 features several base classes you can discover operators on: - - [`io.reactivex.Flowable`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html): 0..N flows, supporting Reactive-Streams and backpressure + - [`io.reactivex.Flowable`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html): 0..N flows, supporting Reactive Streams and backpressure - [`io.reactivex.Observable`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html): 0..N flows, no backpressure, - [`io.reactivex.Single`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Single.html): a flow of exactly 1 item or an error, - [`io.reactivex.Completable`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Completable.html): a flow without items but only a completion or error signal, diff --git a/docs/What's-different-in-2.0.md b/docs/What's-different-in-2.0.md index fac50df56d..30f9a4ccba 100644 --- a/docs/What's-different-in-2.0.md +++ b/docs/What's-different-in-2.0.md @@ -1,6 +1,6 @@ -RxJava 2.0 has been completely rewritten from scratch on top of the Reactive-Streams specification. The specification itself has evolved out of RxJava 1.x and provides a common baseline for reactive systems and libraries. +RxJava 2.0 has been completely rewritten from scratch on top of the Reactive Streams specification. The specification itself has evolved out of RxJava 1.x and provides a common baseline for reactive systems and libraries. -Because Reactive-Streams has a different architecture, it mandates changes to some well known RxJava types. This wiki page attempts to summarize what has changed and describes how to rewrite 1.x code into 2.x code. +Because Reactive Streams has a different architecture, it mandates changes to some well known RxJava types. This wiki page attempts to summarize what has changed and describes how to rewrite 1.x code into 2.x code. For technical details on how to write operators for 2.x, please visit the [Writing Operators](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0) wiki page. @@ -20,7 +20,7 @@ For technical details on how to write operators for 2.x, please visit the [Writi - [Subscriber](#subscriber) - [Subscription](#subscription) - [Backpressure](#backpressure) - - [Reactive-Streams compliance](#reactive-streams-compliance) + - [Reactive Streams compliance](#reactive-streams-compliance) - [Runtime hooks](#runtime-hooks) - [Error handling](#error-handling) - [Scheduler](#schedulers) @@ -105,7 +105,7 @@ When architecting dataflows (as an end-consumer of RxJava) or deciding upon what # Single -The 2.x `Single` reactive base type, which can emit a single `onSuccess` or `onError` has been redesigned from scratch. Its architecture now derives from the Reactive-Streams design. Its consumer type (`rx.Single.SingleSubscriber`) has been changed from being a class that accepts `rx.Subscription` resources to be an interface `io.reactivex.SingleObserver` that has only 3 methods: +The 2.x `Single` reactive base type, which can emit a single `onSuccess` or `onError` has been redesigned from scratch. Its architecture now derives from the Reactive Streams design. Its consumer type (`rx.Single.SingleSubscriber`) has been changed from being a class that accepts `rx.Subscription` resources to be an interface `io.reactivex.SingleObserver` that has only 3 methods: ```java interface SingleObserver { @@ -119,7 +119,7 @@ and follows the protocol `onSubscribe (onSuccess | onError)?`. # Completable -The `Completable` type remains largely the same. It was already designed along the Reactive-Streams style for 1.x so no user-level changes there. +The `Completable` type remains largely the same. It was already designed along the Reactive Streams style for 1.x so no user-level changes there. Similar to the naming changes, `rx.Completable.CompletableSubscriber` has become `io.reactivex.CompletableObserver` with `onSubscribe(Disposable)`: @@ -154,7 +154,7 @@ Maybe.just(1) # Base reactive interfaces -Following the style of extending the Reactive-Streams `Publisher` in `Flowable`, the other base reactive classes now extend similar base interfaces (in package `io.reactivex`): +Following the style of extending the Reactive Streams `Publisher` in `Flowable`, the other base reactive classes now extend similar base interfaces (in package `io.reactivex`): ```java interface ObservableSource { @@ -182,7 +182,7 @@ Flowable flatMap(Function> mapper Observable flatMap(Function> mapper); ``` -By having `Publisher` as input this way, you can compose with other Reactive-Streams compliant libraries without the need to wrap them or convert them into `Flowable` first. +By having `Publisher` as input this way, you can compose with other Reactive Streams compliant libraries without the need to wrap them or convert them into `Flowable` first. If an operator has to offer a reactive base type, however, the user will receive the full reactive class (as giving out an `XSource` is practically useless as it doesn't have operators on it): @@ -197,7 +197,7 @@ source.compose((Flowable flowable) -> # Subjects and Processors -In the Reactive-Streams specification, the `Subject`-like behavior, namely being a consumer and supplier of events at the same time, is done by the `org.reactivestreams.Processor` interface. As with the `Observable`/`Flowable` split, the backpressure-aware, Reactive-Streams compliant implementations are based on the `FlowableProcessor` class (which extends `Flowable` to give a rich set of instance operators). An important change regarding `Subject`s (and by extension, `FlowableProcessor`) that they no longer support `T -> R` like conversion (that is, input is of type `T` and the output is of type `R`). (We never had a use for it in 1.x and the original `Subject` came from .NET where there is a `Subject` overload because .NET allows the same class name with a different number of type arguments.) +In the Reactive Streams specification, the `Subject`-like behavior, namely being a consumer and supplier of events at the same time, is done by the `org.reactivestreams.Processor` interface. As with the `Observable`/`Flowable` split, the backpressure-aware, Reactive Streams compliant implementations are based on the `FlowableProcessor` class (which extends `Flowable` to give a rich set of instance operators). An important change regarding `Subject`s (and by extension, `FlowableProcessor`) that they no longer support `T -> R` like conversion (that is, input is of type `T` and the output is of type `R`). (We never had a use for it in 1.x and the original `Subject` came from .NET where there is a `Subject` overload because .NET allows the same class name with a different number of type arguments.) The `io.reactivex.subjects.AsyncSubject`, `io.reactivex.subjects.BehaviorSubject`, `io.reactivex.subjects.PublishSubject`, `io.reactivex.subjects.ReplaySubject` and `io.reactivex.subjects.UnicastSubject` in 2.x don't support backpressure (as part of the 2.x `Observable` family). @@ -296,7 +296,7 @@ In addition, operators requiring a predicate no longer use `Func1` b # Subscriber -The Reactive-Streams specification has its own Subscriber as an interface. This interface is lightweight and combines request management with cancellation into a single interface `org.reactivestreams.Subscription` instead of having `rx.Producer` and `rx.Subscription` separately. This allows creating stream consumers with less internal state than the quite heavy `rx.Subscriber` of 1.x. +The Reactive Streams specification has its own Subscriber as an interface. This interface is lightweight and combines request management with cancellation into a single interface `org.reactivestreams.Subscription` instead of having `rx.Producer` and `rx.Subscription` separately. This allows creating stream consumers with less internal state than the quite heavy `rx.Subscriber` of 1.x. ```java Flowable.range(1, 10).subscribe(new Subscriber() { @@ -354,7 +354,7 @@ Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber); subscriber.dispose(); ``` -Note also that due to Reactive-Streams compatibility, the method `onCompleted` has been renamed to `onComplete` without the trailing `d`. +Note also that due to Reactive Streams compatibility, the method `onCompleted` has been renamed to `onComplete` without the trailing `d`. Since 1.x `Observable.subscribe(Subscriber)` returned `Subscription`, users often added the `Subscription` to a `CompositeSubscription` for example: @@ -364,7 +364,7 @@ CompositeSubscription composite = new CompositeSubscription(); composite.add(Observable.range(1, 5).subscribe(new TestSubscriber())); ``` -Due to the Reactive-Streams specification, `Publisher.subscribe` returns void and the pattern by itself no longer works in 2.0. To remedy this, the method `E subscribeWith(E subscriber)` has been added to each base reactive class which returns its input subscriber/observer as is. With the two examples before, the 2.x code can now look like this since `ResourceSubscriber` implements `Disposable` directly: +Due to the Reactive Streams specification, `Publisher.subscribe` returns void and the pattern by itself no longer works in 2.0. To remedy this, the method `E subscribeWith(E subscriber)` has been added to each base reactive class which returns its input subscriber/observer as is. With the two examples before, the 2.x code can now look like this since `ResourceSubscriber` implements `Disposable` directly: ```java CompositeDisposable composite2 = new CompositeDisposable(); @@ -420,11 +420,11 @@ This behavior differs from 1.x where a `request` call went through a deferred lo # Subscription -In RxJava 1.x, the interface `rx.Subscription` was responsible for stream and resource lifecycle management, namely unsubscribing a sequence and releasing general resources such as scheduled tasks. The Reactive-Streams specification took this name for specifying an interaction point between a source and a consumer: `org.reactivestreams.Subscription` allows requesting a positive amount from the upstream and allows cancelling the sequence. +In RxJava 1.x, the interface `rx.Subscription` was responsible for stream and resource lifecycle management, namely unsubscribing a sequence and releasing general resources such as scheduled tasks. The Reactive Streams specification took this name for specifying an interaction point between a source and a consumer: `org.reactivestreams.Subscription` allows requesting a positive amount from the upstream and allows cancelling the sequence. To avoid the name clash, the 1.x `rx.Subscription` has been renamed into `io.reactivex.Disposable` (somewhat resembling .NET's own IDisposable). -Because Reactive-Streams base interface, `org.reactivestreams.Publisher` defines the `subscribe()` method as `void`, `Flowable.subscribe(Subscriber)` no longer returns any `Subscription` (or `Disposable`). The other base reactive types also follow this signature with their respective subscriber types. +Because Reactive Streams base interface, `org.reactivestreams.Publisher` defines the `subscribe()` method as `void`, `Flowable.subscribe(Subscriber)` no longer returns any `Subscription` (or `Disposable`). The other base reactive types also follow this signature with their respective subscriber types. The other overloads of `subscribe` now return `Disposable` in 2.x. @@ -436,19 +436,19 @@ The original `Subscription` container types have been renamed and updated # Backpressure -The Reactive-Streams specification mandates operators supporting backpressure, specifically via the guarantee that they don't overflow their consumers when those don't request. Operators of the new `Flowable` base reactive type now consider downstream request amounts properly, however, this doesn't mean `MissingBackpressureException` is gone. The exception is still there but this time, the operator that can't signal more `onNext` will signal this exception instead (allowing better identification of who is not properly backpressured). +The Reactive Streams specification mandates operators supporting backpressure, specifically via the guarantee that they don't overflow their consumers when those don't request. Operators of the new `Flowable` base reactive type now consider downstream request amounts properly, however, this doesn't mean `MissingBackpressureException` is gone. The exception is still there but this time, the operator that can't signal more `onNext` will signal this exception instead (allowing better identification of who is not properly backpressured). As an alternative, the 2.x `Observable` doesn't do backpressure at all and is available as a choice to switch over. -# Reactive-Streams compliance +# Reactive Streams compliance **updated in 2.0.7** -**The `Flowable`-based sources and operators are, as of 2.0.7, fully Reactive-Streams version 1.0.0 specification compliant.** +**The `Flowable`-based sources and operators are, as of 2.0.7, fully Reactive Streams version 1.0.0 specification compliant.** Before 2.0.7, the operator `strict()` had to be applied in order to achieve the same level of compliance. In 2.0.7, the operator `strict()` returns `this`, is deprecated and will be removed completely in 2.1.0. -As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a custom `io.reactivex.FlowableSubscriber` interface (extends `org.reactivestreams.Subscriber`) but adds no new methods to it. The new interface is **constrained to RxJava 2** and represents a consumer to `Flowable` that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9: +As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a custom `io.reactivex.FlowableSubscriber` interface (extends `org.reactivestreams.Subscriber`) but adds no new methods to it. The new interface is **constrained to RxJava 2** and represents a consumer to `Flowable` that is able to work in a mode that relaxes the Reactive Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9: - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. - §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation. @@ -603,7 +603,7 @@ Integer i = Flowable.range(100, 100).blockingLast(); (The reason for this is twofold: performance and ease of use of the library as a synchronous Java 8 Streams-like processor.) -Another significant difference between `rx.Subscriber` (and co) and `org.reactivestreams.Subscriber` (and co) is that in 2.x, your `Subscriber`s and `Observer`s are not allowed to throw anything but fatal exceptions (see `Exceptions.throwIfFatal()`). (The Reactive-Streams specification allows throwing `NullPointerException` if the `onSubscribe`, `onNext` or `onError` receives a `null` value, but RxJava doesn't let `null`s in any way.) This means the following code is no longer legal: +Another significant difference between `rx.Subscriber` (and co) and `org.reactivestreams.Subscriber` (and co) is that in 2.x, your `Subscriber`s and `Observer`s are not allowed to throw anything but fatal exceptions (see `Exceptions.throwIfFatal()`). (The Reactive Streams specification allows throwing `NullPointerException` if the `onSubscribe`, `onNext` or `onError` receives a `null` value, but RxJava doesn't let `null`s in any way.) This means the following code is no longer legal: ```java Subscriber subscriber = new Subscriber() { @@ -933,7 +933,7 @@ To make sure the final API of 2.0 is clean as possible, we remove methods and ot ## doOnCancel/doOnDispose/unsubscribeOn -In 1.x, the `doOnUnsubscribe` was always executed on a terminal event because 1.x' `SafeSubscriber` called `unsubscribe` on itself. This was practically unnecessary and the Reactive-Streams specification states that when a terminal event arrives at a `Subscriber`, the upstream `Subscription` should be considered cancelled and thus calling `cancel()` is a no-op. +In 1.x, the `doOnUnsubscribe` was always executed on a terminal event because 1.x' `SafeSubscriber` called `unsubscribe` on itself. This was practically unnecessary and the Reactive Streams specification states that when a terminal event arrives at a `Subscriber`, the upstream `Subscription` should be considered cancelled and thus calling `cancel()` is a no-op. For the same reason, `unsubscribeOn` is not called on the regular termination path but only when there is an actual `cancel` (or `dispose`) call on the chain. diff --git a/docs/Writing-operators-for-2.0.md b/docs/Writing-operators-for-2.0.md index 7b6e57666e..9649c02be7 100644 --- a/docs/Writing-operators-for-2.0.md +++ b/docs/Writing-operators-for-2.0.md @@ -40,7 +40,7 @@ Writing operators, source-like (`fromEmitter`) or intermediate-like (`flatMap`) *(If you have been following [my blog](http://akarnokd.blogspot.hu/) about RxJava internals, writing operators is maybe only 2 times harder than 1.x; some things have moved around, some tools popped up while others have been dropped but there is a relatively straight mapping from 1.x concepts and approaches to 2.x concepts and approaches.)* -In this article, I'll describe the how-to's from the perspective of a developer who skipped the 1.x knowledge base and basically wants to write operators that conforms the Reactive-Streams specification as well as RxJava 2.x's own extensions and additional expectations/requirements. +In this article, I'll describe the how-to's from the perspective of a developer who skipped the 1.x knowledge base and basically wants to write operators that conforms the Reactive Streams specification as well as RxJava 2.x's own extensions and additional expectations/requirements. Since **Reactor 3** has the same architecture as **RxJava 2** (no accident, I architected and contributed 80% of **Reactor 3** as well) the same principles outlined in this page applies to writing operators for **Reactor 3**. Note however that they chose different naming and locations for their utility and support classes so you may have to search for the equivalent components. @@ -66,7 +66,7 @@ When dealing with backpressure in `Flowable` operators, one needs a way to accou The naive approach for accounting would be to simply call `AtomicLong.getAndAdd()` with new requests and `AtomicLong.addAndGet()` for decrementing based on how many elements were emitted. -The problem with this is that the Reactive-Streams specification declares `Long.MAX_VALUE` as the upper bound for outstanding requests (interprets it as the unbounded mode) but adding two large longs may overflow the `long` into a negative value. In addition, if for some reason, there are more values emitted than were requested, the subtraction may yield a negative current request value, causing crashes or hangs. +The problem with this is that the Reactive Streams specification declares `Long.MAX_VALUE` as the upper bound for outstanding requests (interprets it as the unbounded mode) but adding two large longs may overflow the `long` into a negative value. In addition, if for some reason, there are more values emitted than were requested, the subtraction may yield a negative current request value, causing crashes or hangs. Therefore, both addition and subtraction have to be capped at `Long.MAX_VALUE` and `0` respectively. Since there is no dedicated `AtomicLong` method for it, we have to use a Compare-And-Set loop. (Usually, requesting happens relatively rarely compared to emission amounts thus the lack of dedicated machine code instruction is not a performance bottleneck.) @@ -225,7 +225,7 @@ This simplified queue API gets rid of the unused parts (iterator, collections AP ## Deferred actions -The Reactive-Streams has a strict requirement that calling `onSubscribe()` must happen before any calls to the rest of the `onXXX` methods and by nature, any calls to `Subscription.request()` and `Subscription.cancel()`. The same logic applies to the design of `Observable`, `Single`, `Completable` and `Maybe` with their connection type of `Disposable`. +The Reactive Streams has a strict requirement that calling `onSubscribe()` must happen before any calls to the rest of the `onXXX` methods and by nature, any calls to `Subscription.request()` and `Subscription.cancel()`. The same logic applies to the design of `Observable`, `Single`, `Completable` and `Maybe` with their connection type of `Disposable`. Often though, such call to `onSubscribe` may happen later than the respective `cancel()` needs to happen. For example, the user may want to call `cancel()` before the respective `Subscription` actually becomes available in `subscribeOn`. Other operators may need to call `onSubscribe` before they connect to other sources but at that time, there is no direct way for relaying a `cancel` call to an unavailable upstream `Subscription`. @@ -286,7 +286,7 @@ The same pattern applies to `Subscription` with its `cancel()` method and with h ### Deferred requesting -With `Flowable`s (and Reactive-Streams `Publisher`s) the `request()` calls need to be deferred as well. In one form (the simpler one), the respective late `Subscription` will eventually arrive and we need to relay all previous and all subsequent request amount to its `request()` method. +With `Flowable`s (and Reactive Streams `Publisher`s) the `request()` calls need to be deferred as well. In one form (the simpler one), the respective late `Subscription` will eventually arrive and we need to relay all previous and all subsequent request amount to its `request()` method. In 1.x, this behavior was implicitly provided by `rx.Subscriber` but at a high cost that had to be payed by all instances whether or not they needed this feature. @@ -561,7 +561,7 @@ On the fast path, when we try to leave it, it is possible a concurrent call to ` ## FlowableSubscriber -Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Subscriber` from Reactive-Streams. It has the same methods with the same parameter types but different textual rules attached to it, a set of relaxations to the Reactive-Streams specification to enable better performing RxJava internals while still honoring the specification to the letter for non-RxJava consumers of `Flowable`s. +Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Subscriber` from Reactive Streams. It has the same methods with the same parameter types but different textual rules attached to it, a set of relaxations to the Reactive Streams specification to enable better performing RxJava internals while still honoring the specification to the letter for non-RxJava consumers of `Flowable`s. The rule relaxations are as follows: @@ -588,7 +588,7 @@ The other base reactive consumers, `Observer`, `SingleObserver`, `MaybeObserver` # Backpressure and cancellation -Backpressure (or flow control) in Reactive-Streams is the means to tell the upstream how many elements to produce or to tell it to stop producing elements altogether. Unlike the name suggest, there is no physical pressure preventing the upstream from calling `onNext` but the protocol to honor the request amount. +Backpressure (or flow control) in Reactive Streams is the means to tell the upstream how many elements to produce or to tell it to stop producing elements altogether. Unlike the name suggest, there is no physical pressure preventing the upstream from calling `onNext` but the protocol to honor the request amount. ## Replenishing @@ -1425,7 +1425,7 @@ public final class FlowableMyOperator extends Flowable { } ``` -When taking other reactive types as inputs in these operators, it is recommended one defines the base reactive interfaces instead of the abstract classes, allowing better interoperability between libraries (especially with `Flowable` operators and other Reactive-Streams `Publisher`s). To recap, these are the class-interface pairs: +When taking other reactive types as inputs in these operators, it is recommended one defines the base reactive interfaces instead of the abstract classes, allowing better interoperability between libraries (especially with `Flowable` operators and other Reactive Streams `Publisher`s). To recap, these are the class-interface pairs: - `Flowable` - `Publisher` - `FlowableSubscriber`/`Subscriber` - `Observable` - `ObservableSource` - `Observer` @@ -1433,7 +1433,7 @@ When taking other reactive types as inputs in these operators, it is recommended - `Completable` - `CompletableSource` - `CompletableObserver` - `Maybe` - `MaybeSource` - `MaybeObserver` -RxJava 2.x locks down `Flowable.subscribe` (and the same methods in the other types) in order to provide runtime hooks into the various flows, therefore, implementors are given the `subscribeActual()` to be overridden. When it is invoked, all relevant hooks and wrappers have been applied. Implementors should avoid throwing unchecked exceptions as the library generally can't deliver it to the respective `Subscriber` due to lifecycle restrictions of the Reactive-Streams specification and sends it to the global error consumer via `RxJavaPlugins.onError`. +RxJava 2.x locks down `Flowable.subscribe` (and the same methods in the other types) in order to provide runtime hooks into the various flows, therefore, implementors are given the `subscribeActual()` to be overridden. When it is invoked, all relevant hooks and wrappers have been applied. Implementors should avoid throwing unchecked exceptions as the library generally can't deliver it to the respective `Subscriber` due to lifecycle restrictions of the Reactive Streams specification and sends it to the global error consumer via `RxJavaPlugins.onError`. Unlike in 1.x, In the example above, the incoming `Subscriber` is simply used directly for subscribing again (but still at most once) without any kind of wrapping. In 1.x, one needs to call `Subscribers.wrap` to avoid double calls to `onStart` and cause unexpected double initialization or double-requesting. @@ -1516,7 +1516,7 @@ public final class MyOperator implements FlowableOperator { You may recognize that implementing operators via extension or lifting looks quite similar. In both cases, one usually implements a `FlowableSubscriber` (`Observer`, etc) that takes a downstream `Subscriber`, implements the business logic in the `onXXX` methods and somehow (manually or as part of `lift()`'s lifecycle) gets subscribed to an upstream source. -The benefit of applying the Reactive-Streams design to all base reactive types is that each consumer type is now an interface and can be applied to operators that have to extend some class. This was a pain in 1.x because `Subscriber` and `SingleSubscriber` are classes themselves, plus `Subscriber.request()` is a protected-final method and an operator's `Subscriber` can't implement the `Producer` interface at the same time. In 2.x there is no such problem and one can have both `Subscriber`, `Subscription` or even `Observer` together in the same consumer type. +The benefit of applying the Reactive Streams design to all base reactive types is that each consumer type is now an interface and can be applied to operators that have to extend some class. This was a pain in 1.x because `Subscriber` and `SingleSubscriber` are classes themselves, plus `Subscriber.request()` is a protected-final method and an operator's `Subscriber` can't implement the `Producer` interface at the same time. In 2.x there is no such problem and one can have both `Subscriber`, `Subscription` or even `Observer` together in the same consumer type. # Operator fusion @@ -1569,12 +1569,12 @@ This is the level of the **Rx.NET** library (even up to 3.x) that supports compo This is what **RxJava 1.x** is categorized, it supports composition, backpressure and synchronous cancellation along with the ability to lift an operator into a sequence. #### Generation 3 -This is the level of the Reactive-Streams based libraries such as **Reactor 2** and **Akka-Stream**. They are based upon a specification that evolved out of RxJava but left behind its drawbacks (such as the need to return anything from `subscribe()`). This is incompatible with RxJava 1.x and thus 2.x had to be rewritten from scratch. +This is the level of the Reactive Streams based libraries such as **Reactor 2** and **Akka-Stream**. They are based upon a specification that evolved out of RxJava but left behind its drawbacks (such as the need to return anything from `subscribe()`). This is incompatible with RxJava 1.x and thus 2.x had to be rewritten from scratch. #### Generation 4 -This level expands upon the Reactive-Streams interfaces with operator-fusion (in a compatible fashion, that is, op-fusion is optional between two stages and works without them). **Reactor 3** and **RxJava 2** are at this level. The material around **Akka-Stream** mentions operator-fusion as well, however, **Akka-Stream** is not a native Reactive-Streams implementation (requires a materializer to get a `Publisher` out) and as such it is only Gen 3. +This level expands upon the Reactive Streams interfaces with operator-fusion (in a compatible fashion, that is, op-fusion is optional between two stages and works without them). **Reactor 3** and **RxJava 2** are at this level. The material around **Akka-Stream** mentions operator-fusion as well, however, **Akka-Stream** is not a native Reactive Streams implementation (requires a materializer to get a `Publisher` out) and as such it is only Gen 3. -There are discussions among the 4th generation library providers to have the elements of operator-fusion standardized in Reactive-Streams 2.0 specification (or in a neighboring extension) and have **RxJava 3** and **Reactor 4** work together on that aspect as well. +There are discussions among the 4th generation library providers to have the elements of operator-fusion standardized in Reactive Streams 2.0 specification (or in a neighboring extension) and have **RxJava 3** and **Reactor 4** work together on that aspect as well. ## Components @@ -1629,7 +1629,7 @@ The reason for the two separate interfaces is that if a source is constant, like `Callable` denotes sources, such as `fromCallable` that indicates the single value has to be calculated at runtime of the flow. By this logic, you can see that `ScalarCallable` is a `Callable` on its own right because the constant can be "calculated" as late as the runtime phase of the flow. -Since Reactive-Streams forbids using `null`s as emission values, we can use `null` in `(Scalar)Callable` marked sources to indicate there is no value to be emitted, thus one can't mistake an user's `null` with the empty indicator `null`. For example, this is how `empty()` is implemented: +Since Reactive Streams forbids using `null`s as emission values, we can use `null` in `(Scalar)Callable` marked sources to indicate there is no value to be emitted, thus one can't mistake an user's `null` with the empty indicator `null`. For example, this is how `empty()` is implemented: ```java final class FlowableEmpty extends Flowable implements ScalarCallable { diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 79fcc9b432..1994632307 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -557,7 +557,7 @@ public static Completable fromObservable(final ObservableSource observabl * *

* The {@link Publisher} must follow the - * Reactive-Streams specification. + * Reactive Streams specification. * Violating the specification may result in undefined behavior. *

* If possible, use {@link #create(CompletableOnSubscribe)} to create a diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 7dc6d77716..4fe78199fe 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -2295,7 +2295,7 @@ public static Flowable fromIterable(Iterable source) { * Flowable. *

* The {@link Publisher} must follow the - * Reactive-Streams specification. + * Reactive Streams specification. * Violating the specification may result in undefined behavior. *

* If possible, use {@link #create(FlowableOnSubscribe, BackpressureStrategy)} to create a diff --git a/src/main/java/io/reactivex/FlowableSubscriber.java b/src/main/java/io/reactivex/FlowableSubscriber.java index 2ef1019acf..e2636406e1 100644 --- a/src/main/java/io/reactivex/FlowableSubscriber.java +++ b/src/main/java/io/reactivex/FlowableSubscriber.java @@ -17,7 +17,7 @@ import org.reactivestreams.*; /** - * Represents a Reactive-Streams inspired Subscriber that is RxJava 2 only + * Represents a Reactive Streams inspired Subscriber that is RxJava 2 only * and weakens rules §1.3 and §3.9 of the specification for gaining performance. * *

History: 2.0.7 - experimental; 2.1 - beta diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 20b829ad2e..d2f11d2b5b 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -51,7 +51,7 @@ * *

* The design of this class was derived from the - * Reactive-Streams design and specification + * Reactive Streams design and specification * by removing any backpressure-related infrastructure and implementation detail, replacing the * {@code org.reactivestreams.Subscription} with {@link Disposable} as the primary means to dispose of * a flow. @@ -1985,12 +1985,12 @@ public static Observable fromIterable(Iterable source) { } /** - * Converts an arbitrary Reactive-Streams Publisher into an Observable. + * Converts an arbitrary Reactive Streams Publisher into an Observable. *

* *

* The {@link Publisher} must follow the - * Reactive-Streams specification. + * Reactive Streams specification. * Violating the specification may result in undefined behavior. *

* If possible, use {@link #create(ObservableOnSubscribe)} to create a @@ -3982,7 +3982,7 @@ public static Observable timer(long delay, TimeUnit unit, Scheduler schedu /** * Create an Observable by wrapping an ObservableSource which has to be implemented according - * to the Reactive-Streams-based Observable specification by handling + * to the Reactive Streams based Observable specification by handling * disposal correctly; no safeguards are provided by the Observable itself. *

*
Scheduler:
@@ -11229,7 +11229,7 @@ public final Observable retryWhen( * Subscribes to the current Observable and wraps the given Observer into a SafeObserver * (if not already a SafeObserver) that * deals with exceptions thrown by a misbehaving Observer (that doesn't follow the - * Reactive-Streams specification). + * Reactive Streams specification). *
*
Scheduler:
*
{@code safeSubscribe} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index f97f4d22b7..3dd0776e11 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -755,7 +755,7 @@ public static Single fromFuture(Future future, Scheduler sch * the source has more than one element, an IndexOutOfBoundsException is signalled. *

* The {@link Publisher} must follow the - * Reactive-Streams specification. + * Reactive Streams specification. * Violating the specification may result in undefined behavior. *

* If possible, use {@link #create(SingleOnSubscribe)} to create a diff --git a/src/main/java/io/reactivex/exceptions/ProtocolViolationException.java b/src/main/java/io/reactivex/exceptions/ProtocolViolationException.java index 09c0361108..ff36ce1cf3 100644 --- a/src/main/java/io/reactivex/exceptions/ProtocolViolationException.java +++ b/src/main/java/io/reactivex/exceptions/ProtocolViolationException.java @@ -14,7 +14,7 @@ package io.reactivex.exceptions; /** - * Explicitly named exception to indicate a Reactive-Streams + * Explicitly named exception to indicate a Reactive Streams * protocol violation. *

History: 2.0.6 - experimental; 2.1 - beta * @since 2.2 diff --git a/src/main/java/io/reactivex/internal/subscribers/StrictSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/StrictSubscriber.java index 0fb9d5666c..c7770bf163 100644 --- a/src/main/java/io/reactivex/internal/subscribers/StrictSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/StrictSubscriber.java @@ -23,7 +23,7 @@ /** * Ensures that the event flow between the upstream and downstream follow - * the Reactive-Streams 1.0 specification by honoring the 3 additional rules + * the Reactive Streams 1.0 specification by honoring the 3 additional rules * (which are omitted in standard operators due to performance reasons). *

    *
  • §1.3: onNext should not be called concurrently until onSubscribe returns
  • diff --git a/src/main/java/io/reactivex/package-info.java b/src/main/java/io/reactivex/package-info.java index 7d78294b6d..75ceb6cd7b 100644 --- a/src/main/java/io/reactivex/package-info.java +++ b/src/main/java/io/reactivex/package-info.java @@ -25,7 +25,7 @@ * Completable/CompletableObserver interfaces and associated operators (in * the {@code io.reactivex.internal.operators} package) are inspired by the * Reactive Rx library in Microsoft .NET but designed and implemented on - * the more advanced Reactive-Streams ( http://www.reactivestreams.org ) principles.

    + * the more advanced Reactive Streams ( http://www.reactivestreams.org ) principles.

    *

    * More information can be found at http://msdn.microsoft.com/en-us/data/gg577609.