Skip to content

Commit

Permalink
fix(groupBy): fix groupBy to use lift(), supporting composability
Browse files Browse the repository at this point in the history
Fix bug ReactiveX#1085 in which groupBy was not using lift(). Using lift() is critical to support the
ubiquitous lift()-based architecture in RxJS Next.

Resolves ReactiveX#1085.
BREAKING CHANGES:
groupBy() now unsubscribes all inner Observables when the outer
Observable is unsubscribed. This diverges from the behavior in RxJS 4,
where inner Observables continue even if the outer is unsubscribed.
  • Loading branch information
staltz committed Jan 4, 2016
1 parent b9250c2 commit 06df9d9
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
24 changes: 14 additions & 10 deletions spec/operators/groupBy-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ describe('Observable.prototype.groupBy()', function () {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow the outer to be unsubscribed early but inners continue', function () {
it('should unsubscribe inners when outer is unsubscribed early and explicitly', function () {
var values = {
a: ' foo',
b: ' FoO ',
Expand All @@ -435,16 +435,18 @@ describe('Observable.prototype.groupBy()', function () {
l: ' fOo '
};
var e1 = hot('-1--2--^-a-b-c-d-e-f-g-h-i-j-k-l-|', values);
var unsub = ' !';
var expected = '--w---x---';
var w = cold( 'a-b---d---------i-----l-|', values);
var x = cold( 'c-------g-h---------|', values);
var e1subs = '^ ! ';
var expected = '--w---x--- ';
var w = cold( 'a-b---d- ', values);
var x = cold( 'c--- ', values);
var unsub = ' ! ';
var expectedValues = { w: w, x: x };

var source = e1
.groupBy(function (val) { return val.toLowerCase().trim(); });

expectObservable(source, unsub).toBe(expected, expectedValues);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow an inner to be unsubscribed early but other inners continue', function () {
Expand Down Expand Up @@ -767,11 +769,12 @@ describe('Observable.prototype.groupBy()', function () {
l: ' fOo '
};
var e1 = hot('-1--2--^-a-b-c-d-e-f-g-h-i-j-k-l-|', values);
var unsub = ' !';
var expected = '--v---w---x-';
var v = cold( 'a-b---(d|)' , values);
var w = cold( 'c-------g-(h|)' , values);
var x = cold( 'e---------j-(k|)' , values);
var e1subs = '^ ! ';
var expected = '--v---w---x- ';
var v = cold( 'a-b---(d|) ', values);
var w = cold( 'c----- ', values);
var x = cold( 'e- ', values);
var unsub = ' ! ';
var expectedValues = { v: v, w: w, x: x };

var source = e1
Expand All @@ -782,6 +785,7 @@ describe('Observable.prototype.groupBy()', function () {
);

expectObservable(source, unsub).toBe(expected, expectedValues);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow using a durationSelector, outer and all inners unsubscribed early',
Expand Down
18 changes: 9 additions & 9 deletions src/operator/groupBy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subject} from '../Subject';
import {Map} from '../util/Map';
import {FastMap} from '../util/FastMap';
Expand All @@ -10,32 +10,32 @@ import {errorObject} from '../util/errorObject';

export function groupBy<T, R>(keySelector: (value: T) => string,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>): GroupByObservable<T, R> {
return new GroupByObservable<T, R>(this, keySelector, elementSelector, durationSelector);
durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>): Observable<GroupedObservable<R>> {
return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector));
}

export class GroupByObservable<T, R> extends Observable<GroupedObservable<R>> {
class GroupByOperator<T, R> implements Operator<T, R> {
constructor(public source: Observable<T>,
private keySelector: (value: T) => string,
private elementSelector?: (value: T) => R,
private durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>) {
super();
}

_subscribe(subscriber: Subscriber<any>): Subscription | Function | void {
call(subscriber: Subscriber<GroupedObservable<R>>): Subscriber<T> {
const refCountSubscription = new RefCountSubscription();
subscriber.add(refCountSubscription);
const groupBySubscriber = new GroupBySubscriber(
subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector
);
refCountSubscription.setPrimary(this.source.subscribe(groupBySubscriber));
return refCountSubscription;
refCountSubscription.setPrimary(groupBySubscriber);
return groupBySubscriber;
}
}

class GroupBySubscriber<T, R> extends Subscriber<T> {
private groups = null;

constructor(destination: Subscriber<R>,
constructor(destination: Subscriber<GroupedObservable<R>>,
private refCountSubscription: RefCountSubscription,
private keySelector: (value: T) => string,
private elementSelector?: (value: T) => R,
Expand Down

0 comments on commit 06df9d9

Please sign in to comment.