Skip to content

Commit

Permalink
refactor(src): rename Machine to Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Feb 26, 2016
1 parent 8a25fe7 commit 13bd699
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/Machine.ts → src/Producer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Observer} from './Observer';

export interface Machine<T> {
export interface Producer<T> {
start: (observer: Observer<T>) => void;
stop: () => void;
}
32 changes: 16 additions & 16 deletions src/Stream.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import {Observer} from './Observer';
import {Machine} from './Machine';
import {MapMachine} from './operator/MapMachine';
import {FilterMachine} from './operator/FilterMachine';
import {TakeMachine} from './operator/TakeMachine';
import {SkipMachine} from './operator/SkipMachine';
import {DebugMachine} from './operator/DebugMachine';
import {FoldMachine} from './operator/FoldMachine';
import {LastMachine} from './operator/LastMachine';
import {Producer} from './Producer';
import {MapProducer} from './operator/MapProducer';
import {FilterProducer} from './operator/FilterProducer';
import {TakeProducer} from './operator/TakeProducer';
import {SkipProducer} from './operator/SkipProducer';
import {DebugProducer} from './operator/DebugProducer';
import {FoldProducer} from './operator/FoldProducer';
import {LastProducer} from './operator/LastProducer';

export class Stream<T> implements Observer<T> {
public observers: Array<Observer<T>>;
public num: number; // Number of non-operator subscribers

constructor(public machine: Machine<T>) {
constructor(public machine: Producer<T>) {
this.observers = [];
this.num = 0;
}
Expand Down Expand Up @@ -64,30 +64,30 @@ export class Stream<T> implements Observer<T> {
}

map<U>(project: (t: T) => U): Stream<U> {
return new Stream<U>(new MapMachine(project, this));
return new Stream<U>(new MapProducer(project, this));
}

filter(predicate: (t: T) => boolean): Stream<T> {
return new Stream<T>(new FilterMachine(predicate, this));
return new Stream<T>(new FilterProducer(predicate, this));
}

take(amount: number): Stream<T> {
return new Stream<T>(new TakeMachine(amount, this));
return new Stream<T>(new TakeProducer(amount, this));
}

skip(amount: number): Stream<T> {
return new Stream<T>(new SkipMachine(amount, this));
return new Stream<T>(new SkipProducer(amount, this));
}

debug(spy: (t: T) => void = null): Stream<T> {
return new Stream<T>(new DebugMachine(spy, this));
return new Stream<T>(new DebugProducer(spy, this));
}

fold<R>(accumulate: (acc: R, t: T) => R, init: R): Stream<R> {
return new Stream<R>(new FoldMachine(accumulate, init, this));
return new Stream<R>(new FoldProducer(accumulate, init, this));
}

last(): Stream<T> {
return new Stream<T>(new LastMachine(this));
return new Stream<T>(new LastProducer(this));
}
}
8 changes: 4 additions & 4 deletions src/factory/from.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Observer} from '../Observer';
import {Stream} from '../Stream';
import {noop} from '../utils/noop';

class FromMachine<T> implements Machine<T> {
class FromProducer<T> implements Producer<T> {
constructor(public array: Array<T>) {
}

Expand All @@ -22,6 +22,6 @@ class FromMachine<T> implements Machine<T> {
}

export default function from<T>(array: Array<T>) {
const fromMachine = new FromMachine(array);
return new Stream<T>(fromMachine);
const fromProducer = new FromProducer(array);
return new Stream<T>(fromProducer);
}
8 changes: 4 additions & 4 deletions src/factory/interval.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Observer} from '../Observer';
import {Stream} from '../Stream';

class IntervalMachine implements Machine<number> {
class IntervalProducer implements Producer<number> {
on: boolean;
intervalID: any;
i: number;
Expand All @@ -25,6 +25,6 @@ class IntervalMachine implements Machine<number> {
}

export default function interval(period: number) {
const intervalMachine = new IntervalMachine(period);
return new Stream<number>(intervalMachine);
const intervalProducer = new IntervalProducer(period);
return new Stream<number>(intervalProducer);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class DebugMachine<T> implements Machine<T> {
export class DebugProducer<T> implements Producer<T> {
public proxy: Observer<T> = emptyObserver;

constructor(public spy: (t: T) => void = null,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class Proxy<T> implements Observer<T> {
constructor(public out: Stream<T>,
public machine: FilterMachine<T>) {
public p: FilterProducer<T>) {
}

next(t: T) {
if (this.machine.predicate(t)) this.out.next(t);
if (this.p.predicate(t)) this.out.next(t);
}

error(err: any) {
Expand All @@ -21,7 +21,7 @@ export class Proxy<T> implements Observer<T> {
}
}

export class FilterMachine<T> implements Machine<T> {
export class FilterProducer<T> implements Producer<T> {
public proxy: Observer<T> = emptyObserver;

constructor(public predicate: (t: T) => boolean,
Expand Down
10 changes: 5 additions & 5 deletions src/operator/FoldMachine.ts → src/operator/FoldProducer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class Proxy<T, R> implements Observer<T> {
constructor(public out: Stream<R>,
public m: FoldMachine<T, R>) {
public p: FoldProducer<T, R>) {
}

next(t: T) {
const m = this.m;
this.out.next(m.acc = m.a(m.acc, t));
const p = this.p;
this.out.next(p.acc = p.a(p.acc, t));
}

error(err: any) {
Expand All @@ -22,7 +22,7 @@ export class Proxy<T, R> implements Observer<T> {
}
}

export class FoldMachine<T, R> implements Machine<R> {
export class FoldProducer<T, R> implements Producer<R> {
public proxy: Observer<T> = emptyObserver;
public acc: R;

Expand Down
18 changes: 9 additions & 9 deletions src/operator/LastMachine.ts → src/operator/LastProducer.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class Proxy<T> implements Observer<T> {
constructor(public out: Stream<T>,
public machine: LastMachine<T>) {
public p: LastProducer<T>) {
}

next(t: T) {
const m = this.machine;
m.has = true;
m.val = t;
const p = this.p;
p.has = true;
p.val = t;
}

error(err: any) {
this.out.error(err);
}

complete() {
const m = this.machine;
const p = this.p;
const out = this.out;
if (m.has) {
out.next(m.val);
if (p.has) {
out.next(p.val);
out.complete();
} else {
out.error('TODO show proper error');
}
}
}

export class LastMachine<T> implements Machine<T> {
export class LastProducer<T> implements Producer<T> {
public proxy: Observer<T> = emptyObserver;
public has: boolean = false;
public val: T = <T> {};
Expand Down
8 changes: 4 additions & 4 deletions src/operator/MapMachine.ts → src/operator/MapProducer.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class Proxy<T, U> implements Observer<T> {
constructor(public out: Stream<U>,
public machine: MapMachine<T, U>) {
public p: MapProducer<T, U>) {
}

next(t: T) {
this.out.next(this.machine.project(t));
this.out.next(this.p.project(t));
}

error(err: any) {
Expand All @@ -21,7 +21,7 @@ export class Proxy<T, U> implements Observer<T> {
}
}

export class MapMachine<T, U> implements Machine<U> {
export class MapProducer<T, U> implements Producer<U> {
public proxy: Observer<T> = emptyObserver;

constructor(public project: (t: T) => U,
Expand Down
4 changes: 2 additions & 2 deletions src/operator/SkipMachine.ts → src/operator/SkipProducer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class SkipMachine<T> implements Machine<T> {
export class SkipProducer<T> implements Producer<T> {
public proxy: Observer<T> = emptyObserver;
public skipped: number = 0;

Expand Down
4 changes: 2 additions & 2 deletions src/operator/TakeMachine.ts → src/operator/TakeProducer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class TakeMachine<T> implements Machine<T> {
export class TakeProducer<T> implements Producer<T> {
public proxy: Observer<T> = emptyObserver;
public taken: number = 0;

Expand Down

0 comments on commit 13bd699

Please sign in to comment.