Skip to content

Commit

Permalink
fix(extra): move flattenConcurrently from core to extra
Browse files Browse the repository at this point in the history
Make flattenConcurrently() an extra operator because it is not that often used as flatten() is.
Usage is: `import flattenConcurrently from
'xstream/extra/flattenConcurrently'` and then
`streamOfStreams.compose(flattenConcurrently)`.

BREAKING CHANGE:
flattenConcurrently must be separately imported as an extra operator and
used with .compose()
  • Loading branch information
staltz committed Jun 2, 2016
1 parent 61838bb commit 7d0fc01
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 203 deletions.
177 changes: 6 additions & 171 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,70 +549,7 @@ export class FilterOperator<T> implements Operator<T, T> {
}
}

class FCIL<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: FlattenConcOperator<T>) {
}

_n(t: T) {
this.out._n(t);
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.op.less();
}
}

export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {
public type = 'flattenConcurrently';
private active: number = 1; // number of outers and inners that have not yet ended
private out: Stream<T> = null;

constructor(public ins: Stream<Stream<T>>) {
}

_start(out: Stream<T>): void {
this.out = out;
this.ins._add(this);
}

_stop(): void {
this.ins._remove(this);
this.active = 1;
this.out = null;
}

less(): void {
if (--this.active === 0) {
const u = this.out;
if (!u) return;
u._c();
}
}

_n(s: Stream<T>) {
const u = this.out;
if (!u) return;
this.active++;
s._add(new FCIL(u, this));
}

_e(err: any) {
const u = this.out;
if (!u) return;
u._e(err);
}

_c() {
this.less();
}
}

class FIL<T> implements InternalListener<T> {
class FlattenListener<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: FlattenOperator<T>) {
}
Expand Down Expand Up @@ -665,7 +602,7 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {
if (!u) return;
const {inner, il} = this;
if (inner && il) inner._remove(il);
(this.inner = s)._add(this.il = new FIL(u, this));
(this.inner = s)._add(this.il = new FlattenListener(u, this));
}

_e(err: any) {
Expand Down Expand Up @@ -770,77 +707,7 @@ export class LastOperator<T> implements Operator<T, T> {
}
}

class MFCIL<R> implements InternalListener<R> {
constructor(private out: Stream<R>,
private op: MapFlattenConcOperator<any, R>) {
}

_n(r: R) {
this.out._n(r);
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.op.less();
}
}

export class MapFlattenConcOperator<T, R> implements Operator<T, R> {
public type: string;
public ins: Stream<T>;
private active: number = 1; // number of outers and inners that have not yet ended
private out: Stream<R> = null;

constructor(public mapOp: MapOperator<T, Stream<R>>) {
this.type = `${mapOp.type}+flattenConcurrently`;
this.ins = mapOp.ins;
}

_start(out: Stream<R>): void {
this.out = out;
this.mapOp.ins._add(this);
}

_stop(): void {
this.mapOp.ins._remove(this);
this.active = 1;
this.out = null;
}

less(): void {
if (--this.active === 0) {
const u = this.out;
if (!u) return;
u._c();
}
}

_n(v: T) {
const u = this.out;
if (!u) return;
this.active++;
try {
this.mapOp.project(v)._add(new MFCIL(u, this));
} catch (e) {
u._e(e);
}
}

_e(err: any) {
const u = this.out;
if (!u) return;
u._e(err);
}

_c() {
this.less();
}
}

class MFIL<R> implements InternalListener<R> {
class MapFlattenInner<R> implements InternalListener<R> {
constructor(private out: Stream<R>,
private op: MapFlattenOperator<any, R>) {
}
Expand Down Expand Up @@ -899,7 +766,9 @@ export class MapFlattenOperator<T, R> implements Operator<T, R> {
const {inner, il} = this;
if (inner && il) inner._remove(il);
try {
(this.inner = this.mapOp.project(v))._add(this.il = new MFIL(u, this));
(this.inner = this.mapOp.project(v))._add(
this.il = new MapFlattenInner(u, this)
);
} catch (e) {
u._e(e);
}
Expand Down Expand Up @@ -1716,40 +1585,6 @@ export class Stream<T> implements InternalListener<T> {
);
}

/**
* Flattens a "stream of streams", handling multiple concurrent nested streams
* simultaneously.
*
* If the input stream is a stream that emits streams, then this operator will
* return an output stream which is a flat stream: emits regular events. The
* flattening happens concurrently. It works like this: when the input stream
* emits a nested stream, *flattenConcurrently* will start imitating that
* nested one. When the next nested stream is emitted on the input stream,
* *flattenConcurrently* will also imitate that new one, but will continue to
* imitate the previous nested streams as well.
*
* Marble diagram:
*
* ```text
* --+--------+---------------
* \ \
* \ ----1----2---3--
* --a--b----c----d--------
* flattenConcurrently
* -----a--b----c-1--d-2---3--
* ```
*
* @return {Stream}
*/
flattenConcurrently<R>(): T {
const p = this._prod;
return <T> <any> new Stream<R>(
p instanceof MapOperator && !(p instanceof FilterMapOperator) ?
new MapFlattenConcOperator(<MapOperator<any, Stream<R>>> <any> p) :
new FlattenConcOperator(<Stream<Stream<R>>> <any> this)
);
}

/**
* Blends two streams together, emitting events from both.
*
Expand Down
93 changes: 93 additions & 0 deletions src/extra/flattenConcurrently.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import {Operator, Stream, InternalListener} from '../core';

class FCIL<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: FlattenConcOperator<T>) {
}

_n(t: T) {
this.out._n(t);
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.op.less();
}
}

export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {
public type = 'flattenConcurrently';
private active: number = 1; // number of outers and inners that have not yet ended
private out: Stream<T> = null;

constructor(public ins: Stream<Stream<T>>) {
}

_start(out: Stream<T>): void {
this.out = out;
this.ins._add(this);
}

_stop(): void {
this.ins._remove(this);
this.active = 1;
this.out = null;
}

less(): void {
if (--this.active === 0) {
const u = this.out;
if (!u) return;
u._c();
}
}

_n(s: Stream<T>) {
const u = this.out;
if (!u) return;
this.active++;
s._add(new FCIL(u, this));
}

_e(err: any) {
const u = this.out;
if (!u) return;
u._e(err);
}

_c() {
this.less();
}
}

/**
* Flattens a "stream of streams", handling multiple concurrent nested streams
* simultaneously.
*
* If the input stream is a stream that emits streams, then this operator will
* return an output stream which is a flat stream: emits regular events. The
* flattening happens concurrently. It works like this: when the input stream
* emits a nested stream, *flattenConcurrently* will start imitating that
* nested one. When the next nested stream is emitted on the input stream,
* *flattenConcurrently* will also imitate that new one, but will continue to
* imitate the previous nested streams as well.
*
* Marble diagram:
*
* ```text
* --+--------+---------------
* \ \
* \ ----1----2---3--
* --a--b----c----d--------
* flattenConcurrently
* -----a--b----c-1--d-2---3--
* ```
*
* @return {Stream}
*/
export default function flattenConcurrently<T>(ins: Stream<Stream<T>>): Stream<T> {
return new Stream<T>(new FlattenConcOperator(ins));
}
Loading

0 comments on commit 7d0fc01

Please sign in to comment.