-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create 2 functions `subject()` and `holdSubject()` to cover all issues. Update the README with holdSubject() example Add API documentation
- Loading branch information
Showing
8 changed files
with
438 additions
and
278 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
const most = require('most'); | ||
const MulticastSource = require('most/lib/source/MulticastSource'); | ||
const PropagateTask = require('most/lib/scheduler/PropagateTask'); | ||
const CompoundDisposable = require('most/lib/disposable/dispose').all; | ||
const Stream = most.Stream; | ||
|
||
export function replay(stream, maxBufferSize) { | ||
if(stream.source instanceof ReplaySource && source.maxBufferSize !== maxBufferSize) { | ||
return stream; | ||
} | ||
return new Stream(new ReplaySource(stream.source, maxBufferSize)); | ||
} | ||
|
||
export function ReplaySource(source, maxBufferSize) { | ||
this._buffer = []; | ||
this._ended = false; | ||
this.maxBufferSize = maxBufferSize || Infinity; | ||
MulticastSource.call(this, source); | ||
} | ||
ReplaySource.prototype = Object.create(MulticastSource.prototype); | ||
|
||
ReplaySource.prototype._run = MulticastSource.prototype.run; | ||
ReplaySource.prototype.run = function(sink, scheduler) { | ||
const buffer = this._buffer; | ||
const self = this; | ||
this.sink = sink; | ||
|
||
if(this._ended) { | ||
return replay(); | ||
} | ||
if(buffer.length === 0) { | ||
return run(); | ||
} | ||
return new CompoundDisposable([replay(), run()]); | ||
|
||
function replay() { | ||
return new BufferProducer(buffer.slice(0), sink, scheduler); | ||
} | ||
|
||
function run() { | ||
return self._run(sink, scheduler); | ||
} | ||
}; | ||
|
||
ReplaySource.prototype._event = MulticastSource.prototype.event; | ||
ReplaySource.prototype.event = function ReplaySource_event(t, x) { | ||
this._addToBuffer({ type: 0, t, x }); | ||
this._event(t, x); | ||
}; | ||
|
||
MulticastSource.prototype._addToBuffer = function ReplaySource_addToBuffer(event) { | ||
if(this._buffer.length >= this.maxBufferSize) { | ||
this._buffer.shift(); | ||
} | ||
this._buffer.push(event); | ||
} | ||
|
||
MulticastSource.prototype.end = function(t, x, r) { | ||
MulticastSource | ||
var s = this.sinks; | ||
if(s.length === 1) { | ||
s[0].end(t, x); | ||
return; | ||
} | ||
for(var i=0; i<s.length; ++i) { | ||
if (i === s.length -1) { | ||
if (r) { | ||
break; // don't end underlying stream | ||
} | ||
} | ||
s[i].end(t, x); | ||
}; | ||
}; | ||
|
||
ReplaySource.prototype._end = MulticastSource.prototype.end; | ||
ReplaySource.prototype.end = function ReplaySource_end(t, x) { | ||
const self = this | ||
this._ended = true; | ||
this._addToBuffer({ type: 1, t, x }); | ||
this._end(t, x, this); | ||
this.add(this.sink); // add an extra sink so the last values can go through | ||
setTimeout(() => {self._end(t, x)}, 0)// dispose after values are propagated | ||
}; | ||
|
||
MulticastSource.prototype.error = function(t, e, r) { | ||
var s = this.sinks; | ||
if(s.length === 1) { | ||
s[0].error(t, e); | ||
return; | ||
} | ||
for (var i=0; i<s.length; ++i) { | ||
if (i === s.length - 1) { | ||
if (r) { | ||
break; // don't end underlying stream | ||
} | ||
} | ||
s[i].error(t, e); | ||
}; | ||
}; | ||
|
||
ReplaySource.prototype._error = MulticastSource.prototype.error | ||
ReplaySource.prototype.error = function ReplaySink_error(t, e) { | ||
const self = this | ||
this._ended = true; | ||
this._buffer.push({ type: 2, t, x: e }); | ||
this._error(t, e, this); | ||
this.add(this.sink) | ||
setTimeout(() => {self._error(t, e)}, 0) | ||
}; | ||
|
||
function BufferProducer(buffer, sink, scheduler) { | ||
this.task = new PropagateTask(runProducer, buffer, sink); | ||
scheduler.asap(this.task); | ||
} | ||
|
||
BufferProducer.prototype.dispose = function() { | ||
return this.task.dispose(); | ||
}; | ||
|
||
function runProducer(t, buffer, sink) { | ||
const emit = item => { | ||
sink.event(item.t, item.x); | ||
} | ||
for(var i=0, j=buffer.length; i<j && this.active; i++) { | ||
const item = buffer[i]; | ||
switch(item.type) { | ||
case 0: emit(item); break; | ||
case 1: return this.active && sink.end(item.t, item.x); | ||
case 2: return this.active && sink.error(item.t, item.x); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
class Subscription { | ||
constructor() { | ||
this.run = (sink, scheduler) => this._run(sink, scheduler) | ||
this.add = this.next = x => this._add(x) | ||
this.error = err => this._error(err) | ||
this.end = this.complete = x => this._end(x) | ||
} | ||
|
||
_run(sink, scheduler) { | ||
this.sink = sink | ||
this.scheduler = scheduler | ||
this.active = true | ||
return this | ||
} | ||
|
||
dispose() { | ||
this.active = false | ||
} | ||
|
||
_add(x) { | ||
if (!this.active) { | ||
return | ||
} | ||
tryEvent(this.sink, this.scheduler, x) | ||
} | ||
|
||
_error(e) { | ||
this.active = false | ||
this.sink.error(this.scheduler.now(), e) | ||
} | ||
|
||
_end(x) { | ||
if (!this.active) { | ||
return | ||
} | ||
this.active = false | ||
tryEnd(this.sink, this.scheduler, x) | ||
} | ||
} | ||
|
||
function tryEvent(sink, scheduler, event) { | ||
try { | ||
sink.event(scheduler.now(), event) | ||
} catch(e) { | ||
sink.error(scheduler.now(), e) | ||
} | ||
} | ||
|
||
function tryEnd(sink, scheduler, event) { | ||
try { | ||
sink.end(scheduler.now(), event) | ||
} catch (e) { | ||
sink.error(scheduler.now(), e) | ||
} | ||
} | ||
|
||
export {Subscription} |
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,41 @@ | ||
import subject from './subject' | ||
import holdSubject from './holdSubject' | ||
import behaviorSubject from './behaviorSubject' | ||
import {Stream} from 'most' | ||
import MulticastSource from 'most/lib/source/MulticastSource' | ||
import {Subscription} from './Subscription' | ||
import {replay as replayStream} from './ReplaySource' | ||
import hold from '@most/hold' | ||
|
||
export {subject, holdSubject, behaviorSubject} | ||
const defaults = { | ||
replay: false, | ||
bufferSize: 1 | ||
} | ||
|
||
function create(replay, bufferSize, initialValue) { | ||
const sink = new Subscription() | ||
let stream; | ||
|
||
if (!replay) { | ||
stream = new Stream(new MulticastSource(sink)) | ||
} else { | ||
stream = bufferSize === 1 ? | ||
hold(new Stream(sink)) : | ||
replayStream(new Stream(sink), bufferSize) | ||
} | ||
|
||
stream.drain() | ||
|
||
if (typeof initialValue !== 'undefined') { | ||
sink.next(initialValue) | ||
} | ||
|
||
return {sink, stream, observer: sink} | ||
} | ||
|
||
function subject(initialValue) { | ||
return create(false, 1, initialValue) | ||
} | ||
|
||
function holdSubject(bufferSize = 1, initialValue) { | ||
return create(true, bufferSize, initialValue) | ||
} | ||
|
||
export {subject, holdSubject} |
Oops, something went wrong.