diff --git a/spec/operators/merge-all-spec.js b/spec/operators/merge-all-spec.js index 2a1813377e..0861e92747 100644 --- a/spec/operators/merge-all-spec.js +++ b/spec/operators/merge-all-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; diff --git a/spec/operators/merge-spec.js b/spec/operators/merge-spec.js index dfba3d7893..89016ceebb 100644 --- a/spec/operators/merge-spec.js +++ b/spec/operators/merge-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, hot, cold, expectObservable, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var immediateScheduler = Rx.Scheduler.immediate; @@ -23,6 +23,14 @@ describe("Observable.prototype.merge", function () { expect(val).toBe(r[i++]); }, null, done); }); + + + it('should handle merging two hot observables', function (){ + var e1 = hot('--a-----b-----c----|'); + var e2 = hot('-----d-----e-----f---|'); + var expected = '--a--d--b--e--c--f---|'; + expectObservable(e1.merge(e2, rxTestScheduler)).toBe(expected); + }); }); describe('Observable.prototype.mergeAll', function () { diff --git a/src/operators/merge-static.ts b/src/operators/merge-static.ts index 40c12da440..d4f6ff9202 100644 --- a/src/operators/merge-static.ts +++ b/src/operators/merge-static.ts @@ -1,7 +1,7 @@ import Scheduler from '../Scheduler'; import Observable from '../Observable'; import ArrayObservable from '../observables/ArrayObservable'; -import { MergeOperator } from './merge-support'; +import { MergeAllOperator } from './mergeAll-support'; import immediate from '../schedulers/immediate'; export default function merge(...observables: (Observable|Scheduler|number)[]): Observable { @@ -20,6 +20,6 @@ export default function merge(...observables: (Observable|Scheduler|numb if(observables.length === 1) { return >observables[0]; } - - return new ArrayObservable(observables, scheduler).lift(new MergeOperator(concurrent)); + + return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator(concurrent)); } \ No newline at end of file diff --git a/src/operators/merge-support.ts b/src/operators/merge-support.ts deleted file mode 100644 index 03ee4e8756..0000000000 --- a/src/operators/merge-support.ts +++ /dev/null @@ -1,109 +0,0 @@ -import Operator from '../Operator'; -import Subscriber from '../Subscriber'; -import Observer from '../Observer'; -import Observable from '../Observable'; -import ScalarObservable from '../observables/ScalarObservable'; -import Subscription from '../Subscription'; - -export class MergeOperator implements Operator { - - concurrent: number; - - constructor(concurrent: number = Number.POSITIVE_INFINITY) { - this.concurrent = concurrent; - } - - call(subscriber: Subscriber): Subscriber { - return new MergeSubscriber(subscriber, this.concurrent); - } -} - -export class MergeSubscriber extends Subscriber { - - count: number = 0; - active: number = 0; - stopped: boolean = false; - buffer: Observable[] = []; - concurrent: number; - - constructor(destination: Observer, concurrent: number) { - super(destination); - this.concurrent = concurrent; - } - - _next(value) { - const active = this.active; - if (active < this.concurrent) { - - const index = this.count; - const observable = this._project(value, index); - - if (observable) { - this.count = index + 1; - this.active = active + 1; - this.add(this._subscribeInner(observable, value, index)); - } - } else { - this._buffer(value); - } - } - - complete() { - this.stopped = true; - if (this.active === 0 && this.buffer.length === 0) { - super.complete(); - } - } - - _unsubscribe() { - this.buffer = void 0; - } - - _project(value, index) { - return value; - } - - _buffer(value) { - this.buffer.push(value); - } - - _subscribeInner(observable:Observable, value, index): Subscription { - const destination = this.destination; - if(observable._isScalar) { - destination.next((observable).value); - this._innerComplete(); - } else { - const subscriber = new MergeInnerSubscriber(destination, this); - observable._subscribe(subscriber); - return subscriber; - } - } - - _innerComplete() { - - const buffer = this.buffer; - const active = this.active -= 1; - const stopped = this.stopped; - const pending = buffer.length; - - if (stopped && active === 0 && pending === 0) { - super.complete(); - } else if (active < this.concurrent && pending > 0) { - this._next(buffer.shift()); - } - } -} - -export class MergeInnerSubscriber extends Subscriber { - - parent: MergeSubscriber; - - constructor(destination: Observer, parent: MergeSubscriber) { - super(destination); - this.parent = parent; - } - - _complete() { - this.parent._innerComplete(); - } -} diff --git a/src/operators/merge.ts b/src/operators/merge.ts index ef7f77f62e..a598765da4 100644 --- a/src/operators/merge.ts +++ b/src/operators/merge.ts @@ -1,7 +1,8 @@ import Observable from '../Observable'; import mergeStatic from './merge-static'; +import Scheduler from '../Scheduler'; -export default function merge(...observables: (Observable|number)[]): Observable { +export default function merge(...observables: (Observable|Scheduler|number)[]): Observable { observables.unshift(this); return mergeStatic.apply(this, observables); } \ No newline at end of file diff --git a/src/operators/mergeAll-support.ts b/src/operators/mergeAll-support.ts new file mode 100644 index 0000000000..27b83821ab --- /dev/null +++ b/src/operators/mergeAll-support.ts @@ -0,0 +1,68 @@ +import Observable from '../Observable'; +import Operator from '../Operator'; +import Subscriber from '../Subscriber'; +import Observer from '../Observer'; +import Subscription from '../Subscription'; + +export class MergeAllOperator implements Operator { + constructor(private concurrent: number) { + + } + + call(observer: Observer) { + return new MergeAllSubscriber(observer, this.concurrent); + } +} + +export class MergeAllSubscriber extends Subscriber { + private hasCompleted: boolean = false; + private buffer: Observable[] = []; + private active: number = 0; + constructor(destination: Observer, private concurrent:number) { + super(destination); + } + + _next(observable: any) { + if(this.active < this.concurrent) { + if(observable._isScalar) { + this.destination.next(observable.value); + } else { + const innerSub = new Subscription(); + this.add(innerSub); + this.active++; + innerSub.add(observable.subscribe(new MergeAllInnerSubscriber(this.destination, this, innerSub))); + } + } else { + this.buffer.push(observable); + } + } + + _complete() { + this.hasCompleted = true; + if(this.active === 0 && this.buffer.length === 0) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription) { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if(buffer.length > 0) { + this._next(buffer.shift()); + } else if (this.active === 0 && this.hasCompleted) { + this.destination.complete(); + } + } +} + +export class MergeAllInnerSubscriber extends Subscriber { + constructor(destination: Observer, private parent: MergeAllSubscriber, + private innerSub: Subscription ) { + super(destination); + } + + _complete() { + this.parent.notifyComplete(this.innerSub); + } +} \ No newline at end of file diff --git a/src/operators/mergeAll.ts b/src/operators/mergeAll.ts index b45e47427f..79290c8cb2 100644 --- a/src/operators/mergeAll.ts +++ b/src/operators/mergeAll.ts @@ -3,66 +3,8 @@ import Operator from '../Operator'; import Subscriber from '../Subscriber'; import Observer from '../Observer'; import Subscription from '../Subscription'; +import { MergeAllOperator } from './mergeAll-support'; export default function mergeAll(concurrent: number = Number.POSITIVE_INFINITY): Observable { return this.lift(new MergeAllOperator(concurrent)); -} - -class MergeAllOperator implements Operator { - constructor(private concurrent: number) { - - } - - call(observer: Observer) { - return new MergeAllSubscriber(observer, this.concurrent); - } -} - -class MergeAllSubscriber extends Subscriber { - private hasCompleted: boolean = false; - private buffer: Observable[] = []; - private active: number = 0; - constructor(destination: Observer, private concurrent:number) { - super(destination); - } - - _next(value: any) { - if(this.active < this.concurrent) { - const innerSub = new Subscription(); - this.add(innerSub); - this.active++; - innerSub.add(value.subscribe(new MergeAllInnerSubscriber(this.destination, this, innerSub))); - } else { - this.buffer.push(value); - } - } - - _complete() { - this.hasCompleted = true; - if(this.active === 0 && this.buffer.length === 0) { - this.destination.complete(); - } - } - - notifyComplete(innerSub: Subscription) { - const buffer = this.buffer; - this.remove(innerSub); - this.active--; - if(buffer.length > 0) { - this._next(buffer.shift()); - } else if (this.active === 0 && this.hasCompleted) { - this.destination.complete(); - } - } -} - -class MergeAllInnerSubscriber extends Subscriber { - constructor(destination: Observer, private parent: MergeAllSubscriber, - private innerSub: Subscription ) { - super(destination); - } - - _complete() { - this.parent.notifyComplete(this.innerSub); - } } \ No newline at end of file