A list of concise write ups on the implementation of RxJava in Android
-
Reactive Programming is a programming paradigm oriented around data flows and the propagation of change i.e. it is all about responding to value changes. For example, let’s say we define x = y+z. When we change the value of y or z, the value of x automatically changes. This can be done by observing the values of y and z.
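The x = y + z example can be sketched in plain Java without any library. This is only an illustration of the idea of observing values. `Cell`, `sumOf` and `ReactiveSum` are hypothetical names invented for this sketch and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

final class ReactiveSum {
    /** Hypothetical observable cell: holds an int and notifies listeners on every change. */
    static final class Cell {
        private int value;
        private final List<IntConsumer> listeners = new ArrayList<>();

        Cell(int initial) { value = initial; }

        int get() { return value; }

        void set(int newValue) {
            value = newValue;
            for (IntConsumer listener : listeners) listener.accept(newValue);
        }

        void onChange(IntConsumer listener) { listeners.add(listener); }
    }

    /** Builds x = a + b and keeps x up to date by observing both inputs. */
    static Cell sumOf(Cell a, Cell b) {
        Cell x = new Cell(a.get() + b.get());
        a.onChange(v -> x.set(v + b.get()));
        b.onChange(v -> x.set(a.get() + v));
        return x;
    }

    public static void main(String[] args) {
        Cell y = new Cell(1);
        Cell z = new Cell(2);
        Cell x = sumOf(y, z);
        System.out.println(x.get()); // 3
        y.set(10);
        System.out.println(x.get()); // 12
        z.set(5);
        System.out.println(x.get()); // 15
    }
}
```

Nothing ever assigns to x directly after it is built; it changes only because it observes y and z.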
-
Reactive Extensions (Rx) is a library that follows Reactive Programming principles to compose asynchronous and event-based programs by using observable sequences.
-
RxJava is a Java-based implementation of Reactive Extensions.
-
RxAndroid is specific to the Android platform and adds a few classes on top of the RxJava library.
The building blocks of RxJava are:
-
Observable: class that emits a stream of data or events. i.e. a class that can be used to perform some action, and publish the result.
Observable<String> observable = Observable.just("A", "B", "C", "D", "E", "F");
-
Observer: class that receives the events or data and acts upon them, i.e. a class that waits and watches the Observable, and reacts whenever the Observable publishes results. The Observer has 4 interface methods to know the different states of the Observable.
- onSubscribe(): This method is invoked when the Observer is subscribed to the Observable.
- onNext(): This method is called when a new item is emitted from the Observable.
- onError(): This method is called when an error occurs and the emission of data is not successfully completed.
- onComplete(): This method is called when the Observable has successfully completed emitting all items.
Observer<Object> observer = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("onSubscribe");
    }
    @Override
    public void onNext(Object o) {
        System.out.println("onNext: " + o);
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("onError: " + e.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
};
Operators allow you to manipulate the data that was emitted or create new Observables.
-
Create: This operator creates an Observable from scratch by calling observer methods programmatically. An emitter is provided through which we can call the respective interface methods when needed. The create() method does not take the values to emit as arguments, so we have to create the list beforehand and call the emitter's onNext() method for each item in it. Sample Implementation
-
Defer — This operator does not create the Observable until the Observer subscribes. The only downside to defer() is that it creates a new Observable each time you get a new Observer. create() can use the same function for each subscriber, so it’s more efficient. The sample implementation creates an Observable that emits a value.
-
From: This operator creates an Observable from a set of items using an Iterable, which means we can pass a list or an array of items to the Observable and each item is emitted one at a time. Examples of these operators include fromCallable(), fromFuture(), fromIterable(), fromPublisher() and fromArray(). The sample implementation will print each item from the array one by one. The order is also preserved.
-
Interval — This operator creates an Observable that emits a sequence of integers spaced by a particular time interval. The sample implementation will print values from 0 after every second.
-
Just: This operator takes a list of arguments (maximum 10) and converts them into Observable items, emitting each argument as a single item. For instance, if an array is passed as a parameter to the just() method, the array is emitted as a single item instead of as individual numbers. Note that RxJava 2 and later do not allow null items, so passing null to just() throws a NullPointerException (RxJava 1 emitted null as an item). The sample implementation will print the entire list in a single emission.
Note: Difference between Observable.from() and Observable.just(): for the same input, if you see the sample code, Observable.just() emits only once whereas Observable.from() emits n times, i.e. the length of the array.
-
Range — This operator creates an Observable that emits a range of sequential integers. The function takes two arguments: the starting number and length. The sample implementation has a starting number of 2 and a range of 5 numbers, so it will print values from 2 to 6.
-
Repeat: This operator creates an Observable that emits a particular item or sequence of items repeatedly. There is an option to pass the number of repetitions that can take place as well. The sample implementation will print the same values as the previous range() operator, but since the repeat count is specified as 2, the same values will be printed twice.
-
Timer — This operator creates an Observable that emits one particular item after a span of time that you specify. The sample implementation will emit only once after a 1 second delay.
Note: Difference between Observable.interval() and Observable.timer(): timer() emits just a single item after a delay, whereas interval() emits items spaced out by a given interval.
-
Buffer: This operator periodically gathers items from an Observable into bundles and emits these bundles rather than emitting the items one at a time. The sample implementation will emit 2 items at a time since the buffer size is specified as 2.
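The count-based bundling described above can be modeled on a plain list. This is a dependency-free sketch of the behaviour, not RxJava's own implementation. `BufferSketch` and its `buffer` method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BufferSketch {
    /** Splits items into consecutive bundles of at most `count` items each. */
    static <T> List<List<T>> buffer(List<T> items, int count) {
        List<List<T>> bundles = new ArrayList<>();
        for (int i = 0; i < items.size(); i += count) {
            int end = Math.min(i + count, items.size());
            bundles.add(new ArrayList<>(items.subList(i, end)));
        }
        return bundles;
    }

    public static void main(String[] args) {
        // With a buffer size of 2, five items become three bundles; the last one is partial.
        System.out.println(buffer(Arrays.asList("A", "B", "C", "D", "E"), 2)); // [[A, B], [C, D], [E]]
    }
}
```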
-
Map: This operator transforms the items emitted by an Observable by applying a function to each item. The map() operator allows us to modify each item emitted by the Observable and then emits the modified item. In the sample implementation we have a list of integer values. Using the map() operator, each integer in the list is multiplied by 2 and the result is emitted. Notice that the order of insertion is maintained during emission.
-
FlatMap: This operator transforms each item emitted by an Observable, but instead of returning the modified item, the function returns an Observable which can itself emit data. flatMap() then merges the items emitted by these inner Observables into a single Observable. The important difference between flatMap() and other transformation operators is that the order in which the items are emitted is not guaranteed to be maintained. In the sample implementation, we are going to follow the same test with the same conditions, except instead of map() we are going to use flatMap(). Notice that the order of insertion is not maintained.
-
SwitchMap: Whenever a new item is emitted by the source Observable, this operator unsubscribes from the Observable that was generated from the previously emitted item and begins mirroring only the current one. In other words, it returns the latest Observable and emits the items from it. In the sample implementation, we are going to follow the same test with the same conditions, except this time instead of flatMap() we are going to use switchMap(). The output of the below code will be 12 because we are passing a list of integers (1, 2, 3, 4, 5, 6) and using switchMap() to multiply each integer by 2; switchMap() always returns the latest Observable and emits from it, so only the last item, 6, yields a result.
-
ConcatMap: This operator functions the same way as flatMap(), the difference being that in concatMap() the order in which items are emitted is maintained. One disadvantage of concatMap() is that it waits for each inner Observable to finish all of its work before the next one is processed. In the sample implementation, we are going to follow the same test with the same conditions, except this time instead of switchMap() we are going to use concatMap().
Scenarios we can use the different operators:
-
Map operator can be used when we fetch items from the server and need to modify it before emitting to the UI.
-
FlatMap operator can be used when we know that the order of the items is not important.
-
SwitchMap is best suited for scenarios such as a feed page with pull to refresh enabled. When the user refreshes the screen, the older feed response is ignored and only the latest request's results are emitted to the UI when using a SwitchMap.
-
GroupBy: This operator divides an Observable into a set of Observables that each emit a different group of items from the original Observable, organised by key. The sample implementation creates an Observable with a range of numbers from 1 to 10. We use the groupBy() operator to emit only the even numbers from the list.
-
Scan: This operator transforms each item into another item, like map() does, but it also passes the result of the previous transformation to the function, so each emitted value can build on the one before it. In the sample implementation we have a range of numbers from 1 to 10. The scan() operator hands two integers at a time to the function: the previously emitted result and the current item. The below code adds the two integers and emits the sum.
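The running-sum behaviour can be checked with a plain-Java model of the accumulation. This is a sketch of the semantics only and does not use RxJava. `ScanSketch` and its `scan` method are hypothetical names. The sketch assumes the first item is passed through unchanged, which is how the accumulator-only form of scan behaves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ScanSketch {
    /** Returns every intermediate accumulator value, one per input item. */
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T previous = null;
        for (T item : items) {
            // The first item is emitted as is; later items are combined with the previous result.
            previous = emitted.isEmpty() ? item : accumulator.apply(previous, item);
            emitted.add(previous);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(scan(numbers, Integer::sum)); // [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
    }
}
```

Unlike a fold that returns only the final total (55 here), every intermediate sum is emitted.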
-
Debounce: This operator only emits an item from an Observable if a particular timespan has passed without it emitting another item. Let’s explain this operator with an example. Let’s say we are implementing a search feature in Android using AutoCompleteTextView. Whenever a user enters a character, we would need to fetch the list of items corresponding to that character. If the user enters 10 characters, and if we are fetching the data from a backend API, then that would mean 10 API calls to the backend. With the debounce() operator, we can specify the wait time (for instance, 2 seconds). Then the Observable will wait 2 seconds every time the user enters a character in the EditText. If the user types another character before the 2 seconds are up, then the Observable waits another 2 seconds. If the user does not enter another character by the end of the 2 seconds, the REST API is called. There are lots of resources online for this implementation. Please check this link and this link for a sample implementation.
-
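The waiting rule in the search example can be modeled deterministically over a list of timestamped keystrokes. This is a plain-Java sketch of the rule, not an Android or RxJava implementation. `DebounceSketch`, `Event` and the typed strings are hypothetical. The sketch assumes the last pending text is still emitted once typing stops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DebounceSketch {
    /** Hypothetical keystroke snapshot: the text in the search box at a given time. */
    static final class Event {
        final long timeMs;
        final String text;

        Event(long timeMs, String text) {
            this.timeMs = timeMs;
            this.text = text;
        }
    }

    /** Keeps only the events that are not followed by a newer event within windowMs. */
    static List<String> debounce(List<Event> events, long windowMs) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean isLast = i == events.size() - 1;
            if (isLast || events.get(i + 1).timeMs - events.get(i).timeMs >= windowMs) {
                emitted.add(events.get(i).text);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> typed = Arrays.asList(
                new Event(0, "r"), new Event(300, "rx"), new Event(500, "rxj"),
                new Event(3000, "rxja"), new Event(3100, "rxjav"), new Event(3200, "rxjava"));
        // Six keystrokes, but only two would reach the backend with a 2 second window.
        System.out.println(debounce(typed, 2000)); // [rxj, rxjava]
    }
}
```
-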
Distinct: This operator suppresses duplicate items emitted by an Observable. The distinct operator works very well with primitive data types, but in order to work with a custom data type, we need to override the equals() and hashCode() methods. The sample implementation provides an example of the distinct() operator. When we pass a list of duplicate integer values, the Observable uses the distinct() operator to emit only unique values from the list.
-
ElementAt: This operator emits only the nth item emitted by an Observable. We can specify the position we need to emit using the elementAt() operator. For instance, elementAt(0) will emit the first item in the list. In the sample implementation we specify a list of integers and, using the elementAt() operator, fetch the element at that particular index. If the index is not in the list, then nothing will be emitted.
-
Filter: This operator emits only those items from an Observable that pass a predicate test. In the sample implementation, for a list of integers from 1 to 6, we add a filter condition that lets only the even numbers through and emits those integers.
-
IgnoreElements: This operator does not emit any items from an Observable but mirrors its termination notification (either onComplete or onError). If you do not care about the items being emitted by an Observable, but you do want to be notified when it completes or when it terminates with an error, you can apply the ignoreElements() operator to the Observable, which will ensure that it never calls the observers’ onNext() methods. In the sample implementation only the onComplete() method will be called once the emission of items is complete. There is no onNext() method.
-
Sample: This operator emits the most recent item emitted by an Observable within periodic time intervals. The sample() operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling. In the sample code, we have a list of integers from 1 to 6. The Observable emits one integer every second. We use the sample() operator to check the emitted items every 2 seconds and emit the latest value from the Observable.
-
Skip: The skip(n) operator suppresses the first n items emitted by an Observable. The sample code demonstrates the use of the skip() operator. Let’s say we have an Observable that emits the first 10 letters of the alphabet; if the skip(4) operator is used, it skips the first 4 letters and emits only the remaining 6 items.
-
SkipLast: The skipLast(n) operator suppresses the last n items emitted by an Observable. The sample code demonstrates the use of the skipLast() operator. Let’s say we have an Observable that emits the first 10 letters of the alphabet; if the skipLast(4) operator is used, it skips the last 4 letters and emits only the remaining 6 items.
-
Take: The take(n) operator is the exact opposite of skip(n). It emits only the first n items emitted by an Observable. The sample code demonstrates the use of the take() operator. Let’s say we have an Observable that emits the first 10 letters of the alphabet; if the take(4) operator is used, it emits the first 4 letters and skips the remaining 6 items.
-
TakeLast: The takeLast(n) operator emits only the last n items emitted by an Observable. The sample code demonstrates the use of the takeLast() operator. Let’s say we have an Observable that emits the first 10 letters of the alphabet; if the takeLast(4) operator is used, it emits the last 4 letters and skips the remaining 6 items.
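The four operators above, applied to the first 10 letters with n = 4, can be compared side by side using list slices. This is a plain-Java model of which items survive, not RxJava code. `SkipTakeSketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

final class SkipTakeSketch {
    static <T> List<T> skip(List<T> items, int n) {
        return items.subList(Math.min(n, items.size()), items.size());
    }

    static <T> List<T> skipLast(List<T> items, int n) {
        return items.subList(0, Math.max(items.size() - n, 0));
    }

    static <T> List<T> take(List<T> items, int n) {
        return items.subList(0, Math.min(n, items.size()));
    }

    static <T> List<T> takeLast(List<T> items, int n) {
        return items.subList(Math.max(items.size() - n, 0), items.size());
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
        System.out.println(skip(letters, 4));     // [E, F, G, H, I, J]
        System.out.println(skipLast(letters, 4)); // [A, B, C, D, E, F]
        System.out.println(take(letters, 4));     // [A, B, C, D]
        System.out.println(takeLast(letters, 4)); // [G, H, I, J]
    }
}
```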
-
CombineLatest: Whenever an item is emitted by either of two Observables, this operator combines the latest item emitted by each Observable via a specified function and emits the result of that function. The sample code demonstrates the use of the combineLatest() operator. Let’s say there are 2 Observables emitting values at intervals of 100 ms and 150 ms respectively. The combineLatest() operator combines both Observables and emits a result each time either of them emits.
-
Join: Whenever two items (one from each source) overlap in time, they are paired and sent to the resultSelector, which computes and returns the combined item. The join() operator takes the following arguments:
- right: the second Observable to join items from.
- leftDurationSelector: a function to select a duration for each item emitted by the source Observable, used to determine overlap.
- rightDurationSelector: a function to select a duration for each item emitted by the right Observable, used to determine overlap.
- resultSelector: a function that computes an item to be emitted by the resulting Observable for any two overlapping items emitted by the two Observables.
The sample code demonstrates the use of the join() operator. In the below sample, we create two Observables, left and right, which each emit a value every 100 milliseconds. We use the join() operator to join the left Observable to the right Observable. The two integers emitted by the Observables are added and the result is printed.
-
Merge: This operator combines multiple Observables into one by merging their emissions, i.e. it merges multiple Observables into a single Observable but does not maintain sequential execution. The merge() operator doesn’t wait for data from observable 1 to complete. It emits data from both Observables as soon as the data becomes available to emit. Sample Code
-
Concat: This operator combines the output of two or more Observables into a single Observable without interleaving them, i.e. the first Observable completes its emission before the second starts, and so forth if there are more Observables. Sample Code
Note: The difference between merge() and concat() is that merge() interweaves output while concat() waits for earlier emissions to complete before processing new emissions.
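The interleaving difference can be modeled with two sources whose items carry emission times. This is a plain-Java sketch of the resulting order only and involves no real threads or RxJava classes. `MergeConcatSketch`, `Timed` and `source` are hypothetical, and the timings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MergeConcatSketch {
    /** Hypothetical record of one emission: its time relative to subscription and its value. */
    static final class Timed {
        final long timeMs;
        final String value;

        Timed(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /** Three items (e.g. A1..A3) emitted 200 ms apart, starting at startMs. */
    static List<Timed> source(String name, long startMs) {
        List<Timed> items = new ArrayList<>();
        for (int i = 0; i < 3; i++) items.add(new Timed(startMs + 200L * i, name + (i + 1)));
        return items;
    }

    /** Merge model: both sources run at once, so items come out in time order. */
    static List<String> merge(List<Timed> first, List<Timed> second) {
        List<Timed> all = new ArrayList<>(first);
        all.addAll(second);
        all.sort(Comparator.comparingLong((Timed t) -> t.timeMs));
        return values(all);
    }

    /** Concat model: the second source starts only after the first has completed. */
    static List<String> concat(List<Timed> first, List<Timed> second) {
        List<Timed> all = new ArrayList<>(first);
        all.addAll(second);
        return values(all);
    }

    private static List<String> values(List<Timed> items) {
        List<String> out = new ArrayList<>();
        for (Timed t : items) out.add(t.value);
        return out;
    }

    public static void main(String[] args) {
        List<Timed> a = source("A", 0);   // emits at 0, 200, 400 ms
        List<Timed> b = source("B", 100); // emits at 100, 300, 500 ms
        System.out.println(merge(a, b));  // [A1, B1, A2, B2, A3, B3]
        System.out.println(concat(a, b)); // [A1, A2, A3, B1, B2, B3]
    }
}
```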
-
Zip: This operator combines the emissions of multiple Observables together via a specified function and emits a single item for each combination based on the results of this function. Sample Code
-
SwitchOnNext: This operator emits items from the first Observable until the second Observable starts emitting. Then, it unsubscribes from the first Observable and starts emitting items from the second one. Sample Code
-
Delay: This operator shifts the emissions from an Observable forward in time by a particular amount, i.e. it modifies its source Observable by pausing for a particular increment of time before emitting each of the items from the Observable. The sample code emits all the Observable items after a delay of 2 seconds.
-
Do: This operator registers an action to take upon a variety of Observable lifecycle events.
- The doOnNext() operator modifies the Observable source so that it invokes an action when onNext() is called.
- The doOnComplete() operator (doOnCompleted() in RxJava 1) registers an action that is invoked when onComplete() is called.
- The doOnEach() operator modifies the Observable source so that it notifies an Observer for each item and establishes a callback that will be called each time an item is emitted.
- The doOnSubscribe() operator registers an action which is called whenever an Observer subscribes to the resulting Observable.
- Sample Code
-
Materialize/Dematerialize: These operators represent both the items emitted and the notifications sent as emitted items, or vice versa. What materialize() does is basically wrap the observed items and terminal events in Notification objects, on which we can check whether onNext(), onError() or onComplete() was called. dematerialize(), as you might guess, reverses the effect. The sample code demonstrates the use of the materialize() operator. From the materialize() operator, we get the Notification object. Using this object, we can check the kind of event with isOnNext(), isOnError() or isOnComplete(). Here we can basically fetch items that are successful and omit items that resulted in an error.
-
ObserveOn: This operator specifies the Scheduler on which an observer will observe this Observable. By default, an Observable along with its operator chain will operate on the same thread on which its subscribe() method is called. The observeOn() operator specifies a different Scheduler that the Observable will use for sending notifications to Observers. Sample Code
-
SubscribeOn — This operator tells the source Observable which thread to use for emitting items to the Observer. Sample Code
-
TimeInterval — This operator converts an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions. i.e. if we are more interested in how much time has passed since the last item, rather than the absolute moment in time when the items were emitted, we can use the timeInterval() method. Sample Code
-
Timeout — This operator mirrors the source Observable, but issues an error notification if a particular period of time elapses without any emitted items. The sample code delays the emission of items by 1 second. But we have added a timeout that throws an exception if there is no emission within 500ms. Hence the below code will throw an error.
-
Timestamp: This operator attaches a timestamp to each item emitted by an Observable. It transforms the items into the Timed<T> type (Timestamped<T> in RxJava 1), which contains the original item along with a timestamp for when it was emitted. Sample Code
-
Using: This operator creates a disposable resource that has the same lifespan as the Observable. The using() operator is a way you can instruct an Observable to create a resource that exists only during the lifespan of the Observable and is disposed of when the Observable terminates. Sample Code
-
All — This operator determines whether all items emitted by an Observable meet some criteria. Sample Code
-
Amb: When you pass a number of source Observables to amb(), it will pass through the emissions and notifications of exactly one of these Observables: the first one that sends a notification to Amb, either by emitting an item or sending an onError or onComplete notification. Amb will ignore and discard the emissions and notifications of all of the other source Observables. The sample code demonstrates the use of the amb() operator. In the below example, observable2 gets executed because it is the first to emit its first item. observable1 is ignored.
-
Contains — This operator determines whether an Observable emits a particular item or not. Sample Code
-
DefaultIfEmpty — This operator emits items from the source Observable, or a default item if the source Observable emits nothing. The sample code generates a random number and if the number is an even number, it is emitted, otherwise the Observable completes its emission and a default value we have specified (-10 in this case), is emitted.
-
SequenceEqual — This operator determines whether two Observables emit the same sequence of items. Sample Code
-
SkipUntil — This operator discards items emitted by an Observable until a second Observable emits an item. In the sample code, after emitting 3, 4, 5, the second Observable starts emitting items after three seconds.
-
SkipWhile — This operator will discard items emitted by an Observable until a specified condition becomes false. In the sample code, we have specified a list of integers from 0 to 6. All items are ignored until the given condition fails.
-
TakeUntil: This operator discards any items emitted by an Observable after a second Observable emits an item or terminates. It is the exact opposite of skipUntil. Sample Code
-
TakeWhile: This operator will discard items emitted by an Observable after a specified condition becomes false. In the sample code, we have specified a list of integers from 0 to 6. All items are emitted until the given condition fails.
Mathematical Observables require an extra dependency. Add the following dependency inside the build.gradle file and sync Gradle.
implementation 'com.github.akarnokd:rxjava2-extensions:0.20.0'
-
Average — This operator calculates the average of the items emitted by an Observable and emits this average. Sample Code
-
Count: This operator counts the number of items emitted by the source Observable and emits only this value. Sample Code
-
Max — This operator determines the maximum-valued item emitted by an Observable sequence and emits that item. Sample Code
-
Min — This operator determines the minimum-valued item emitted by an Observable sequence and emits that item. Sample Code
-
Reduce: This operator applies a function to each item emitted by an Observable, sequentially, and emits the final value. First it applies the function to the first two items, then feeds the result back into the same function together with the next item. This process continues until the last emission. Once all the items are over, it emits the final result. Sample Code
-
Sum — This operator calculates the sum of numbers emitted by an Observable and emits this sum. Sample Code
-
Single: Single is an Observable that always either emits exactly one value or signals an error. A typical use case of a Single would be when we make a network call in Android and receive a response. The sample code always emits a single user object. We use a Single Observable and a Single Observer. The Single Observer always receives only one item, so there is no onNext(); the item is delivered through onSuccess() instead.
-
Maybe: Maybe is an Observable that may or may not emit a value. For example, we would like to know if a particular user exists in our db. The user may or may not exist. The sample code mimics a scenario where a user object is emitted. We use a Maybe Observable and a Maybe Observer. Again we are using the same scenario as above: creating a new User.
-
Completable: Completable does not emit any data, but rather is focused on the status of execution, whether success or failure. The sample code mimics a scenario where an existing User object is updated. Since no data is emitted in Completable, there is no onNext() or onSuccess(). This can be used in cases where a PUT API is called and we need to update an existing object on the backend. Sample Code
-
Flowable: Flowable is typically used when an Observable is emitting huge amounts of data but the Observer is not able to keep up with this data emission. Handling this mismatch is known as backpressure. The sample code provides a range of integers from 10 to 1000 and uses the reduce() operator to add up the integers and emit the final sum value.
You can check out a lot more about Flowable operators from here.
- A Subject extends an Observable and implements Observer at the same time. It acts as an Observable to clients and registers to multiple events taking place in the app. It acts as an Observer by broadcasting the event to multiple subscribers.
- Characteristics of Subjects:
- Subjects can act as both an Observer and an Observable. In the following example, we create an Observable which emits integers from 1 to 5. We create a Subject and use it to observe the changes to the Observable (in this scenario, the Subject is acting as an Observer). We will have two Observers to observe the changes in the Subject (in this scenario, the Subject is acting as an Observable). Example
- Subjects can multicast items to multiple child subscribers. Multicasting makes it possible to run expensive operations once and emit the results to multiple subscribers. This prevents doing duplicate operations for multiple subscribers. In the following example, we create an Observable which emits integers from 1 to 5. Each integer is squared by itself using the map() operator before it is emitted. We will have two Observers to observe the Observable. Example without Subjects Example using Subjects
- Subjects are considered HOT Observables. A HOT Observable, such as a Subject, emits items only once regardless of the number of subscribers, and its subscribers receive items only from the point of their subscription. Subjects convert a cold Observable into a hot Observable. Example scenario: In the following example, we create a Subject which emits integers from 0 to 4. We will add two Observers to observe the emission. Example. You will notice from the output in the example that,
- Even though the Subject emits the integer value ‘0’, it is not printed. This is because there are no subscribers that are listening to the emission.
- Observer 2 only prints values ‘3’ and ‘4’. This is because the second Observer only subscribed to the Subject after it emitted values 0, 1 and 2.
- Types of Subjects:
- PublishSubject: PublishSubject emits to each subscriber only the items emitted after the point of subscription. This is the most basic form of Subject. Example
- BehaviorSubject: BehaviorSubject emits to each subscriber the most recent item at the time of its subscription and all items after that. Example.
- Difference between PublishSubject and BehaviorSubject is that PublishSubject prints all values after subscription and BehaviorSubject prints the last emitted value before subscription and all the values after subscription.
- ReplaySubject: ReplaySubject emits all the items of the Observable, regardless of when the subscriber subscribes. Example.
- AsyncSubject: AsyncSubject emits only the last value of the Observable and this only happens after the Observable completes. Example
- UnicastSubject: UnicastSubject allows only a single subscriber and it emits all the items regardless of the time of subscription. Example.
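The difference between the first three Subject types comes down to how much history a late subscriber receives. It can be modeled with one small class. This is a plain-Java sketch, not RxJava's implementation. `ReplayingSubject`, `replayCount` and `lateObserverSees` are hypothetical names, and AsyncSubject and UnicastSubject are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SubjectSketch {
    /** Hypothetical hot source that replays the last `replayCount` items to each new subscriber. */
    static final class ReplayingSubject<T> {
        private final int replayCount;
        private final List<T> history = new ArrayList<>();
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        ReplayingSubject(int replayCount) { this.replayCount = replayCount; }

        void subscribe(Consumer<T> subscriber) {
            // Replay the tail of the history first, then register for live items.
            int from = Math.max(history.size() - replayCount, 0);
            for (T item : history.subList(from, history.size())) subscriber.accept(item);
            subscribers.add(subscriber);
        }

        void onNext(T item) {
            history.add(item);
            for (Consumer<T> subscriber : subscribers) subscriber.accept(item);
        }
    }

    /** Emits 0, 1, 2, then subscribes a late observer, then emits 3, 4; returns what it saw. */
    static List<Integer> lateObserverSees(int replayCount) {
        ReplayingSubject<Integer> subject = new ReplayingSubject<>(replayCount);
        List<Integer> seen = new ArrayList<>();
        subject.onNext(0);
        subject.onNext(1);
        subject.onNext(2);
        subject.subscribe(seen::add);
        subject.onNext(3);
        subject.onNext(4);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateObserverSees(0));                 // PublishSubject-like:  [3, 4]
        System.out.println(lateObserverSees(1));                 // BehaviorSubject-like: [2, 3, 4]
        System.out.println(lateObserverSees(Integer.MAX_VALUE)); // ReplaySubject-like:   [0, 1, 2, 3, 4]
    }
}
```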