diff --git a/.gitignore b/.gitignore index c480bf5..b4bd7ce 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ lib/ # Dependency directory node_modules +typings/ # Optional npm cache directory .npm diff --git a/package.json b/package.json index 7d09ff5..bc469b2 100644 --- a/package.json +++ b/package.json @@ -6,11 +6,13 @@ "scripts": { "commit": "git-cz", "lint": "tslint -c tslint.json src/*.ts", - "pretape": "npm run lib", - "tape": "ts-node ./node_modules/.bin/tape tests/**/*.ts", - "test": "npm run lint && npm run tape", + "premocha": "npm run lib", + "mocha": "mocha tests/**/*.ts --require ts-node/register", + "postmocha": "rm -f tests/**/*.js", + "test": "npm run lint && npm run mocha", "prelib": "rm -rf lib/ && mkdirp lib/", "lib": "tsc", + "postinstall": "typings install", "predist": "rm -rf dist/ && mkdirp dist/ && npm run lib", "dist": "browserify lib/index.js -o dist/xstream.js", "postdist": "uglifyjs dist/xstream.js -o dist/xstream.min.js", @@ -29,15 +31,18 @@ }, "homepage": "https://github.com/staltz/xstream#readme", "devDependencies": { + "assert": "^1.3.0", "browserify": "^13.0.0", "commitizen": "^2.5.0", "conventional-changelog": "^1.1.0", "cz-conventional-changelog": "^1.1.5", "ghooks": "^1.0.3", "mkdirp": "^0.5.1", + "mocha": "^2.4.5", "ts-node": "^0.5.5", "tslint": "^3.5.0", "typescript": "^1.8.2", + "typings": "^0.6.8", "validate-commit-msg": "^2.1.0" }, "config": { diff --git a/src/index.ts b/src/index.ts index a36e84a..ba3cce6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,81 @@ -export function Stream(subscribe: () => void) { - this.subscribe = subscribe; +export interface Observer { + next: (x: T) => void; + error: (err: any) => void; + complete: () => void; } + +export interface Machine { + start: (observer: Observer) => void; + stop: () => void; +} + +export class Stream implements Observer { + public observers: Array>; + + constructor(public machine: Machine) { + this.observers = []; + } + + next(x: T): void { + const len = this.observers.length; + for (let i = len - 1; i >= 0; i--) { + this.observers[i].next(x); + } + } + + error(err: any): void { + const len = this.observers.length; + for (let i = len - 1; i >= 0; i--) { + this.observers[i].error(err); + } + } + + complete(): void { + const len = this.observers.length; + for (let i = len - 1; i >= 0; i--) { + this.observers[i].complete(); + } + } + + subscribe(observer: Observer): void { + this.observers.push(observer); + if (this.observers.length === 1) this.machine.start(this); + } + + unsubscribe(observer: Observer): void { + const i = this.observers.indexOf(observer); + if (i > -1) { + this.observers.splice(i, 1); + if (!this.observers.length) this.machine.stop(); + } + } +} + +class IntervalMachine implements Machine { + on: boolean; + intervalID: any; + i: number; + + constructor(public period: number) { + this.intervalID = -1; + this.i = 0; + } + + start(observer: Observer): void { + this.intervalID = setInterval(() => observer.next(this.i++), this.period); + } + + stop(): void { + this.i = 0; + if (this.intervalID !== -1) clearInterval(this.intervalID); + } +} + +export function interval(period: number) { + const intervalMachine = new IntervalMachine(period); + return new Stream(intervalMachine); +} + +export default { + interval +}; diff --git a/tests/stream.js b/tests/stream.js new file mode 100644 index 0000000..8e6253d --- /dev/null +++ b/tests/stream.js @@ -0,0 +1,22 @@ +"use strict"; +var index_1 = require('../src/index'); +var assert = require('assert'); +describe('Stream', function () { + it('can be subscribed and unsubscribed with one observer', function (done) { + var stream = index_1.default.interval(100); + var i = 0; + var observer = { + next: function (x) { + assert.equal(x, i); + i += 1; + if (i === 2) { + stream.unsubscribe(observer); + done(); + } + }, + error: done.fail, + complete: done.fail, + }; + stream.subscribe(observer); + }); +}); diff --git a/tests/stream.ts b/tests/stream.ts new file mode 100644 index 0000000..474fbac --- /dev/null +++ b/tests/stream.ts @@ -0,0 +1,22 @@ +import xs from '../src/index'; +import * as assert from 'assert'; + +describe('Stream', () => { + it('can be subscribed and unsubscribed with one observer', (done) => { + const stream = xs.interval(100); + let i = 0; + let observer = { + next: (x: number) => { + assert.equal(x, i); + i += 1; + if (i === 2) { + stream.unsubscribe(observer); + done(); + } + }, + error: done.fail, + complete: done.fail, + }; + stream.subscribe(observer); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index 4c69f81..3948a9c 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -7,6 +7,7 @@ "noImplicitAny": true, "suppressImplicitAnyIndexErrors": true, "module": "commonjs", + "noEmitHelpers": true, "target": "ES5", "outDir": "lib/" }, @@ -15,6 +16,7 @@ "tabSize": 2 }, "files": [ + "typings/main.d.ts", "src/index.ts" ], "exclude": [ diff --git a/tslint.json b/tslint.json index 34fac9b..46c5c3e 100644 --- a/tslint.json +++ b/tslint.json @@ -1,6 +1,6 @@ { "rules": { - "curly": true, + "curly": false, "eofline": false, "align": [true, "parameters"], "class-name": true, diff --git a/typings.json b/typings.json new file mode 100644 index 0000000..510b724 --- /dev/null +++ b/typings.json @@ -0,0 +1,9 @@ +{ + "name": "xstream", + "dependencies": {}, + "devDependencies": {}, + "ambientDevDependencies": { + "mocha": "github:DefinitelyTyped/DefinitelyTyped/mocha/mocha.d.ts#d6dd320291705694ba8e1a79497a908e9f5e6617", + "node": "github:DefinitelyTyped/DefinitelyTyped/node/node.d.ts#20e1eb9616922d382d918cc5a21870a9dbe255f5" + } +}