This code adds the melt()
method to Microsoft's RxJS library. melt()
converts a cold observable to a hot observable. It is somewhat similar to the publish()
method, already included in RxJS. The difference is that melt()
can automatically subscribe and unsubscribe to the source as needed and do so correctly. publish()
solves a similar challenge
by using an extra connect()
method, to avoid some tricky situations.
melt()
is not a wrap-around of publish()
, neither does it rely on publish()
in any way.
Note: publish()
and connect()
work together well. This piece of code was made just out of interest, on a fine Friday evening.
- Cold vs Hot Observables
- Why Convert Cold to Hot
- Acquisition and usage
- Examples
- Difference between publish and melt
Hot observables exist independently from their subscriptions. When an observer subscribes to a hot source, the observer will miss all values emitted before the subscription took place. And if another observer subscribes later, from that moment both will continue to receive the same values (almost) simultaneously.
// Observable emits values: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...
// Observer 1 subscribes: 4, 5, 6, 7, 8, 9, 10, 11, ...
// Observer 2 subscribes: 6, 7, 8, 9, 10, 11, ...
Cold observables, on the other hand, do not emit values until subscribed. They are 'created' afresh for each subscription, which means they will always re-start from the same initial value (with some caveats). Late subscribers do not miss any values.
// Observable created: - no values. 're-created' on each subscription.
// Observer 1 subscribes: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...
// Observer 2 subscribes: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...
Hot observables are usually bound to 'natural' events, like mouse clicks. For example:
Rx.Observable.fromEvent(button,'click'); // Emits click events at every click
Cold observables are 'artificially' created and not bound to 'natural' events. For example:
Rx.Observable.of(1,2,3); // Emit three values: 1,2 and 3
Rx.Observable.interval(1000); // Emit incremental values each second
(Read more)
Even in the world of observables it is sometimes desirable to heat things up a bit. For example, if we want to implement a ticker,
which should get data from a remote server once a minute. If five observers were to subscribe to the underlying observable,
we would not want the remote server to be queried five times a minute. Yet this is precisely what would happen if the
observable is implemented, say, with Observable.create()
without deliberately 'protecting' the source from being
multiple subscriptions.
The publish()
method, and of course now the melt()
method, represent that deliberate 'protection' of the source.
They both return a surrogate observable, which itself subscribes to the underlying source only once, and then propagates events
down to its own subscribers.
The following assumes you have node.js
installed with npm
package manager.
git clone https://github.com/levanroinishvili/observable-melt.git observable-melt
cd observable-melt
npm install
// Then, inside a js file
const Rx = require('./index'); // With correct path to index.js file
Rx.Observable
.interval(1000)
.melt(startImediate,persistent);
- startImediate : if true, immediately subscribe to the source observable. Otherwise,
subscribe to source as necessary. Defaults to
false
. - persistent : if true, do not automatically unsubscribe from source, when all children unsubscribe.
Defaults to
false
.
For example:
let source = Rx.Observable.interval(1000); // Emit values every second: 0,1,2,3,...
source.take(10).subscribe(console.log); // Will start outputting 0,1,2,...9
// in a few seconds, subscribe again
source.take(10).subscribe(console.log); // Will start outputting values from 0
This is not always desirable. For example, consider a ticker which should get values from a server
once a second. If we subscribe three times to the ticker, we do not want it to activate three times a second.
This issue can be solved by the publish()
method, included in the RxJS. And, it can also be solved with
my melt()
method
In addition to sample use below, demo.js
can be run (e.g. using node.js
) for a small demonstration.
let source = Rx.Observable
.interval(1000) // Emit values every second
.melt(); // Convert cold observable to hot
// Create first subscription
let subscription1 = source.subscribe(val=>{console.log('Observer 1 received',val);});
// Starts outputting 0,1,2,3,...
// After a few seconds create a second subscription
let subscription2 = source.subscribe(val=>{console.log('Observer 2 received',val);});
// Starts outputting in sync with the first observable:
// 0,1,2,3,4,5,6,7,8,9,...
// 6,7,8,9,...
subscription1.unsubscribe();
// Second subscription keeps outputting values
subscription2.unsubscribe();
Consider these two, slightly different, uses of publish()
:
// ------------------------------ Use one:
let source = Rx.Observable
.interval(1000) // Emit values every second
.publish(); // convert to hot observable using publish()
let subscription = source.subscribe(console.log); // Subscribe with a very simple observable
// ------------------------------ Use two:
let subscription = Rx.Observable
.of(0,1,2,3,4,5)
.publish()
.subscribe(console.log);
If publish()
tried to automatically subscribe to the source observable, the second use would be trickier to implement internally.
As soon as publish()
subscribed to the source observable .of(0,1,2,3,4,5)
,
it would immediately and synchronously emit all values and complete. Only then the method .subscribe()
would fire, but not observe any values. To solve this problem, publish()
does not automatically subscribe to the source observable.
Instead, the source is activated by a separate call to connect()
.
By separating subscription to the source from own subscriptions, the tricky situation is avoided.
melt()
will handle such cases correctly. That is, subscribed observers will receive all expected values.