Skip to content

Commit

Permalink
feat(shamefullySendNext): introduce shamefullySendNext and hide _next…
Browse files Browse the repository at this point in the history
… callback
  • Loading branch information
Andre Medeiros committed Mar 27, 2016
1 parent 91eb526 commit 552caff
Show file tree
Hide file tree
Showing 43 changed files with 385 additions and 306 deletions.
2 changes: 1 addition & 1 deletion perf/runners.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ function runRx5(deferred, rxStream) {
function runXStream(deferred, xstream) {
xstream.addListener({
next: noop,
end: function() {
complete: function() {
deferred.resolve();
},
error: function(e) {
Expand Down
5 changes: 5 additions & 0 deletions src/InternalListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface InternalListener<T> {
_n: (v: T) => void;
_e: (err: any) => void;
_c: () => void;
}
6 changes: 6 additions & 0 deletions src/InternalProducer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {InternalListener} from './InternalListener';

export interface InternalProducer<T> {
_start: (listener: InternalListener<T>) => void;
_stop: () => void;
}
2 changes: 1 addition & 1 deletion src/Listener.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface Listener<T> {
next: (x: T) => void;
error: (err: any) => void;
end: () => void;
complete: () => void;
}
8 changes: 4 additions & 4 deletions src/Operator.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {Producer} from './Producer';
import {InternalProducer} from './InternalProducer';
import {Stream} from './Stream';

export interface Operator<T, R> extends Producer<R> {
start: (out: Stream<R>) => void;
stop: () => void;
export interface Operator<T, R> extends InternalProducer<R> {
_start: (out: Stream<R>) => void;
_stop: () => void;
ins: Stream<T>;
}
91 changes: 64 additions & 27 deletions src/Stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {Listener} from './Listener';
import {Producer} from './Producer';
import {InternalListener} from './InternalListener';
import {InternalProducer} from './InternalProducer';
import {MapOperator} from './operator/MapOperator';
import {FilterOperator} from './operator/FilterOperator';
import {TakeOperator} from './operator/TakeOperator';
Expand All @@ -22,68 +24,103 @@ import {MergeProducer} from './factory/MergeProducer';
import {empty} from './utils/empty';
import {noop} from './utils/noop';

export class Stream<T> implements Listener<T> {
public _listeners: Array<Listener<T>>;
export class Stream<T> implements InternalListener<T> {
public _listeners: Array<InternalListener<T>>;
public _stopID: any = empty;
public _prod: Producer<T>;
public _prod: InternalProducer<T>;

constructor(producer: Producer<T>) {
constructor(producer: InternalProducer<T>) {
this._prod = producer;
this._listeners = [];
}

next(x: T): void {
static create<T>(producer: Producer<T>): Stream<T> {
(<InternalProducer<T>> (<any> producer))._start =
function _start(internalListener: InternalListener<T>) {
(<Listener<T>> (<any> internalListener)).next = internalListener._n;
(<Listener<T>> (<any> internalListener)).error = internalListener._e;
(<Listener<T>> (<any> internalListener)).complete = internalListener._c;
producer.start(<Listener<T>> (<any> internalListener));
};
(<InternalProducer<T>> (<any> producer))._stop = producer.stop;
return new Stream(<InternalProducer<T>> (<any> producer));
}

shamefullySendNext(value: T) {
this._n(value);
}

shamefullySendError(error: any) {
this._e(error);
}

shamefullySendComplete() {
this._c();
}

_n(t: T): void {
const len = this._listeners.length;
if (len === 1) {
this._listeners[0].next(x);
this._listeners[0]._n(t);
} else {
for (let i = 0; i < len; i++) {
this._listeners[i].next(x);
this._listeners[i]._n(t);
}
}
}

error(err: any): void {
_e(err: any): void {
const len = this._listeners.length;
if (len === 1) {
this._listeners[0].error(err);
this._listeners[0]._e(err);
} else {
for (let i = 0; i < len; i++) {
this._listeners[i].error(err);
this._listeners[i]._e(err);
}
}
}

end(): void {
_c(): void {
const len = this._listeners.length;
if (len === 1) {
this._listeners[0].end();
this._listeners[0]._c();
} else {
for (let i = 0; i < len; i++) {
this._listeners[i].end();
this._listeners[i]._c();
}
}
this._stopID = setTimeout(() => this._prod.stop());
this._stopID = setTimeout(() => this._prod._stop());
this._listeners = [];
}

addListener(listener: Listener<T>): void {
(<InternalListener<T>> (<any> listener))._n = listener.next;
(<InternalListener<T>> (<any> listener))._e = listener.error;
(<InternalListener<T>> (<any> listener))._c = listener.complete;
this._add(<InternalListener<T>> (<any> listener));
}

removeListener(listener: Listener<T>): void {
this._remove(<InternalListener<T>> (<any> listener));
}

_add(listener: InternalListener<T>): void {
this._listeners.push(listener);
if (this._listeners.length === 1) {
if (this._stopID !== empty) {
clearTimeout(this._stopID);
this._stopID = empty;
}
this._prod.start(this);
this._prod._start(this);
}
}

removeListener(listener: Listener<T>): void {
_remove(listener: InternalListener<T>): void {
const i = this._listeners.indexOf(listener);
if (i > -1) {
this._listeners.splice(i, 1);
if (this._listeners.length <= 0) {
this._stopID = setTimeout(() => this._prod.stop());
this._stopID = setTimeout(() => this._prod._stop());
}
}
}
Expand All @@ -95,7 +132,7 @@ export class Stream<T> implements Listener<T> {
};

static MemoryStream<T>(): MemoryStream<T> {
return new MemoryStream<T>({start: noop, stop: noop});
return new MemoryStream<T>({_start: noop, _stop: noop});
}

static from<T>(array: Array<T>): Stream<T> {
Expand All @@ -121,13 +158,13 @@ export class Stream<T> implements Listener<T> {
}

static never(): Stream<void> {
return new Stream<void>({start: noop, stop: noop});
return new Stream<void>({_start: noop, _stop: noop});
}

static empty(): Stream<void> {
return new Stream<void>({
start(obs: Listener<void>) { obs.end(); },
stop: noop,
_start(lner: InternalListener<void>) { lner._c(); },
_stop: noop,
});
}

Expand Down Expand Up @@ -189,17 +226,17 @@ export class Stream<T> implements Listener<T> {

export class MemoryStream<T> extends Stream<T> {
public _val: any;
constructor(producer: Producer<T>) {
constructor(producer: InternalProducer<T>) {
super(producer);
}

next(x: T) {
_n(x: T) {
this._val = x;
super.next(x);
super._n(x);
}

addListener(listener: Listener<T>): void {
super.addListener(listener);
if (this._val) { listener.next(this._val); }
_add(listener: InternalListener<T>): void {
super._add(listener);
if (this._val) { listener._n(this._val); }
}
}
30 changes: 15 additions & 15 deletions src/factory/CombineProducer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Listener} from '../Listener';
import {Producer} from '../Producer';
import {InternalListener} from '../InternalListener';
import {InternalProducer} from '../InternalProducer';
import {Stream} from '../Stream';
import {emptyListener} from '../utils/emptyListener';
import {invoke} from '../utils/invoke';
Expand Down Expand Up @@ -57,37 +57,37 @@ export interface CombineInstanceSignature<T> {
<R>(project: (...args: Array<any>) => R, ...streams: Array<Stream<any>>): Stream<R>;
}

export class Proxy<T> implements Listener<T> {
export class Proxy<T> implements InternalListener<T> {
constructor(public i: number, public prod: CombineProducer<T>) {
prod.proxies.push(this);
}

next(t: T): void {
_n(t: T): void {
const prod = this.prod;
prod.hasVal[this.i] = true;
prod.vals[this.i] = t;
if (!prod.ready) {
prod.up();
}
if (prod.ready) {
prod.out.next(invoke(prod.project, prod.vals));
prod.out._n(invoke(prod.project, prod.vals));
}
}

error(err: any): void {
this.prod.out.error(err);
_e(err: any): void {
this.prod.out._e(err);
}

end(): void {
_c(): void {
const prod = this.prod;
if (--prod.ac === 0) {
prod.out.end();
prod.out._c();
}
}
}

export class CombineProducer<R> implements Producer<R> {
public out: Listener<R> = emptyListener;
export class CombineProducer<R> implements InternalProducer<R> {
public out: InternalListener<R> = emptyListener;
public proxies: Array<Proxy<any>> = [];
public ready: boolean = false;
public hasVal: Array<boolean>;
Expand All @@ -111,16 +111,16 @@ export class CombineProducer<R> implements Producer<R> {
this.ready = true;
}

start(out: Listener<R>): void {
_start(out: InternalListener<R>): void {
this.out = out;
for (let i = this.streams.length - 1; i >= 0; i--) {
this.streams[i].addListener(new Proxy(i, this));
this.streams[i]._add(new Proxy(i, this));
}
}

stop(): void {
_stop(): void {
for (let i = this.streams.length - 1; i >= 0; i--) {
this.streams[i].removeListener(this.proxies[i]);
this.streams[i]._remove(this.proxies[i]);
}
this.proxies = [];
}
Expand Down
12 changes: 6 additions & 6 deletions src/factory/EventProducer.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import {Producer} from '../Producer';
import {Listener} from '../Listener';
import {InternalProducer} from '../InternalProducer';
import {InternalListener} from '../InternalListener';

export class EventProducer implements Producer<Event> {
export class EventProducer implements InternalProducer<Event> {
private listener: EventListener;

constructor(public node: EventTarget,
public eventType: string,
public useCapture: boolean) {
}

start(out: Listener<Event>) {
this.listener = (e) => out.next(e);
_start(out: InternalListener<Event>) {
this.listener = (e) => out._n(e);
const {node, eventType, useCapture} = this;
node.addEventListener(eventType, this.listener, useCapture);
}

stop() {
_stop() {
const {node, eventType, listener, useCapture} = this;
node.removeEventListener(eventType, listener, useCapture);
}
Expand Down
14 changes: 7 additions & 7 deletions src/factory/FromProducer.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import {Producer} from '../Producer';
import {Listener} from '../Listener';
import {InternalProducer} from '../InternalProducer';
import {InternalListener} from '../InternalListener';
import {noop} from '../utils/noop';

export class FromProducer<T> implements Producer<T> {
export class FromProducer<T> implements InternalProducer<T> {
constructor(public a: Array<T>) {
}

start(out: Listener<T>): void {
_start(out: InternalListener<T>): void {
const a = this.a;
for (let i = 0, l = a.length; i < l; i++) {
out.next(a[i]);
out._n(a[i]);
}
out.end();
out._c();
}

stop(): void {
_stop(): void {
noop();
}
}
12 changes: 6 additions & 6 deletions src/factory/IntervalProducer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Producer} from '../Producer';
import {Listener} from '../Listener';
import {InternalProducer} from '../InternalProducer';
import {InternalListener} from '../InternalListener';

export class IntervalProducer implements Producer<number> {
export class IntervalProducer implements InternalProducer<number> {
on: boolean;
intervalID: any;
i: number;
Expand All @@ -11,13 +11,13 @@ export class IntervalProducer implements Producer<number> {
this.i = 0;
}

start(stream: Listener<number>): void {
_start(stream: InternalListener<number>): void {
const self = this;
function intervalHandler() { stream.next(self.i++); }
function intervalHandler() { stream._n(self.i++); }
this.intervalID = setInterval(intervalHandler, this.period);
}

stop(): void {
_stop(): void {
this.i = 0;
if (this.intervalID !== -1) clearInterval(this.intervalID);
}
Expand Down
Loading

0 comments on commit 552caff

Please sign in to comment.