Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Request: Seamless stream and/or async iterable support #110

Open
ronag opened this issue Feb 17, 2021 · 11 comments
Open

Feature Request: Seamless stream and/or async iterable support #110

ronag opened this issue Feb 17, 2021 · 11 comments
Assignees
Labels
enhancement New feature or request
Milestone

Comments

@ronag
Copy link
Collaborator

ronag commented Feb 17, 2021

Would be nice to be able to provide a stream and/or async iterable (of buffers) as argument and have is seamlessly (using transferable) accessible in the worker.

e.g.

const handle = await fs.open(dst)
try {
  for await (const buf of worker.runTask(fs.createReadStream(src))) {
     await handle.write(buf)
  }
} finally {
  await handle.close()
}
module.exports = async function * (source) {
  for await (const buf of source) {
    yield veryExpensiveProcessing(buf)
  }
}
@ronag ronag changed the title Feature Request: seamless passing of streams as argument Feature Request: Seamless stream and/or async iterable support Feb 17, 2021
@ronag
Copy link
Collaborator Author

ronag commented Feb 17, 2021

maybe duplicate of #108

@ronag
Copy link
Collaborator Author

ronag commented Feb 17, 2021

@mcollina This would make it possible to do e.g.

await pipeline(piscina.move(src), piscina.transform(workerPath), dst)

@jasnell
Copy link
Collaborator

jasnell commented Feb 17, 2021

Unfortunately we can't make streams seamlessly transferable/cloneable using move. The only objects that can be made transferable are core objects that extend from BaseObject. It would be possible to create a Transform implementation whose _transform dispatched to Piscina and continued on after each task was done. So something like..

const piscina = new Piscina({ /** ... **/ });
const workerTransform = new MyTransformer(piscina);
await pipeline(src, workerTransform, dst);

I have been thinking about adding a new type of Stream in core that complements MessagePort to implement a highly efficient cross-thread stream model. Basically something like:

const { MessageDuplex } = require('worker_threads')

const mc = new MessageChannel();

mc.port1.onmessage = async ({data}) => {
  for await (const chunk of data)
    data.write(transformIt(chunk));
};

const d = new MessageDuplex();
d.on('data', console.log);

mc.port2.postMessage(d);

d.write('hello');
d.end('there');

The idea here is that MessageDuplex would use highly efficient signaling and no-copy mechanisms to exchange data with the paired clone. It would get very close to what you're looking for here.

bottom line is that the limitations of cloning/transfering over MessagePort definitely impose some restrictions.

@jasnell
Copy link
Collaborator

jasnell commented Feb 17, 2021

Regarding this pattern.... I'm not sure this (an async generator as a worker) is a pattern that could work with the worker pool. Retaining the generator state would be problematic. Not necessarily impossible but extremely complicated. Will have to think about how we could do it and what the semantics would actually be.

module.exports = async function * (source) {
  for await (const buf of source) {
    yield veryExpensiveProcessing(buf)
  }
}

@ronag
Copy link
Collaborator Author

ronag commented Feb 17, 2021

I have been thinking about adding a new type of Stream in core that complements MessagePort to implement a highly efficient cross-thread stream model. Basically something like:

I think all of that could be hidden away in a piscina.pipeline() helper, no?

@jasnell
Copy link
Collaborator

jasnell commented Feb 17, 2021

Possibly, but we still hit up against limitations on MessagePort. Lemme stew over it a bit

@jasnell jasnell added the enhancement New feature or request label May 15, 2021
@jasnell
Copy link
Collaborator

jasnell commented Dec 11, 2021

#170 illustrates using transferable whatwg readable and writable streams to an individual worker. It doesn't quite do what this issue is suggesting but it definitely gets a bit closer!

Copy link

github-actions bot commented Jun 2, 2024

This issue has been marked as stale because it has been opened 30 days without activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions bot added the stale label Jun 2, 2024
@metcoder95 metcoder95 removed the stale label Jun 6, 2024
Copy link

github-actions bot commented Jul 7, 2024

This issue has been marked as stale because it has been opened 30 days without activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions bot added the stale label Jul 7, 2024
@metcoder95 metcoder95 removed the stale label Jul 9, 2024
Copy link

This issue has been marked as stale because it has been opened 30 days without activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions bot added the stale label Aug 11, 2024
Copy link

This issue was closed because it has been stalled for 5 days with no activity.

@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Aug 18, 2024
@metcoder95 metcoder95 reopened this Oct 30, 2024
@metcoder95 metcoder95 added never-stale Issues/PRs that are not stale and removed stale labels Oct 30, 2024
@metcoder95 metcoder95 added this to the v5 milestone Nov 1, 2024
@metcoder95 metcoder95 self-assigned this Nov 1, 2024
@metcoder95 metcoder95 removed the never-stale Issues/PRs that are not stale label Dec 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants