Skip to content

Commit

Permalink
feat(): reimplement replay and remove dep on @most/hold
Browse files Browse the repository at this point in the history
Adjust tests
Reimplement replay to be more specific to most-subject
Remove sink in favor of observer as that's the ES Observable spec definition
Add ESLint
Update all dev dependencies
Update README
  • Loading branch information
TylorS committed Feb 28, 2016
1 parent 2e10ca4 commit 620b2c4
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 298 deletions.
12 changes: 12 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"parser": "babel-eslint",
"extends": "eslint-config-cycle",
"env": {
"browser": true,
"node": true
},
"rules": {
"no-class/no-class": 0,
"quotes": 0
}
}
88 changes: 44 additions & 44 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,79 +2,79 @@

Subject and Subject-like interface to Most.js

# Usage
# API

```js
import {subject} from 'most-subject'
## holdSubject(bufferSize = 1[, initialValue])

const {sink, stream} = subject(1) // starts with initial value of 1
[src/index.js:25-31](https://github.com/tylors/most-subject/tree/master/src/index.js#L25-L31 "Source code on GitHub")

stream.forEach(x => console.log(x)) // 1, 2
Creates a subject that replays past events that a new observer may have missed.

sink.add(2) // Pushes 2 to stream
sink.error(new Error('Error Message')) // Send an error
sink.end() // End the stream
```
**Parameters**

- `bufferSize` **integer** [= 1] - how many values to keep buffered.
Must be an integer 1 or greater.
- `initialValue` **any** an initialValue to start with

```js
**Examples**

```javascript
import {holdSubject} from 'most-subject'

// create subject with buffersize of 4
// and an initial value of 1
const {observer, stream} = holdSubject(4, 1) // observer is an alias for sink
// will keep 4 items buffered with an initialValue of 1
const {observer, stream} = holdSubject(4, 1)

observer.next(2) // next is an alias for add()
observer.next(2)
observer.next(3)
observer.next(4)

stream.observe(x => console.log(x)) // 1, 2, 3, 4
stream.observe(x => console.log(x)) // 1 , 2 , 3, 4 , 5

observer.complete() // alias for end()
observer.next(5)
observer.complete()
```

Returns [**Subject**](#Subject)

## API
## subject()

#### **subject( [initialValue] )**
[src/index.js:21-23](https://github.com/tylors/most-subject/tree/master/src/index.js#L21-L23 "Source code on GitHub")

```js
import {subject} from 'most-subject'
```
Creates a basic Subject

**Arguments**
**Examples**

- initialValue (optional) :: any - A value for the stream to start with
```javascript
import {subject} from 'most-subject'

**Returns**
const {observer, stream} = subject()

- sink :: [Sink](#sink) - A sink to imperatively push to a stream
- observer :: [Sink](#sink) - An alias to `sink` to more closely align with ES Observable specification.
- stream :: most.Stream - The stream the sink/observer pushes to.
stream.observe(x => console.log(x)) // 1 , 2 , 3

#### **holdSubject(bufferSize = 1 [, initialValue])**
```js
import {holdSubject} from 'most-subject'
observer.next(1)
observer.next(2)
observer.next(3)
observer.complete()
```

**Arguments**
Returns [**Subject**](#Subject)

- bufferSize (defaults to 1) :: Number - Size of the buffer which will store past values. These values will be replayed upon observation.
## Subject

- initialValue (optional) :: any - A value for the stream to start with
A Subject is simply an object with the following properties

**Returns**
**Properties**

- sink :: [Sink](#sink) - A sink to imperatively push to a stream
- observer :: [Sink](#sink) - An alias to `sink` to more closely align with ES Observable specification.
- stream :: most.Stream - The stream the sink/observer pushes to.
- `observer` [**Observer**](#Observer)
- `stream` **most.Stream** A most.js Stream instance

## Observer

#### Sink
An Observer

**Methods**
**Properties**

- *add(value: any): void* - pushes a value to a sink's associated stream
- *next(value: any): void* - alias for `add()`
- *error(error: Error): void* - throws an error on a sink's associated stream and also ends the stream.
- *end(value: any): void* - ends a sinks' associated stream with the specified end value
- *complete(value: any): void* - alias for `end()`
- `next` **Function<any>** pushes a new value to the underlying Stream
- `error` **Function<Error>** pushes a new Error to and ends
the underlying Stream
- `complete` **Function<Any>** ends the underlying Stream
23 changes: 13 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
"version": "2.1.0",
"description": "Subject implemented in Most",
"main": "lib/index.js",
"typings": "type-definitions/most-subject.d.ts",
"scripts": {
"lint": "eslint src/",
"mocha": "mocha --compilers js:babel-core/register",
"compile-lib": "babel src/ -d lib/",
"test": "npm run mocha",
"test": "npm run lint && npm run mocha",
"prepublish": "npm test && npm run compile-lib"
},
"repository": {
Expand All @@ -26,16 +26,19 @@
},
"homepage": "https://github.com/tylors/most-subject#readme",
"devDependencies": {
"@most/hold": "^1.1.0",
"assert": "^1.3.0",
"babel-cli": "^6.4.5",
"babel-core": "^6.4.5",
"babel-preset-es2015": "^6.3.13",
"mocha": "^2.3.4",
"most": "0.18.0"
"babel-cli": "^6.5.1",
"babel-core": "^6.5.2",
"babel-eslint": "^5.0.0",
"babel-preset-es2015": "^6.5.0",
"eslint": "^1.10.3",
"eslint-config-cycle": "^3.2.0",
"eslint-plugin-cycle": "^1.0.2",
"eslint-plugin-no-class": "^0.1.0",
"mocha": "^2.4.5",
"most": "^0.18.1"
},
"peerDependencies": {
"most": "*",
"@most/hold": "*"
"most": "*"
}
}
52 changes: 26 additions & 26 deletions src/Subscription.js → src/Observer.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
class Subscription {
function tryEvent(sink, scheduler, event) {
try {
sink.event(scheduler.now(), event)
} catch (err) {
sink.error(scheduler.now(), err)
}
}

function tryEnd(sink, scheduler, event) {
try {
sink.end(scheduler.now(), event)
} catch (err) {
sink.error(scheduler.now(), err)
}
}

class Observer {
constructor() {
this.run = (sink, scheduler) => this._run(sink, scheduler)
this.add = this.next = x => this._add(x)
this.next = x => this._next(x)
this.error = err => this._error(err)
this.end = this.complete = x => this._end(x)
this.complete = x => this._complete(x)
}

_run(sink, scheduler) {
Expand All @@ -17,41 +33,25 @@ class Subscription {
this.active = false
}

_add(x) {
_next(value) {
if (!this.active) {
return
}
tryEvent(this.sink, this.scheduler, x)
tryEvent(this.sink, this.scheduler, value)
}

_error(e) {
_error(err) {
this.active = false
this.sink.error(this.scheduler.now(), e)
this.sink.error(this.scheduler.now(), err)
}

_end(x) {
_complete(value) {
if (!this.active) {
return
}
this.active = false
tryEnd(this.sink, this.scheduler, x)
}
}

function tryEvent(sink, scheduler, event) {
try {
sink.event(scheduler.now(), event)
} catch(e) {
sink.error(scheduler.now(), e)
}
}

function tryEnd(sink, scheduler, event) {
try {
sink.end(scheduler.now(), event)
} catch (e) {
sink.error(scheduler.now(), e)
tryEnd(this.sink, this.scheduler, value)
}
}

export {Subscription}
export {Observer}
58 changes: 58 additions & 0 deletions src/Replay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import {Stream} from 'most'
import MulticastSource from 'most/lib/source/MulticastSource'

function pushEvents(sink, buffer) {
let i = 0
for (; i < buffer.length; ++i) {
let item = buffer[i]
sink.event(item.time, item.value)
}
}

function replayAdd(sink) {
const length = this._replayAdd(sink)
if (this._replay.buffer.length > 0) {
pushEvents(sink, this._replay.buffer)
}
return length
}

function addToBuffer(event, replay) {
if (replay.buffer.length >= replay.bufferSize) {
replay.buffer.shift()
}
replay.buffer.push(event)
}

function replayEvent(time, value) {
if (this._replay.bufferSize > 0) {
addToBuffer({time, value}, this._replay)
}
this._replayEvent(time, value)
}

class Replay {
constructor(bufferSize, source) {
this.source = source
this.bufferSize = bufferSize
this.buffer = []
}

run(sink, scheduler) {
if (sink._replay !== this) {
sink._replay = this
sink._replayAdd = sink.add
sink.add = replayAdd

sink._replayEvent = sink.event
sink.event = replayEvent
}

return this.source.run(sink, scheduler)
}
}

const replay = (bufferSize, stream) =>
new Stream(new MulticastSource(new Replay(bufferSize, stream.source)))

export {replay}
Loading

0 comments on commit 620b2c4

Please sign in to comment.