Possible regression in buffer
between 7.0.0-beta.4 and 7.0.0-beta.5
#6035
Comments
I've created this code-sandbox to better understand what's happening. This is its code:
import { Subject, timer } from "rxjs";
import { buffer } from "rxjs/operators";
console.clear();
const subject = new Subject<string>();
subject.pipe(buffer(timer(0))).subscribe(
(x) => {
console.log("value received", x);
},
(e) => {
console.log("error", e);
},
() => {
console.log("done!");
}
);
subject.next("a");
subject.complete();
// Promise.resolve().then(() => subject.complete());
/*
setTimeout(() => {
subject.complete();
}, 0);
*/
Prior to In This seems to indicate that since IMO this change in behavior that we got on v7.0.0-beta.5 makes sense and it is consistent with what happens on v6 and v7 when the source completes after having emitted a value, if in that moment the
How would you explain this failing test (fails in beta.5, passes in beta.4)? (Maybe I should have started with this one.)
it(
"works with 0 timeout",
marbles((m) => {
const source$ = m.cold("--(ab)--c--(de|)");
const ms = m.time(" |");
const expected = " --1-----2--(3|)";
const actual$ = source$.pipe(bufferDebounce(ms));
m.expect(actual$).toBeObservable(expected, {
1: ["a", "b"],
2: ["c"],
3: ["d", "e"],
});
})
);
Fails with:
(You can clone the repo and run the tests using
Here's the same example but reduced a bit more: https://codesandbox.io/s/buffer-regression-forked-vdu09?file=/src/index.ts
import { Subject } from "rxjs";
import { buffer, debounceTime } from "rxjs/operators";
console.clear();
const subject = new Subject<string>();
subject.pipe(buffer(subject.pipe(debounceTime(0)))).subscribe(
(x) => {
console.log("value received", x);
},
(e) => {
console.log("error", e);
},
() => {
console.log("done!");
}
);
subject.next("a");
subject.next("b");
setTimeout(() => {
subject.next("c");
}, 1000);
setTimeout(() => {
subject.next("d");
subject.next("e");
subject.complete();
}, 2000);
v7.0.0-beta.4 and below:
v7.0.0-beta.5 and up:
It doesn't seem intuitive behaviour to me that
I think I'm coming round to your point of view. We can use
const subject = new Subject<string>();
const source = subject.pipe(
prioritize((first, second) => {
return second.pipe(buffer(first.pipe(debounceTime(0))));
})
);
https://codesandbox.io/s/buffer-regression-forked-nh04w?file=/src/index.ts
and
import * as RxJS from "rxjs";
import * as RxJSOperators from "rxjs/operators";
import { prioritize } from "rxjs-etc/operators";
export const bufferDebounce = (ms: number) => <T>(
source: RxJS.Observable<T>
): RxJS.Observable<T[]> =>
source.pipe(
prioritize((first, second) =>
second.pipe(
RxJSOperators.buffer(first.pipe(RxJSOperators.debounceTime(ms)))
)
)
);
https://github.com/OliverJAsh/rxjs-v7-buffer-regression/blob/fix/bufferDebounce.ts
This fixes the tests which were failing before. It might be worth listing this change as a potential breaking change in the v7 changelog, as it broke a few things in our app quite significantly (we use
FWIW I do think that having the resulting observable emit any buffered values upon the completion of the source would be a very reasonable behavior to expect from the
Agreed. I would be happy to send a PR but first it would be good to get input from a few other maintainers, just to make sure I do the right thing in the PR!
I can send a PR for this too. Should I just add a breaking change to the existing v7.0.0-beta.5 entry in
For future reference, I created an even simpler test case for this breaking change. This test passes in v6 and v7.0.0-beta.4 but fails in v7.0.0-beta.5 and above:
const results: any[] = [];
const subject = new Subject<number>();
const source = subject.pipe(buffer(subject.pipe(debounceTime(0)))).subscribe({
next: (value) => results.push(value),
complete: () => results.push("complete")
});
subject.next(1);
expect(results).to.deep.equal([]);
subject.complete();
expect(results).to.deep.equal([[1], 'complete']);
https://codesandbox.io/s/buffer-regression-forked-kjdwc?file=/src/index.ts
I'll re-open this so we don't forget to follow up on the above action points.
This is actually related to an old bug here: #3990 We should always be emitting whatever is in the buffer when it closes. I suspect there will be complaints about occasional empty buffers at completion, but those are easily filtered.
…ingNotifier` active
Gives the author control over the emission of the final buffer. If the `closingNotifier` completes before the source does, no more buffers will be emitted. If the `closingNotifier` is still active when the source completes, then whatever is in the buffer at the time will be emitted from the resulting observable before it completes.
BREAKING CHANGE: Final buffered values will now be emitted from the resulting observable if the `closingNotifier` is still active. The simplest workaround if you want the original behavior (where you possibly miss values), is to add a `skipLast(1)` at the end. Otherwise, you can try to complete the `closingNotifier` prior to the completion of the source.
Fixes ReactiveX#3990, ReactiveX#6035
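To make the rule in this commit message easier to follow, here is a minimal, dependency-free sketch that models it as a plain function over a list of events. It is not the RxJS implementation, and it only encodes the rule as worded above; `BufferEvent` and `simulateBuffer` are hypothetical names invented for this illustration.

```typescript
// Hypothetical event model; none of these names come from RxJS.
type BufferEvent<T> =
  | { kind: "value"; value: T } // the source emits a value
  | { kind: "notify" } // the closingNotifier emits
  | { kind: "notifierComplete" } // the closingNotifier completes
  | { kind: "sourceComplete" }; // the source completes

// Replays a timeline of events and returns the buffers that would be
// emitted under the rule described in the commit message above.
function simulateBuffer<T>(events: BufferEvent<T>[]): T[][] {
  const emitted: T[][] = [];
  let current: T[] = [];
  let notifierActive = true;

  for (const event of events) {
    switch (event.kind) {
      case "value":
        current.push(event.value);
        break;
      case "notify":
        // Each notification closes the current buffer and opens a new one.
        if (notifierActive) {
          emitted.push(current);
          current = [];
        }
        break;
      case "notifierComplete":
        // Once the notifier has completed, no more buffers are emitted.
        notifierActive = false;
        break;
      case "sourceComplete":
        // The final buffer is flushed only while the notifier is still active.
        if (notifierActive) {
          emitted.push(current);
        }
        return emitted;
    }
  }
  return emitted;
}

// Notifier still active when the source completes: the final buffer is emitted.
const stillActive = simulateBuffer<string>([
  { kind: "value", value: "a" },
  { kind: "sourceComplete" },
]);

// Notifier completed first: the buffered value is never emitted.
const notifierDone = simulateBuffer<string>([
  { kind: "value", value: "a" },
  { kind: "notifierComplete" },
  { kind: "sourceComplete" },
]);

// Nothing buffered at completion yields an empty final buffer,
// which can be filtered out if it is unwanted.
const emptyFinal = simulateBuffer<string>([{ kind: "sourceComplete" }]);
const withoutEmpty = emptyFinal.filter((buffer) => buffer.length > 0);
```

In this model, dropping the last element of `stillActive` corresponds to the `skipLast(1)` workaround mentioned in the commit message, and the `filter` call corresponds to the suggestion in the thread that empty buffers at completion are easy to filter.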
BREAKING CHANGE: Final buffered values will now always be emitted. To get the same behavior as the previous release, you can use `endWith` and `skipLast(1)`, like so: `source$.pipe(buffer(notifier$.pipe(endWith(true))), skipLast(1))` Fixes ReactiveX#3990, ReactiveX#6035
Closed by #6042 (IIUC)
Bug Report
Current Behavior
Reduced test case: https://github.com/OliverJAsh/rxjs-v7-buffer-regression
I have the following operator:
https://github.com/OliverJAsh/rxjs-v7-buffer-regression/blob/42be6f30c5afa433abebd9d98f09b9da15d3efb2/bufferDebounce.ts#L1-L19
This test passes in v6 and v7.0.0-beta.4 but fails in v7.0.0-beta.5 and above:
https://github.com/OliverJAsh/rxjs-v7-buffer-regression/blob/42be6f30c5afa433abebd9d98f09b9da15d3efb2/test.js#L53-L63
The test fails with:
I believe this is due to #5654 but I'm not sure if this is intentional.
Expected behavior
The test passes like it did before v7.0.0-beta.5.
Reproduction
See above