Skip to content

Commit

Permalink
fix(bufferToggle): don't signal on complete
Browse files Browse the repository at this point in the history
BREAKING CHANGE: the observable returned by the bufferToggle operator's
closing selector must emit a next notification to close the buffer.
Complete notifications no longer close the buffer.
  • Loading branch information
cartant authored and benlesh committed Nov 3, 2020
1 parent a2ba364 commit 65686ff
Showing 2 changed files with 64 additions and 58 deletions.
115 changes: 62 additions & 53 deletions spec/operators/bufferToggle-spec.ts
Original file line number Diff line number Diff line change
@@ -58,20 +58,20 @@ describe('bufferToggle operator', () => {

it('should emit buffers using varying cold closings', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const e2 = cold(' --x-----------y--------z---| ');
const subs = ' ^----------------------------------! ';
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const e2 = cold(' --x-----------y--------z---| ');
const subs = ' ^----------------------------------! ';
const closings = [
cold(' ---------------s--| '),
cold(' ----(s|) '),
cold(' ---------------(s|)')
cold(' ---------------s--| '),
cold(' ----(s|) '),
cold(' ---------------(s|)')
];
const closeSubs = [
' --^--------------! ',
' --------------^---! ',
' -----------------------^-----------! '
' --^--------------! ',
' --------------^---! ',
' -----------------------^-----------! '
];
const expected = '-----------------ij----------------(k|) ';
const expected = ' -----------------ij----------------(k|) ';
const values = {
i: ['b', 'c', 'd', 'e'],
j: ['e'],
@@ -100,11 +100,11 @@ describe('bufferToggle operator', () => {
sub: ' --^--------------! '
},
{
obs: hot(' -----3----4-------(s|) '),
obs: hot(' -----3----4-------(s|) '),
sub: ' --------------^---! '
},
{
obs: hot(' -------3----4-------5----------------s| '),
obs: hot(' -------3----4-------5----------------s|'),
sub: ' -----------------------^-----------! '
}
];
@@ -129,18 +129,18 @@ describe('bufferToggle operator', () => {

it('should emit buffers using varying empty delayed closings', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const e2 = cold(' --x-----------y--------z---| ');
const subs = ' ^----------------------------------! ';
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const e2 = cold(' --x-----------y--------z---| ');
const subs = ' ^----------------------------------! ';
const closings = [
cold(' ---------------| '),
cold(' ----| '),
cold(' ---------------| ')
cold(' ---------------| '),
cold(' ----| '),
cold(' ---------------| ')
];
const expected = ' -----------------ij----------------(k|)';
const expected = ' -----------------------------------(ijk|)';
const values = {
i: ['b', 'c', 'd', 'e'],
j: ['e'],
i: ['b', 'c', 'd', 'e', 'f', 'g', 'h'],
j: ['e', 'f', 'g', 'h'],
k: ['g', 'h']
};

@@ -158,9 +158,9 @@ describe('bufferToggle operator', () => {
const subs = ' ^---------! ';
const e2 = cold(' --x-----------y--------z---| ');
const closings = [
cold(' ---------------s--| '),
cold(' ----(s|) '),
cold(' ---------------(s|) ')
cold(' ---------------s--| '),
cold(' ----(s|) '),
cold(' ---------------(s|)')
];
const csub0 = ' --^-------! ';
const expected = ' ----------- ';
@@ -182,16 +182,16 @@ describe('bufferToggle operator', () => {

it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const subs = ' ^-----------------! ';
const e2 = cold(' --x-----------y--------z---| ');
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const subs = ' ^-----------------! ';
const e2 = cold(' --x-----------y--------z---| ');
const closings = [
cold(' ---------------s--| '),
cold(' ----(s|) '),
cold(' ---------------(s|)')
cold(' ---------------s--| '),
cold(' ----(s|) '),
cold(' ---------------(s|)')
];
const expected = ' -----------------i- ';
const unsub = ' ------------------! ';
const expected = ' -----------------i- ';
const unsub = ' ------------------! ';
const values = {
i: ['b', 'c', 'd', 'e']
};
@@ -210,16 +210,16 @@ describe('bufferToggle operator', () => {

it('should propagate error thrown from closingSelector', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const e2 = cold(' --x-----------y--------z---| ');
const subs = ' ^-------------! ';
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const e2 = cold(' --x-----------y--------z---| ');
const subs = ' ^-------------! ';
const closings = [
cold(' ---------------s--| '),
cold(' ----(s|) '),
cold(' ---------------(s|)')
cold(' ---------------s--| '),
cold(' ----(s|) '),
cold(' ---------------(s|)')
];
const closeSubs0 = '--^-----------! ';
const expected = '--------------# ';
const closeSubs0 = ' --^-----------! ';
const expected = ' --------------# ';

let i = 0;
const result = e1.pipe(
@@ -245,10 +245,10 @@ describe('bufferToggle operator', () => {
const e2 = cold(' --x-----------y--------z---| ');
const subs = ' ^-------------! ';
const closings = [
cold(' ---------------s--| '),
cold(' # ')
cold(' ---------------s--| '),
cold(' # ')
];
const closeSubs = [
const closeSubs = [
' --^-----------! ',
' --------------(^!) '
];
@@ -270,8 +270,8 @@ describe('bufferToggle operator', () => {
const e2 = cold(' --x-----------y--------z---| ');
const subs = ' ^------------------! ';
const closings = [
cold(' ---------------s--| '),
cold(' -----# ')
cold(' ---------------s--| '),
cold(' -----# ')
];
const closeSubs = [
' --^--------------! ',
@@ -298,10 +298,10 @@ describe('bufferToggle operator', () => {
const e2 = cold(' --x-----------y--------z---|');
const subs = ' ^------------------! ';
const closings = [
cold(' ---------------s--| '),
cold(' -------s| ')
cold(' ---------------s--| '),
cold(' -------s| ')
];
const closeSubs = [
const closeSubs = [
' --^--------------! ',
' --------------^----! '
];
@@ -353,6 +353,10 @@ describe('bufferToggle operator', () => {
const e1 = hot(' -');
const e2 = cold(' --o-----o------o-----o---o-----|');
const e3 = cold(' --c-|');
// --c-|
// --c-|
// --c-|
// --c-|
const unsub = ' --------------------------------------------!';
const subs = ' ^-------------------------------------------!';
const expected = '----x-----x------x-----x---x-----------------';
@@ -500,14 +504,19 @@ describe('bufferToggle operator', () => {

it('should handle empty closing observable', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------|');
const subs = ' ^----------------------------------!';
const e2 = cold(' --x-----------y--------z---| ');
const expected = ' --l-----------m--------n-----------|';
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const subs = ' ^----------------------------------! ';
const e2 = cold(' --x-----------y--------z---| ');
const expected = ' -----------------------------------(ijk|)';
const values = {
i: ['b', 'c', 'd', 'e', 'f', 'g', 'h'],
j: ['e', 'f', 'g', 'h'],
k: ['g', 'h']
};

const result = e1.pipe(bufferToggle(e2, () => EMPTY));

expectObservable(result).toBe(expected, {l: [], m: [], n: []});
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});
7 changes: 2 additions & 5 deletions src/internal/operators/bufferToggle.ts
Original file line number Diff line number Diff line change
@@ -69,18 +69,15 @@ export function bufferToggle<T, O>(
// when the closing notifier emits, we can tear it down.
const closingSubscription = new Subscription();

// This is captured here, because we emit on both next or
// if the closing notifier completes without value.
// TODO: We probably want to not have closing notifiers emit!!
const emit = () => {
const emitBuffer = () => {
arrRemove(buffers, buffer);
subscriber.next(buffer);
closingSubscription.unsubscribe();
};

// The line below will add the subscription to the parent subscriber *and* the closing subscription.
closingSubscription.add(
innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emit, undefined, emit))
innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emitBuffer, undefined, noop))
);
},
undefined,

0 comments on commit 65686ff

Please sign in to comment.