diff --git a/packages/signaling/package.json b/packages/signaling/package.json index f7d1262d4..44bb2f73c 100644 --- a/packages/signaling/package.json +++ b/packages/signaling/package.json @@ -45,6 +45,7 @@ }, "dependencies": { "@lumino/algorithm": "^2.0.0-alpha.6", + "@lumino/coreutils": "^2.0.0-alpha.6", "@lumino/properties": "^2.0.0-alpha.6" }, "devDependencies": { diff --git a/packages/signaling/src/index.ts b/packages/signaling/src/index.ts index 16d04fd5a..93ddd64f2 100644 --- a/packages/signaling/src/index.ts +++ b/packages/signaling/src/index.ts @@ -8,6 +8,7 @@ | The full license is in the file LICENSE, distributed with this software. |----------------------------------------------------------------------------*/ import { ArrayExt, find } from '@lumino/algorithm'; +import { PromiseDelegate } from '@lumino/coreutils'; import { AttachedProperty } from '@lumino/properties'; /** @@ -84,6 +85,11 @@ export interface ISignal { disconnect(slot: Slot, thisArg?: any): boolean; } +/** + * An object that is both a signal and an async iterable. + */ +export interface IStream extends ISignal, AsyncIterable {} + /** * A concrete implementation of `ISignal`. * @@ -157,11 +163,11 @@ export class Signal implements ISignal { * @param fn The callback during which the signal is blocked */ block(fn: () => void): void { - this._blockedCount++; + this.blocked++; try { fn(); } finally { - this._blockedCount--; + this.blocked--; } } @@ -204,12 +210,15 @@ export class Signal implements ISignal { * Exceptions thrown by connected slots will be caught and logged. */ emit(args: U): void { - if (!this._blockedCount) { + if (!this.blocked) { Private.emit(this, args); } } - private _blockedCount = 0; + /** + * If `blocked` is not `0`, the signal will not emit. + */ + protected blocked = 0; } /** @@ -338,10 +347,59 @@ export namespace Signal { } } +/** + * A stream with the characteristics of a signal and an async iterable. + */ +export class Stream extends Signal implements IStream { + /** + * Return an async iterator that yields every emission. + */ + async *[Symbol.asyncIterator](): AsyncIterableIterator { + let pending = this._pending; + while (true) { + try { + const { args, next } = await pending.promise; + pending = next; + yield args; + } catch (_) { + return; // Any promise rejection stops the iterator. + } + } + } + + /** + * Emit the signal, invoke the connected slots, and yield the emission. + * + * @param args - The args to pass to the connected slots. + */ + emit(args: U): void { + if (!this.blocked) { + const pending = this._pending; + this._pending = new PromiseDelegate(); + pending.resolve({ args, next: this._pending }); + super.emit(args); + } + } + + /** + * Stop the stream's async iteration. + */ + stop(): void { + this._pending.reject('stop'); + } + + private _pending: Private.Pending = new PromiseDelegate(); +} + /** * The namespace for the module implementation details. */ namespace Private { + /** + * A pending promise in a promise chain underlying a stream. + */ + export type Pending = PromiseDelegate<{ args: U; next: Pending }>; + /** * The signal exception handler function. */ diff --git a/packages/signaling/tests/src/index.spec.ts b/packages/signaling/tests/src/index.spec.ts index 7410a825a..602258323 100644 --- a/packages/signaling/tests/src/index.spec.ts +++ b/packages/signaling/tests/src/index.spec.ts @@ -9,14 +9,14 @@ |----------------------------------------------------------------------------*/ import { expect } from 'chai'; -import { Signal } from '@lumino/signaling'; +import { Signal, Stream } from '@lumino/signaling'; class TestObject { readonly one = new Signal(this); readonly two = new Signal(this); - readonly three = new Signal(this); + readonly three = new Stream(this); } class ExtendedObject extends TestObject { @@ -582,4 +582,132 @@ describe('@lumino/signaling', () => { }); }); }); + + describe('Stream', () => { + describe('#[Symbol.asyncIterator]()', () => { + it('should yield emissions and respect blocking', async () => { + const stream = new Stream({}); + const input = 'async'; + const expected = 'aINTERRUPTEDsync'; + const wait = Promise.resolve(); + let emitted = ''; + let once = true; + stream.connect(() => { + if (once) { + once = false; + stream.emit('I'); + stream.emit('N'); + stream.emit('T'); + stream.emit('E'); + stream.emit('R'); + stream.emit('R'); + stream.emit('U'); + stream.emit('P'); + stream.emit('T'); + stream.emit('E'); + stream.emit('D'); + } + }); + wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 1'))); + input.split('').forEach(x => wait.then(() => stream.emit(x))); + wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 2'))); + wait.then(() => stream.stop()); + for await (const letter of stream) { + emitted = emitted.concat(letter); + } + expect(emitted).to.equal(expected); + }); + + it('should return an async iterator', async () => { + const stream = new Stream({}); + const input = 'iterator'; + const expected = 'iAHEMterator'; + const wait = Promise.resolve(); + let emitted = ''; + let once = true; + stream.connect(() => { + if (once) { + once = false; + stream.emit('A'); + stream.emit('H'); + stream.emit('E'); + stream.emit('M'); + } + }); + wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 1'))); + input.split('').forEach(x => wait.then(() => stream.emit(x))); + wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 2'))); + wait.then(() => stream.stop()); + + const it = stream[Symbol.asyncIterator](); + let emission: IteratorResult; + while (!(emission = await it.next()).done) { + emitted = emitted.concat(emission.value); + } + + expect(emitted).to.equal(expected); + }); + }); + + describe('#stop()', () => { + it('should stop emissions in the async interable', async () => { + const stream = new Stream({}); + const input = 'continuing'; + const expected = 'cINTERRUPTEDontinuing'; + const wait = Promise.resolve(); + let emitted = ''; + let once = true; + stream.connect(() => { + if (once) { + once = false; + stream.emit('I'); + stream.emit('N'); + stream.emit('T'); + stream.emit('E'); + stream.emit('R'); + stream.emit('R'); + stream.emit('U'); + stream.emit('P'); + stream.emit('T'); + stream.emit('E'); + stream.emit('D'); + } + }); + input.split('').forEach(x => wait.then(() => stream.emit(x))); + wait.then(() => stream.stop()); + for await (const letter of stream) { + emitted = emitted.concat(letter); + } + expect(emitted).to.equal(expected); + }); + + it('should resolve to `done` in an async iterator', async () => { + const stream = new Stream({}); + const input = 'stopiterator'; + const expected = 'sAHEMtopiterator'; + const wait = Promise.resolve(); + let emitted = ''; + let once = true; + stream.connect(() => { + if (once) { + once = false; + stream.emit('A'); + stream.emit('H'); + stream.emit('E'); + stream.emit('M'); + } + }); + input.split('').forEach(x => wait.then(() => stream.emit(x))); + wait.then(() => stream.stop()); + + const it = stream[Symbol.asyncIterator](); + let emission: IteratorResult; + while (!(emission = await it.next()).done) { + emitted = emitted.concat(emission.value); + } + + expect(emitted).to.equal(expected); + }); + }); + }); }); diff --git a/review/api/signaling.api.md b/review/api/signaling.api.md index 80c130384..f837ff0d6 100644 --- a/review/api/signaling.api.md +++ b/review/api/signaling.api.md @@ -11,10 +11,15 @@ export interface ISignal { disconnect(slot: Slot, thisArg?: any): boolean; } +// @public +export interface IStream extends ISignal, AsyncIterable { +} + // @public export class Signal implements ISignal { constructor(sender: T); block(fn: () => void): void; + protected blocked: number; connect(slot: Slot, thisArg?: unknown): boolean; disconnect(slot: Slot, thisArg?: unknown): boolean; emit(args: U): void; @@ -37,6 +42,13 @@ export namespace Signal { // @public export type Slot = (sender: T, args: U) => void; +// @public +export class Stream extends Signal implements IStream { + [Symbol.asyncIterator](): AsyncIterableIterator; + emit(args: U): void; + stop(): void; +} + // (No @packageDocumentation comment for this package) ```