Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async iterable Stream class that inherits from Signal #462

Merged
merged 11 commits into from
Nov 11, 2022

Conversation

afshin
Copy link
Member

@afshin afshin commented Nov 8, 2022

This PR adds a Stream class that extends Signal and implements IStream, which means that in addition to being a signal, it is also an async iterable. It can be used as a drop-in replacement for any Signal (or ISignal) in an application if the author wishes to expose an async iterable API to complement signal connection/disconnection.

This PR also exposes a protected blocked: number property of a Signal so that sub-classes can implement correct blocking behavior.

Examples

Subscribe to a stream via connect/disconnect:

// Instantiate a stream.
const stream = new Stream<unknown, string>({});

// Connect a handler to the stream.
const handler = (_, args) => console.log(`emission is: ${args}`);
stream.connect(handler);

stream.emit('foo');
// [OUTPUT] emission is: foo

stream.emit('bar');
// [OUTPUT] emission is: bar

stream.emit('baz');
// [OUTPUT] emission is: baz

// Disconnect the stream handler.
stream.disconnect(handler);

Subscribe to a stream as an async iterable:

// Instantiate a stream.
const stream = new Stream<unknown, string>({});

// Iterate over stream as an iterable.
void (async () => {
  for await (const emission of stream) {
    console.log(`emission is: ${emission}`);
  }
  console.log('stream stopped');
})();

stream.emit('foo');
// [OUTPUT] emission is: foo

stream.emit('bar');
// [OUTPUT] emission is: bar

stream.emit('baz');
// [OUTPUT] emission is: baz

stream.stop();
// [OUTPUT] stream stopped

Generate an async iterator over the stream.

// Instantiate a stream.
const stream = new Stream<unknown, string>({});

// Loop through the stream iterator.
void (async () => {
  const it = stream[Symbol.asyncIterator]();
  let emission: IteratorResult<string, any>;
  while (!(emission = await it.next()).done) {
    console.log(`emission is: ${emission.value}`);
  }
  console.log('stream stopped');
})();

stream.emit('foo');
// [OUTPUT] emission is: foo

stream.emit('bar');
// [OUTPUT] emission is: bar

stream.emit('baz');
// [OUTPUT] emission is: baz

stream.stop();
// [OUTPUT] stream stopped

@afshin afshin added the enhancement New feature or request label Nov 8, 2022
@afshin afshin added this to the Lumino 2 milestone Nov 8, 2022
@afshin afshin self-assigned this Nov 8, 2022
@afshin afshin changed the title Add Stream class that inherits from Signal and is an async iterator Add async iterable/iterator Stream class that inherits from Signal Nov 8, 2022
@afshin afshin marked this pull request as ready for review November 8, 2022 09:51
@afshin
Copy link
Member Author

afshin commented Nov 8, 2022

cc: @jasongrout @vidartf

@jasongrout
Copy link
Contributor

jasongrout commented Nov 8, 2022

Cool. If I understand it correctly, you're long-polling the signal?

@afshin
Copy link
Member Author

afshin commented Nov 8, 2022

I wouldn't have used the term long polling, but maybe it is apt, because there is always a pending promise outstanding and it is resolved every time there is an emission. Is that what you mean?

@fcollonval
Copy link
Member

Thinking a bit more about event life cycle, could you try the following scenario:

A signal listener, emit the same signal that triggers it (only the first time it get triggers to avoid infinite recursion).

When are the iterator/iterable displayed?

The goal here is to check a validation scenario for instance, in which one listener correct a model it is listening. So that the same signal will be triggered twice in a synchronous callback stack.

My guess is that the iterator/iterable will not be displayed between the two executions of the listener as they are asynchronous. If this is the case, we should warn user of potential inconsistency when using that feature as the state that triggered a promise resolution may not exist any more.

@afshin
Copy link
Member Author

afshin commented Nov 8, 2022

@fcollonval You're absolutely right. There is a problem here. I'm going to put this PR back into draft mode to think about it.

@afshin afshin marked this pull request as draft November 8, 2022 13:59
@afshin afshin changed the title Add async iterable/iterator Stream class that inherits from Signal Add async iterable Stream class that inherits from Signal Nov 9, 2022
@afshin afshin marked this pull request as ready for review November 9, 2022 10:32
@afshin
Copy link
Member Author

afshin commented Nov 9, 2022

@fcollonval I was thinking about it realized that maybe .next() is not necessary and that it is enough for Stream to be an async iterable. If you truly want an iterator, you can still call Stream#[Symbol.asyncIterator](), but this is insufficient to fix the bug, in se.

The other thing I did was to make sure you never lose a promise by having the internal pending promise always resolve to the next promise. This way no promises get lost. It's the same promise-chaining technique Poll uses.

Thanks for finding it. It hurt my head for a while 🥲

@vidartf
Copy link
Member

vidartf commented Nov 9, 2022

How does the async iterator approach interact with cleanup compared to the connect/disconnect pattern? Previously, the signal emitter would keep a reference to the slot, but with the stream approach it gets reversed, right? How will this affect resource cleanup? I haven't thought it through yet, so I'm hoping you have 😀 I.e. I can imagine this would improve the situation where listeners aren't being cleaned up since there are still references to them, but will it instead prevent emitters from being cleaned up since there are listeners who have references to them? Note: I might be mixing up how JS resource management works these days.

@afshin
Copy link
Member Author

afshin commented Nov 9, 2022

@vidartf I think this will improve the situation, as you say, because when you use a Stream via its async iterable API, you're in an infinite loop where each iteration is, as Jason said above, a long poll. So whatever strategy you would have used inside any other forever loop (e.g., some break or return condition) is presumably the strategy you'd use here.

Copy link
Member

@fcollonval fcollonval left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @afshin

@vidartf
Copy link
Member

vidartf commented Nov 9, 2022

@afshin Right, so let's say we have widgets A and B. B listens to a stream on A via the new API. Then the owner of widget A figures out that it is no longer needed, and disposes of it (A.dispose() and lets the reference go out of scope). Will the infinite loop on B (with the reference to A) prevent the cleanup of A? Will it keep it around until B itself is cleaned up? Or will it be kept around forever?

@afshin
Copy link
Member Author

afshin commented Nov 9, 2022

I think you're exposing the need for a Stream#stop() perhaps.

@afshin
Copy link
Member Author

afshin commented Nov 10, 2022

@vidartf I added a .stop() method and updated the code snippets above to show how it can be used. I would say in the dispose() method of a class that has a stream, we should recommend that people stop the stream.

@afshin afshin merged commit ce0ce76 into jupyterlab:main Nov 11, 2022
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Nov 12, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
enhancement New feature or request
Projects
No open projects
Status: done
Development

Successfully merging this pull request may close these issues.

4 participants