Skip to content

Commit

Permalink
feat(Stream): implement really simply Stream and interval() factory
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Feb 24, 2016
1 parent 6b9c54a commit a3a08e7
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ lib/

# Dependency directory
node_modules
typings/

# Optional npm cache directory
.npm
Expand Down
11 changes: 8 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
"scripts": {
"commit": "git-cz",
"lint": "tslint -c tslint.json src/*.ts",
"pretape": "npm run lib",
"tape": "ts-node ./node_modules/.bin/tape tests/**/*.ts",
"test": "npm run lint && npm run tape",
"premocha": "npm run lib",
"mocha": "mocha tests/**/*.ts --require ts-node/register",
"postmocha": "rm -f tests/**/*.js",
"test": "npm run lint && npm run mocha",
"prelib": "rm -rf lib/ && mkdirp lib/",
"lib": "tsc",
"postinstall": "typings install",
"predist": "rm -rf dist/ && mkdirp dist/ && npm run lib",
"dist": "browserify lib/index.js -o dist/xstream.js",
"postdist": "uglifyjs dist/xstream.js -o dist/xstream.min.js",
Expand All @@ -29,15 +31,18 @@
},
"homepage": "https://github.com/staltz/xstream#readme",
"devDependencies": {
"assert": "^1.3.0",
"browserify": "^13.0.0",
"commitizen": "^2.5.0",
"conventional-changelog": "^1.1.0",
"cz-conventional-changelog": "^1.1.5",
"ghooks": "^1.0.3",
"mkdirp": "^0.5.1",
"mocha": "^2.4.5",
"ts-node": "^0.5.5",
"tslint": "^3.5.0",
"typescript": "^1.8.2",
"typings": "^0.6.8",
"validate-commit-msg": "^2.1.0"
},
"config": {
Expand Down
82 changes: 80 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,81 @@
export function Stream(subscribe: () => void) {
this.subscribe = subscribe;
export interface Observer<T> {
next: (x: T) => void;
error: (err: any) => void;
complete: () => void;
}

export interface Machine<T> {
start: (observer: Observer<T>) => void;
stop: () => void;
}

export class Stream<T> implements Observer<T> {
public observers: Array<Observer<T>>;

constructor(public machine: Machine<T>) {
this.observers = [];
}

next(x: T): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].next(x);
}
}

error(err: any): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].error(err);
}
}

complete(): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].complete();
}
}

subscribe(observer: Observer<T>): void {
this.observers.push(observer);
if (this.observers.length === 1) this.machine.start(this);
}

unsubscribe(observer: Observer<T>): void {
const i = this.observers.indexOf(observer);
if (i > -1) {
this.observers.splice(i, 1);
if (!this.observers.length) this.machine.stop();
}
}
}

class IntervalMachine implements Machine<number> {
on: boolean;
intervalID: any;
i: number;

constructor(public period: number) {
this.intervalID = -1;
this.i = 0;
}

start(observer: Observer<number>): void {
this.intervalID = setInterval(() => observer.next(this.i++), this.period);
}

stop(): void {
this.i = 0;
if (this.intervalID !== -1) clearInterval(this.intervalID);
}
}

export function interval(period: number) {
const intervalMachine = new IntervalMachine(period);
return new Stream<number>(intervalMachine);
}

export default {
interval
};
22 changes: 22 additions & 0 deletions tests/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"use strict";
var index_1 = require('../src/index');
var assert = require('assert');
describe('Stream', function () {
it('can be subscribed and unsubscribed with one observer', function (done) {
var stream = index_1.default.interval(100);
var i = 0;
var observer = {
next: function (x) {
assert.equal(x, i);
i += 1;
if (i === 2) {
stream.unsubscribe(observer);
done();
}
},
error: done.fail,
complete: done.fail,
};
stream.subscribe(observer);
});
});
22 changes: 22 additions & 0 deletions tests/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import xs from '../src/index';
import * as assert from 'assert';

describe('Stream', () => {
it('can be subscribed and unsubscribed with one observer', (done) => {
const stream = xs.interval(100);
let i = 0;
let observer = {
next: (x: number) => {
assert.equal(x, i);
i += 1;
if (i === 2) {
stream.unsubscribe(observer);
done();
}
},
error: done.fail,
complete: done.fail,
};
stream.subscribe(observer);
});
});
2 changes: 2 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"noImplicitAny": true,
"suppressImplicitAnyIndexErrors": true,
"module": "commonjs",
"noEmitHelpers": true,
"target": "ES5",
"outDir": "lib/"
},
Expand All @@ -15,6 +16,7 @@
"tabSize": 2
},
"files": [
"typings/main.d.ts",
"src/index.ts"
],
"exclude": [
Expand Down
2 changes: 1 addition & 1 deletion tslint.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"rules": {
"curly": true,
"curly": false,
"eofline": false,
"align": [true, "parameters"],
"class-name": true,
Expand Down
9 changes: 9 additions & 0 deletions typings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "xstream",
"dependencies": {},
"devDependencies": {},
"ambientDevDependencies": {
"mocha": "github:DefinitelyTyped/DefinitelyTyped/mocha/mocha.d.ts#d6dd320291705694ba8e1a79497a908e9f5e6617",
"node": "github:DefinitelyTyped/DefinitelyTyped/node/node.d.ts#20e1eb9616922d382d918cc5a21870a9dbe255f5"
}
}

0 comments on commit a3a08e7

Please sign in to comment.