diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 1f933d8360..a4232193ff 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -1,3 +1,4 @@ +/** @prettier */ import { buffer, mergeMap, take } from 'rxjs/operators'; import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; @@ -20,7 +21,7 @@ describe('Observable.prototype.buffer', () => { const expectedValues = { x: ['a', 'b', 'c'], y: ['d', 'e', 'f'], - z: ['g', 'h', 'i'] + z: ['g', 'h', 'i'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); @@ -48,7 +49,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = EMPTY; - const expected = '|'; + const expected = ' --------------------------------|'; expectObservable(a.pipe(buffer(b))).toBe(expected); }); }); @@ -66,7 +67,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = NEVER; const b = EMPTY; - const expected = '|'; + const expected = '-'; expectObservable(a.pipe(buffer(b))).toBe(expected); }); }); @@ -128,7 +129,7 @@ describe('Observable.prototype.buffer', () => { c: ['6'], d: [] as string[], e: ['7', '8', '9'], - f: ['0'] + f: ['0'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); @@ -138,14 +139,14 @@ describe('Observable.prototype.buffer', () => { // Buffshoulder Boundaries onCompletedBoundaries (RxJS 4) testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); - const subs = ' ^----------------! '; + const subs = ' ^-------------------------------!'; const b = hot('--------^--a-------b---cd| '); - const expected = ' ---a-------b---cd| '; + const expected = ' ---a-------b---cd---------------|'; const expectedValues = { a: ['3'], b: ['4', '5'], c: ['6'], - d: [] as string[] + d: [] as string[], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); expectSubscriptions(a.subscriptions).toBe(subs); @@ -161,7 +162,7 @@ describe('Observable.prototype.buffer', () => { const expected = ' ---a-------b--- '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues); expectSubscriptions(a.subscriptions).toBe(subs); @@ -177,13 +178,13 @@ describe('Observable.prototype.buffer', () => { const unsub = ' --------------! '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; const result = a.pipe( mergeMap((x: any) => of(x)), buffer(b), - mergeMap((x: any) => of(x)), + mergeMap((x: any) => of(x)) ); expectObservable(result, unsub).toBe(expected, expectedValues); @@ -194,13 +195,13 @@ describe('Observable.prototype.buffer', () => { it('should work with non-empty and selector error', () => { // Buffer Boundaries onErrorSource (RxJS 4) testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { - const a = hot('--1--2--^--3-----#', {'3': 3}, new Error('too bad')); + const a = hot('--1--2--^--3-----#', { '3': 3 }, new Error('too bad')); const subs = ' ^--------!'; const b = hot('--------^--a--b---'); const expected = ' ---a--b--#'; const expectedValues = { a: [3], - b: [] as string[] + b: [] as string[], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad')); expectSubscriptions(a.subscriptions).toBe(subs); @@ -227,7 +228,7 @@ describe('Observable.prototype.buffer', () => { const expectedValues = { a: ['3'], b: ['4', '5'], - c: ['6'] + c: ['6'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad')); expectSubscriptions(a.subscriptions).toBe(subs); @@ -244,7 +245,7 @@ describe('Observable.prototype.buffer', () => { const expected = ' ---a-------b--- '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues); @@ -272,11 +273,9 @@ describe('Observable.prototype.buffer', () => { const results: any[] = []; const subject = new Subject(); - const source = subject.pipe( - buffer(subject) - ).subscribe({ - next: value => results.push(value), - complete: () => results.push('complete') + const source = subject.pipe(buffer(subject)).subscribe({ + next: (value) => results.push(value), + complete: () => results.push('complete'), }); subject.next(1); diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 56e4015ce0..a9deb9ae81 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -1,6 +1,7 @@ import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; import { operate } from '../util/lift'; +import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; /** @@ -50,12 +51,19 @@ export function buffer(closingNotifier: Observable): OperatorFunction { - // Start a new buffer and emit the previous one. - const b = currentBuffer; - currentBuffer = []; - subscriber.next(b); - }) + new OperatorSubscriber( + subscriber, + () => { + // Start a new buffer and emit the previous one. + const b = currentBuffer; + currentBuffer = []; + subscriber.next(b); + }, + // Pass all errors to the consumer. + undefined, + // Closing notifier should not complete the resulting observable. + noop + ) ); return () => {