Skip to content

Commit

Permalink
refactor(Stream): rearrange some methods (addListener, removeListener)
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed Jul 8, 2016
1 parent 6bdf596 commit e55da7a
Showing 1 changed file with 27 additions and 27 deletions.
54 changes: 27 additions & 27 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -985,33 +985,6 @@ export class Stream<T> implements InternalListener<T> {
this._err = null;
}

/**
* Adds a Listener to the Stream.
*
* @param {Listener<T>} listener
*/
addListener(listener: Listener<T>): void {
if (typeof listener.next !== 'function'
|| typeof listener.error !== 'function'
|| typeof listener.complete !== 'function') {
throw new Error('stream.addListener() requires all three next, error, ' +
'and complete functions.');
}
(<InternalListener<T>> (<any> listener))._n = listener.next;
(<InternalListener<T>> (<any> listener))._e = listener.error;
(<InternalListener<T>> (<any> listener))._c = listener.complete;
this._add(<InternalListener<T>> (<any> listener));
}

/**
* Removes a Listener from the Stream, assuming the Listener was added to it.
*
* @param {Listener<T>} listener
*/
removeListener(listener: Listener<T>): void {
this._remove(<InternalListener<T>> (<any> listener));
}

_add(il: InternalListener<T>): void {
const ta = this._target;
if (ta) return ta._add(il);
Expand Down Expand Up @@ -1080,6 +1053,33 @@ export class Stream<T> implements InternalListener<T> {
return this instanceof MemoryStream ? MemoryStream : Stream;
}

/**
* Adds a Listener to the Stream.
*
* @param {Listener<T>} listener
*/
addListener(listener: Listener<T>): void {
if (typeof listener.next !== 'function'
|| typeof listener.error !== 'function'
|| typeof listener.complete !== 'function') {
throw new Error('stream.addListener() requires all three next, error, ' +
'and complete functions.');
}
(<InternalListener<T>> (<any> listener))._n = listener.next;
(<InternalListener<T>> (<any> listener))._e = listener.error;
(<InternalListener<T>> (<any> listener))._c = listener.complete;
this._add(<InternalListener<T>> (<any> listener));
}

/**
* Removes a Listener from the Stream, assuming the Listener was added to it.
*
* @param {Listener<T>} listener
*/
removeListener(listener: Listener<T>): void {
this._remove(<InternalListener<T>> (<any> listener));
}

/**
* Creates a new Stream given a Producer.
*
Expand Down

0 comments on commit e55da7a

Please sign in to comment.