-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update/Add dependencies on most and @most/hold Add .next() .complete() methods to sink Make stream 'hot' Implement holdSubject Implement behaviorSubject Add tests for all of the above Closes #1 #2 #3 #4
- Loading branch information
Showing
6 changed files
with
229 additions
and
90 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import {Stream} from 'most' | ||
import MulticastSource from 'most/lib/source/MulticastSource' | ||
import hold from '@most/hold' | ||
|
||
import {Subscription} from './subject' | ||
|
||
class BehaviorSubscription extends Subscription { | ||
constructor(intialValue) { | ||
super() | ||
|
||
this.run = (sink, scheduler) => this._run(sink, scheduler) | ||
this.add = x => this._add(x) | ||
this.error = err => this._error(err) | ||
this.end = x => this._end(x) | ||
this.value = void 0 | ||
|
||
if (typeof initialValue !== 'undefined') { | ||
this.value = initialValue | ||
this.add(x) | ||
} | ||
} | ||
|
||
_add(x) { | ||
this.value = x | ||
super._add(x) | ||
} | ||
} | ||
|
||
function create(initialValue) { | ||
const sink = new BehaviorSubscription(initialValue) | ||
const stream = new Stream(new MulticastSource(sink)) | ||
const holdStream = hold(stream) | ||
holdStream.observe(x => { | ||
stream.value = x | ||
}) | ||
return {sink, stream: holdStream} | ||
} | ||
|
||
export default create |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
import subject from './subject' | ||
import hold from '@most/hold' | ||
|
||
export default function holdSubject() { | ||
const {sink, stream} = subject() | ||
const holdStream = hold(stream) | ||
return {sink, stream: holdStream} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
import Subject from './subject' | ||
import subject from './subject' | ||
import holdSubject from './holdSubject' | ||
import behaviorSubject from './behaviorSubject' | ||
|
||
export default Subject | ||
export {subject, holdSubject, behaviorSubject} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,112 +1,195 @@ | ||
import assert from 'assert' | ||
import {Stream} from 'most' | ||
import hold from '@most/hold' | ||
import Subject from '../src' | ||
|
||
describe('Subject', () => { | ||
it('should return Object with stream and sink', done => { | ||
const s = Subject() | ||
assert.strictEqual(typeof s, 'object') | ||
assert.strictEqual(typeof s.stream, 'object') | ||
assert.strictEqual(typeof s.sink, 'object') | ||
done() | ||
}) | ||
import * as Subjects from '../src' | ||
|
||
Object.keys(Subjects) | ||
.forEach(subject => { | ||
describe(`${subject}`, () => { | ||
it('should return Object with stream and sink', done => { | ||
const s = Subjects[subject]() | ||
assert.strictEqual(typeof s, 'object') | ||
assert.strictEqual(typeof s.stream, 'object') | ||
assert.strictEqual(typeof s.sink, 'object') | ||
done() | ||
}) | ||
|
||
describe('stream', () => { | ||
it('should be an extension of Stream', done => { | ||
const {stream} = Subject() | ||
assert.strictEqual(stream instanceof Stream, true) | ||
done() | ||
}) | ||
describe('stream', () => { | ||
it('should be an extension of Stream', done => { | ||
const {stream} = Subjects[subject]() | ||
assert.strictEqual(stream instanceof Stream, true) | ||
done() | ||
}) | ||
|
||
it('should be hold-able', done => { | ||
const {sink, stream} = Subjects[subject]() | ||
const hstream = hold(stream) | ||
|
||
hstream | ||
.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
}) | ||
|
||
it('should be hold-able', done => { | ||
const {sink, stream} = Subject() | ||
const hstream = hold(stream) | ||
setTimeout(() => { | ||
hstream.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
done() | ||
}) | ||
}, 10) | ||
|
||
hstream | ||
.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
sink.add(1) | ||
}) | ||
|
||
setTimeout(() => { | ||
hstream.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
done() | ||
it('should inherit Stream combinators', done => { | ||
const {sink, stream} = Subjects[subject]() | ||
|
||
stream | ||
.map(x => x * x) | ||
.forEach(x => { | ||
assert.strictEqual(x, 25) | ||
done() | ||
}) | ||
|
||
sink.add(5) | ||
sink.end() | ||
}) | ||
}, 10) | ||
}) | ||
|
||
sink.add(1) | ||
}) | ||
describe('sink', () => { | ||
it('should allow nexting events', done => { | ||
const {sink, stream} = Subjects[subject]() | ||
|
||
it('should inherit Stream combinators', done => { | ||
const {sink, stream} = Subject() | ||
assert.strictEqual(typeof sink.add, 'function') | ||
|
||
stream | ||
.map(x => x * x) | ||
.forEach(x => { | ||
assert.strictEqual(x, 25) | ||
done() | ||
stream.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
done() | ||
}) | ||
|
||
sink.add(1) | ||
sink.end() | ||
}) | ||
|
||
sink.add(5) | ||
sink.end() | ||
}) | ||
}) | ||
it('should allow sending errors' , done => { | ||
const {sink, stream} = Subjects[subject]() | ||
|
||
assert.strictEqual(typeof sink.error, 'function') | ||
stream | ||
.drain() | ||
.then(assert.fail) | ||
.catch(err => { | ||
assert.strictEqual(err.message, 'Error Message') | ||
done() | ||
}) | ||
|
||
sink.add(1) | ||
sink.add(2) | ||
sink.error(new Error('Error Message')) | ||
}) | ||
|
||
describe('sink', () => { | ||
it('should allow nexting events', done => { | ||
const {sink, stream} = Subject() | ||
it('should allow ending of stream', done => { | ||
const {sink, stream} = Subjects[subject]() | ||
|
||
assert.strictEqual(typeof sink.add, 'function') | ||
stream | ||
.forEach(assert.fail) | ||
.then(done) | ||
.catch(assert.fail) | ||
|
||
stream.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
done() | ||
}) | ||
sink.end() | ||
}) | ||
|
||
sink.add(1) | ||
sink.end() | ||
}) | ||
it('should not allow events after end', done => { | ||
const {sink, stream} = Subjects[subject]() | ||
|
||
it('should allow sending errors' , done => { | ||
const {sink, stream} = Subject() | ||
const now = () => setTimeout(done, 10) | ||
stream | ||
.forEach(assert.fail) | ||
.then(now) | ||
.catch(assert.fail) | ||
|
||
assert.strictEqual(typeof sink.error, 'function') | ||
stream | ||
.drain() | ||
.then(assert.fail) | ||
.catch(err => { | ||
assert.strictEqual(err.message, 'Error Message') | ||
done() | ||
sink.end() | ||
sink.add(1) | ||
}) | ||
|
||
sink.add(1) | ||
sink.add(2) | ||
sink.error(new Error('Error Message')) | ||
}) | ||
it('sink should have es7 observer methods', done => { | ||
const {sink, stream} = Subjects[subject]() | ||
|
||
it('should allow ending of stream', done => { | ||
const {sink, stream} = Subject() | ||
assert.strictEqual(typeof sink.add, 'function') | ||
|
||
stream | ||
.forEach(assert.fail) | ||
.then(done) | ||
.catch(assert.fail) | ||
stream.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
}).then(done) | ||
|
||
sink.end() | ||
}) | ||
sink.next(1) | ||
sink.complete() | ||
}) | ||
}) | ||
|
||
if (subject === 'subject') { | ||
it('should be `hot` by default', done => { | ||
const {sink, stream} = Subjects[subject]() | ||
|
||
it('should not allow events after end', done => { | ||
const {sink, stream} = Subject() | ||
sink.add(2) | ||
sink.add(3) | ||
|
||
const now = () => setTimeout(done, 10) | ||
stream | ||
.forEach(assert.fail) | ||
.then(now) | ||
.catch(assert.fail) | ||
stream | ||
.observe(x => { | ||
assert.strictEqual(x, 1) | ||
}) | ||
.then(done) | ||
.catch(assert.fail) | ||
|
||
sink.end() | ||
sink.add(1) | ||
sink.add(1) | ||
sink.end() | ||
}) | ||
} | ||
|
||
if(subject === 'holdSubject') { | ||
describe('holdSubject', () => { | ||
it('should be held by default', done => { | ||
const {sink, stream} = Subjects[subject]() | ||
|
||
stream | ||
.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
}) | ||
|
||
setTimeout(() => { | ||
stream.forEach(x => { | ||
assert.strictEqual(x, 1) | ||
done() | ||
}) | ||
}, 10) | ||
|
||
sink.add(1) | ||
}) | ||
}) | ||
} | ||
|
||
if (subject === 'behaviorSubject') { | ||
describe('behaviorSubject', () => { | ||
it('should allow for a default value', done => { | ||
const {sink, stream} = Subjects[subject](123) | ||
|
||
stream.forEach(x => { | ||
assert.strictEqual(x, 123) | ||
}).then(done) | ||
|
||
sink.end() | ||
}) | ||
|
||
it('should allow finding latest value from `.value`', done => { | ||
const {sink, stream} = Subjects[subject](123) | ||
|
||
stream.forEach(x => { | ||
assert.strictEqual(x, sink.value) | ||
}).then(done) | ||
|
||
sink.add(1) | ||
sink.add(2) | ||
sink.end() | ||
}) | ||
}) | ||
} | ||
}) | ||
}) | ||
}) |