-
Notifications
You must be signed in to change notification settings - Fork 286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
How to spread records in a pipeline to multiple readers #4471
Comments
@nodejs/streams |
@RedYetiDev please don't ing teams for issues in the help repo :) @obones look into |
Thanks, I did not find this in the midst of information in that lib.
If I'm to start using it and it disappears in the next Node version, that would not be practical. |
You could also do it yourself: await stream.promises.pipeline(
src,
parallelOutOfOrder(fn, { concurrency }),
dst
)
class ConditionVariable {
#condition
#resume = []
constructor(condition) {
this.#condition = condition
}
notify() {
for (const resolve of this.#resume.splice(0)) {
resolve()
}
}
notifyOne() {
const resolve = this.#resume.shift()
resolve?.()
}
async wait() {
while (!this.#condition()) {
await new Promise((resolve) => {
this.#resume.push(resolve)
})
}
}
}
async function * parallelOutOfOrder (fn, { concurrency })
return (source, { signal }) => {
const queue = new PQueue({ concurrency })
const buffer = []
let error = null
let ended = false
const readController = new ConditionVariable(() => buffer.length > 0 || error || ended)
const pumpController = new ConditionVariable(() => buffer.length === 0)
const pump = async () => {
try {
for await (const value of source) {
queue
.add(async ({ signal }) => {
let ret
try {
ret = { status: 'fulfilled', value: await fn(value, { signal }) }
} catch (err) {
ret = { status: 'rejected', reason: err }
}
await pumpController.wait()
buffer.push(ret)
readController.notify()
})
await queue.onEmpty()
}
ended = true
readController.notify()
} catch (err) {
error = err
readController.notify()
}
}
pump()
while (true) {
signal?.throwIfAborted()
if (buffer.length > 0) {
yield* buffer.splice(0)
} else if (error) {
throw error
} else if (ended) {
break
} else {
pumpController.notifyOne()
await readController.wait()
}
}
}
} |
Node.js Version
v18.20.4
NPM Version
v10.7.0
Operating System
Windows/Linux
Subsystem
stream
Description
Hello,
I currently have the following simple stream pipeline:
Quite expectedly, the transform stream processes one item at a time from the source and this does what I need it to do.
The method given to transform is an
async
one as it needs toawait
a call to an HTTPs API which will take longer than the time it takes to produce the next record.In an effort to improve processing time, I would like to split the record processing across multiple consumers like so:
I have read question #2707 but this describes a case where the same record exiting a stream is duplicated across multiple downward streams but I don't want duplication, just somewhat "parallel" execution.
Is this something that is possible within the stream ecosystem?
If yes, how would you suggest that I write it?
Thanks for your help
Minimal Reproduction
No response
Output
No response
Before You Submit
The text was updated successfully, but these errors were encountered: