Skip to content

Commit

Permalink
feat(): close all issues
Browse files Browse the repository at this point in the history
Update/Add dependencies on most and @most/hold
Add .next() .complete() methods to sink
Make stream 'hot'
Implement holdSubject
Implement behaviorSubject
Add tests for all of the above

Closes #1 #2 #3 #4
  • Loading branch information
TylorS committed Jan 19, 2016
1 parent a661017 commit 1842e85
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 90 deletions.
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@
},
"homepage": "https://github.com/tylors/most-subject#readme",
"devDependencies": {
"@most/hold": "^0.2.0",
"@most/hold": "^1.0.0",
"assert": "^1.3.0",
"babel-cli": "^6.2.4",
"babel-core": "^6.2.4",
"babel-preset-es2015": "^6.2.4",
"mocha": "^2.3.4"
"mocha": "^2.3.4",
"most": "0.17.1"
},
"peerDependencies": {
"most": "*"
"most": "*",
"@most/hold": "*"
}
}
39 changes: 39 additions & 0 deletions src/behaviorSubject.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import {Stream} from 'most'
import MulticastSource from 'most/lib/source/MulticastSource'
import hold from '@most/hold'

import {Subscription} from './subject'

class BehaviorSubscription extends Subscription {
constructor(intialValue) {
super()

this.run = (sink, scheduler) => this._run(sink, scheduler)
this.add = x => this._add(x)
this.error = err => this._error(err)
this.end = x => this._end(x)
this.value = void 0

if (typeof initialValue !== 'undefined') {
this.value = initialValue
this.add(x)
}
}

_add(x) {
this.value = x
super._add(x)
}
}

function create(initialValue) {
const sink = new BehaviorSubscription(initialValue)
const stream = new Stream(new MulticastSource(sink))
const holdStream = hold(stream)
holdStream.observe(x => {
stream.value = x
})
return {sink, stream: holdStream}
}

export default create
8 changes: 8 additions & 0 deletions src/holdSubject.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import subject from './subject'
import hold from '@most/hold'

export default function holdSubject() {
const {sink, stream} = subject()
const holdStream = hold(stream)
return {sink, stream: holdStream}
}
6 changes: 4 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import Subject from './subject'
import subject from './subject'
import holdSubject from './holdSubject'
import behaviorSubject from './behaviorSubject'

export default Subject
export {subject, holdSubject, behaviorSubject}
11 changes: 8 additions & 3 deletions src/subject.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import MulticastSource from 'most/lib/source/MulticastSource'

function Subscription() {
this.run = (sink, scheduler) => this._run(sink, scheduler)
this.add = x => this._add(x)
this.add = this.next = x => this._add(x)
this.error = err => this._error(err)
this.end = x => this._end(x)
this.end = this.complete = x => this._end(x)
}

Subscription.prototype._run = function run(sink, scheduler) {
Expand Down Expand Up @@ -58,7 +58,12 @@ Subscription.prototype._end = function end(x) {
function create() {
const sink = new Subscription()
const stream = new Stream(new MulticastSource(sink))
return {sink, stream}
stream.drain()
return {
sink,
stream,
}
}

export {Subscription}
export default create
247 changes: 165 additions & 82 deletions test/index.js
Original file line number Diff line number Diff line change
@@ -1,112 +1,195 @@
import assert from 'assert'
import {Stream} from 'most'
import hold from '@most/hold'
import Subject from '../src'

describe('Subject', () => {
it('should return Object with stream and sink', done => {
const s = Subject()
assert.strictEqual(typeof s, 'object')
assert.strictEqual(typeof s.stream, 'object')
assert.strictEqual(typeof s.sink, 'object')
done()
})
import * as Subjects from '../src'

Object.keys(Subjects)
.forEach(subject => {
describe(`${subject}`, () => {
it('should return Object with stream and sink', done => {
const s = Subjects[subject]()
assert.strictEqual(typeof s, 'object')
assert.strictEqual(typeof s.stream, 'object')
assert.strictEqual(typeof s.sink, 'object')
done()
})

describe('stream', () => {
it('should be an extension of Stream', done => {
const {stream} = Subject()
assert.strictEqual(stream instanceof Stream, true)
done()
})
describe('stream', () => {
it('should be an extension of Stream', done => {
const {stream} = Subjects[subject]()
assert.strictEqual(stream instanceof Stream, true)
done()
})

it('should be hold-able', done => {
const {sink, stream} = Subjects[subject]()
const hstream = hold(stream)

hstream
.forEach(x => {
assert.strictEqual(x, 1)
})

it('should be hold-able', done => {
const {sink, stream} = Subject()
const hstream = hold(stream)
setTimeout(() => {
hstream.forEach(x => {
assert.strictEqual(x, 1)
done()
})
}, 10)

hstream
.forEach(x => {
assert.strictEqual(x, 1)
sink.add(1)
})

setTimeout(() => {
hstream.forEach(x => {
assert.strictEqual(x, 1)
done()
it('should inherit Stream combinators', done => {
const {sink, stream} = Subjects[subject]()

stream
.map(x => x * x)
.forEach(x => {
assert.strictEqual(x, 25)
done()
})

sink.add(5)
sink.end()
})
}, 10)
})

sink.add(1)
})
describe('sink', () => {
it('should allow nexting events', done => {
const {sink, stream} = Subjects[subject]()

it('should inherit Stream combinators', done => {
const {sink, stream} = Subject()
assert.strictEqual(typeof sink.add, 'function')

stream
.map(x => x * x)
.forEach(x => {
assert.strictEqual(x, 25)
done()
stream.forEach(x => {
assert.strictEqual(x, 1)
done()
})

sink.add(1)
sink.end()
})

sink.add(5)
sink.end()
})
})
it('should allow sending errors' , done => {
const {sink, stream} = Subjects[subject]()

assert.strictEqual(typeof sink.error, 'function')
stream
.drain()
.then(assert.fail)
.catch(err => {
assert.strictEqual(err.message, 'Error Message')
done()
})

sink.add(1)
sink.add(2)
sink.error(new Error('Error Message'))
})

describe('sink', () => {
it('should allow nexting events', done => {
const {sink, stream} = Subject()
it('should allow ending of stream', done => {
const {sink, stream} = Subjects[subject]()

assert.strictEqual(typeof sink.add, 'function')
stream
.forEach(assert.fail)
.then(done)
.catch(assert.fail)

stream.forEach(x => {
assert.strictEqual(x, 1)
done()
})
sink.end()
})

sink.add(1)
sink.end()
})
it('should not allow events after end', done => {
const {sink, stream} = Subjects[subject]()

it('should allow sending errors' , done => {
const {sink, stream} = Subject()
const now = () => setTimeout(done, 10)
stream
.forEach(assert.fail)
.then(now)
.catch(assert.fail)

assert.strictEqual(typeof sink.error, 'function')
stream
.drain()
.then(assert.fail)
.catch(err => {
assert.strictEqual(err.message, 'Error Message')
done()
sink.end()
sink.add(1)
})

sink.add(1)
sink.add(2)
sink.error(new Error('Error Message'))
})
it('sink should have es7 observer methods', done => {
const {sink, stream} = Subjects[subject]()

it('should allow ending of stream', done => {
const {sink, stream} = Subject()
assert.strictEqual(typeof sink.add, 'function')

stream
.forEach(assert.fail)
.then(done)
.catch(assert.fail)
stream.forEach(x => {
assert.strictEqual(x, 1)
}).then(done)

sink.end()
})
sink.next(1)
sink.complete()
})
})

if (subject === 'subject') {
it('should be `hot` by default', done => {
const {sink, stream} = Subjects[subject]()

it('should not allow events after end', done => {
const {sink, stream} = Subject()
sink.add(2)
sink.add(3)

const now = () => setTimeout(done, 10)
stream
.forEach(assert.fail)
.then(now)
.catch(assert.fail)
stream
.observe(x => {
assert.strictEqual(x, 1)
})
.then(done)
.catch(assert.fail)

sink.end()
sink.add(1)
sink.add(1)
sink.end()
})
}

if(subject === 'holdSubject') {
describe('holdSubject', () => {
it('should be held by default', done => {
const {sink, stream} = Subjects[subject]()

stream
.forEach(x => {
assert.strictEqual(x, 1)
})

setTimeout(() => {
stream.forEach(x => {
assert.strictEqual(x, 1)
done()
})
}, 10)

sink.add(1)
})
})
}

if (subject === 'behaviorSubject') {
describe('behaviorSubject', () => {
it('should allow for a default value', done => {
const {sink, stream} = Subjects[subject](123)

stream.forEach(x => {
assert.strictEqual(x, 123)
}).then(done)

sink.end()
})

it('should allow finding latest value from `.value`', done => {
const {sink, stream} = Subjects[subject](123)

stream.forEach(x => {
assert.strictEqual(x, sink.value)
}).then(done)

sink.add(1)
sink.add(2)
sink.end()
})
})
}
})
})
})

0 comments on commit 1842e85

Please sign in to comment.