-
Notifications
You must be signed in to change notification settings - Fork 30.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add abort signal for ReadableStream and WritableStream #46273
Merged
nodejs-github-bot
merged 13 commits into
nodejs:main
from
debadree25:ft/add-abort-signal-webstreams
Feb 17, 2023
Merged
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
3eaadf3
stream: add abort signal for ReadableStream and WritableStream
debadree25 45b1f4e
fixup! make onAbort const
debadree25 134d3e4
fixup! fix test assertion
debadree25 3020c01
fixup! fix assertion
debadree25 7424c76
fixup! use primordial
debadree25 85c1047
fixup! use primordial
debadree25 cbcaf77
fixup! add more tests
debadree25 89ebb9a
fixup! add doc
debadree25 1047e50
fixup! lint
debadree25 2eaae40
fixup! doc
debadree25 d9464cd
fixup! full stop
debadree25 3f2c929
fixup! lint trailing space
debadree25 6c25869
fixup! add version info
debadree25 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const { finished, addAbortSignal } = require('stream'); | ||
const { ReadableStream, WritableStream } = require('stream/web'); | ||
const assert = require('assert'); | ||
|
||
function createTestReadableStream() { | ||
return new ReadableStream({ | ||
start(controller) { | ||
controller.enqueue('a'); | ||
controller.enqueue('b'); | ||
controller.enqueue('c'); | ||
controller.close(); | ||
} | ||
}); | ||
} | ||
|
||
function createTestWritableStream(values) { | ||
return new WritableStream({ | ||
write(chunk) { | ||
values.push(chunk); | ||
} | ||
}); | ||
} | ||
|
||
{ | ||
const rs = createTestReadableStream(); | ||
|
||
const reader = rs.getReader(); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, rs); | ||
|
||
finished(rs, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
reader.read().then(common.mustCall((result) => { | ||
assert.strictEqual(result.value, 'a'); | ||
ac.abort(); | ||
})); | ||
} | ||
|
||
{ | ||
const rs = createTestReadableStream(); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, rs); | ||
|
||
assert.rejects((async () => { | ||
for await (const chunk of rs) { | ||
if (chunk === 'b') { | ||
ac.abort(); | ||
} | ||
} | ||
})(), /AbortError/).then(common.mustCall()); | ||
} | ||
|
||
{ | ||
const rs1 = createTestReadableStream(); | ||
|
||
const rs2 = createTestReadableStream(); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, rs1); | ||
addAbortSignal(ac.signal, rs2); | ||
|
||
const reader1 = rs1.getReader(); | ||
const reader2 = rs2.getReader(); | ||
|
||
finished(rs1, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
finished(rs2, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
ac.abort(); | ||
} | ||
|
||
{ | ||
const rs = createTestReadableStream(); | ||
|
||
const { 0: rs1, 1: rs2 } = rs.tee(); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, rs); | ||
|
||
const reader1 = rs1.getReader(); | ||
const reader2 = rs2.getReader(); | ||
|
||
finished(rs1, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
finished(rs2, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
ac.abort(); | ||
} | ||
|
||
{ | ||
const values = []; | ||
const ws = createTestWritableStream(values); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, ws); | ||
|
||
const writer = ws.getWriter(); | ||
|
||
finished(ws, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.deepStrictEqual(values, ['a']); | ||
assert.rejects(writer.write('b'), /AbortError/).then(common.mustCall()); | ||
assert.rejects(writer.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
writer.write('a').then(() => { | ||
ac.abort(); | ||
}); | ||
} | ||
|
||
{ | ||
const values = []; | ||
|
||
const ws1 = createTestWritableStream(values); | ||
const ws2 = createTestWritableStream(values); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, ws1); | ||
addAbortSignal(ac.signal, ws2); | ||
|
||
const writer1 = ws1.getWriter(); | ||
const writer2 = ws2.getWriter(); | ||
|
||
finished(ws1, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(writer1.write('a'), /AbortError/).then(common.mustCall()); | ||
assert.rejects(writer1.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
finished(ws2, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(writer2.write('a'), /AbortError/).then(common.mustCall()); | ||
assert.rejects(writer2.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
ac.abort(); | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we using
Symbol.for
here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used the similar pattern to what we followed over here #46205 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aduh95 I presume Node.js streams or another part that exists both in core and userland added this for interoperability (so that
require("node:stream")
andrequire("readable-stream")
interoperate) and then everyone (probably me too) saw the code and cargo culted. Probably a bunch at the point we moved private variables to symbols when privates were not supported in snapshots back then.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also arguably easier to debug "from the outside" but not by a considerable margin.