Skip to content

Commit

Permalink
fix(buffer): closingNotifier completion does not complete resulting o…
Browse files Browse the repository at this point in the history
…bservable

Resolves an issue where the resulting observable would complete when the closingNotifier completed. Notifier completion should not complete the result, only source completion should do that.

BREAKING CHANGE: closingNotifier no longer closes the result of `buffer`. If that is truly a desired behavior, then you should use `takeUntil`. Something like: `source$.pipe(buffer(notifier$), takeUntil(notifier$.pipe(ignoreElements(), endWith(true))))`, where `notifier$` is multicast, although there are many ways to compose this behavior.
  • Loading branch information
benlesh committed Feb 22, 2021
1 parent 1aa400a commit 13115c4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
37 changes: 18 additions & 19 deletions spec/operators/buffer-spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/** @prettier */
import { buffer, mergeMap, take } from 'rxjs/operators';
import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
Expand All @@ -20,7 +21,7 @@ describe('Observable.prototype.buffer', () => {
const expectedValues = {
x: ['a', 'b', 'c'],
y: ['d', 'e', 'f'],
z: ['g', 'h', 'i']
z: ['g', 'h', 'i'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
Expand Down Expand Up @@ -48,7 +49,7 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const b = EMPTY;
const expected = '|';
const expected = ' --------------------------------|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
});
});
Expand All @@ -66,7 +67,7 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ expectObservable }) => {
const a = NEVER;
const b = EMPTY;
const expected = '|';
const expected = '-';
expectObservable(a.pipe(buffer(b))).toBe(expected);
});
});
Expand Down Expand Up @@ -128,7 +129,7 @@ describe('Observable.prototype.buffer', () => {
c: ['6'],
d: [] as string[],
e: ['7', '8', '9'],
f: ['0']
f: ['0'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
Expand All @@ -138,14 +139,14 @@ describe('Observable.prototype.buffer', () => {
// Buffshoulder Boundaries onCompletedBoundaries (RxJS 4)
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const subs = ' ^----------------! ';
const subs = ' ^-------------------------------!';
const b = hot('--------^--a-------b---cd| ');
const expected = ' ---a-------b---cd| ';
const expected = ' ---a-------b---cd---------------|';
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6'],
d: [] as string[]
d: [] as string[],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
expectSubscriptions(a.subscriptions).toBe(subs);
Expand All @@ -161,7 +162,7 @@ describe('Observable.prototype.buffer', () => {
const expected = ' ---a-------b--- ';
const expectedValues = {
a: ['3'],
b: ['4', '5']
b: ['4', '5'],
};
expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues);
expectSubscriptions(a.subscriptions).toBe(subs);
Expand All @@ -177,13 +178,13 @@ describe('Observable.prototype.buffer', () => {
const unsub = ' --------------! ';
const expectedValues = {
a: ['3'],
b: ['4', '5']
b: ['4', '5'],
};

const result = a.pipe(
mergeMap((x: any) => of(x)),
buffer(b),
mergeMap((x: any) => of(x)),
mergeMap((x: any) => of(x))
);

expectObservable(result, unsub).toBe(expected, expectedValues);
Expand All @@ -194,13 +195,13 @@ describe('Observable.prototype.buffer', () => {
it('should work with non-empty and selector error', () => {
// Buffer Boundaries onErrorSource (RxJS 4)
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('--1--2--^--3-----#', {'3': 3}, new Error('too bad'));
const a = hot('--1--2--^--3-----#', { '3': 3 }, new Error('too bad'));
const subs = ' ^--------!';
const b = hot('--------^--a--b---');
const expected = ' ---a--b--#';
const expectedValues = {
a: [3],
b: [] as string[]
b: [] as string[],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad'));
expectSubscriptions(a.subscriptions).toBe(subs);
Expand All @@ -227,7 +228,7 @@ describe('Observable.prototype.buffer', () => {
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6']
c: ['6'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad'));
expectSubscriptions(a.subscriptions).toBe(subs);
Expand All @@ -244,7 +245,7 @@ describe('Observable.prototype.buffer', () => {
const expected = ' ---a-------b--- ';
const expectedValues = {
a: ['3'],
b: ['4', '5']
b: ['4', '5'],
};

expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues);
Expand Down Expand Up @@ -272,11 +273,9 @@ describe('Observable.prototype.buffer', () => {
const results: any[] = [];
const subject = new Subject<number>();

const source = subject.pipe(
buffer(subject)
).subscribe({
next: value => results.push(value),
complete: () => results.push('complete')
const source = subject.pipe(buffer(subject)).subscribe({
next: (value) => results.push(value),
complete: () => results.push('complete'),
});

subject.next(1);
Expand Down
20 changes: 14 additions & 6 deletions src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Observable } from '../Observable';
import { OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { noop } from '../util/noop';
import { OperatorSubscriber } from './OperatorSubscriber';

/**
Expand Down Expand Up @@ -50,12 +51,19 @@ export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T,

// Subscribe to the closing notifier.
closingNotifier.subscribe(
new OperatorSubscriber(subscriber, () => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
currentBuffer = [];
subscriber.next(b);
})
new OperatorSubscriber(
subscriber,
() => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
currentBuffer = [];
subscriber.next(b);
},
// Pass all errors to the consumer.
undefined,
// Closing notifier should not complete the resulting observable.
noop
)
);

return () => {
Expand Down

0 comments on commit 13115c4

Please sign in to comment.