Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async iterable Stream class that inherits from Signal #462

Merged
merged 11 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/signaling/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
},
"dependencies": {
"@lumino/algorithm": "^2.0.0-alpha.6",
"@lumino/coreutils": "^2.0.0-alpha.6",
"@lumino/properties": "^2.0.0-alpha.6"
},
"devDependencies": {
Expand Down
66 changes: 62 additions & 4 deletions packages/signaling/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
| The full license is in the file LICENSE, distributed with this software.
|----------------------------------------------------------------------------*/
import { ArrayExt, find } from '@lumino/algorithm';
import { PromiseDelegate } from '@lumino/coreutils';
import { AttachedProperty } from '@lumino/properties';

/**
Expand Down Expand Up @@ -84,6 +85,11 @@ export interface ISignal<T, U> {
disconnect(slot: Slot<T, U>, thisArg?: any): boolean;
}

/**
* An object that is both a signal and an async iterable.
*/
export interface IStream<T, U> extends ISignal<T, U>, AsyncIterable<U> {}

/**
* A concrete implementation of `ISignal`.
*
Expand Down Expand Up @@ -157,11 +163,11 @@ export class Signal<T, U> implements ISignal<T, U> {
* @param fn The callback during which the signal is blocked
*/
block(fn: () => void): void {
this._blockedCount++;
this.blocked++;
try {
fn();
} finally {
this._blockedCount--;
this.blocked--;
}
}

Expand Down Expand Up @@ -204,12 +210,15 @@ export class Signal<T, U> implements ISignal<T, U> {
* Exceptions thrown by connected slots will be caught and logged.
*/
emit(args: U): void {
if (!this._blockedCount) {
if (!this.blocked) {
Private.emit(this, args);
}
}

private _blockedCount = 0;
/**
* If `blocked` is not `0`, the signal will not emit.
*/
protected blocked = 0;
}

/**
Expand Down Expand Up @@ -338,10 +347,59 @@ export namespace Signal {
}
}

/**
* A stream with the characteristics of a signal and an async iterable.
*/
export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
/**
* Return an async iterator that yields every emission.
*/
async *[Symbol.asyncIterator](): AsyncIterableIterator<U> {
let pending = this._pending;
while (true) {
try {
const { args, next } = await pending.promise;
pending = next;
yield args;
} catch (_) {
return; // Any promise rejection stops the iterator.
}
}
}

/**
* Emit the signal, invoke the connected slots, and yield the emission.
*
* @param args - The args to pass to the connected slots.
*/
emit(args: U): void {
if (!this.blocked) {
const pending = this._pending;
this._pending = new PromiseDelegate();
pending.resolve({ args, next: this._pending });
super.emit(args);
}
}

/**
* Stop the stream's async iteration.
*/
stop(): void {
this._pending.reject('stop');
}

private _pending: Private.Pending<U> = new PromiseDelegate();
}

/**
* The namespace for the module implementation details.
*/
namespace Private {
/**
* A pending promise in a promise chain underlying a stream.
*/
export type Pending<U> = PromiseDelegate<{ args: U; next: Pending<U> }>;

/**
* The signal exception handler function.
*/
Expand Down
132 changes: 130 additions & 2 deletions packages/signaling/tests/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
|----------------------------------------------------------------------------*/
import { expect } from 'chai';

import { Signal } from '@lumino/signaling';
import { Signal, Stream } from '@lumino/signaling';

class TestObject {
readonly one = new Signal<this, void>(this);

readonly two = new Signal<this, number>(this);

readonly three = new Signal<this, string[]>(this);
readonly three = new Stream<this, string[]>(this);
}

class ExtendedObject extends TestObject {
Expand Down Expand Up @@ -582,4 +582,132 @@ describe('@lumino/signaling', () => {
});
});
});

describe('Stream', () => {
describe('#[Symbol.asyncIterator]()', () => {
it('should yield emissions and respect blocking', async () => {
const stream = new Stream<unknown, string>({});
const input = 'async';
const expected = 'aINTERRUPTEDsync';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('I');
stream.emit('N');
stream.emit('T');
stream.emit('E');
stream.emit('R');
stream.emit('R');
stream.emit('U');
stream.emit('P');
stream.emit('T');
stream.emit('E');
stream.emit('D');
}
});
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
wait.then(() => stream.stop());
for await (const letter of stream) {
emitted = emitted.concat(letter);
}
expect(emitted).to.equal(expected);
});

it('should return an async iterator', async () => {
const stream = new Stream<unknown, string>({});
const input = 'iterator';
const expected = 'iAHEMterator';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('A');
stream.emit('H');
stream.emit('E');
stream.emit('M');
}
});
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
wait.then(() => stream.stop());

const it = stream[Symbol.asyncIterator]();
let emission: IteratorResult<string, any>;
while (!(emission = await it.next()).done) {
emitted = emitted.concat(emission.value);
}

expect(emitted).to.equal(expected);
});
});

describe('#stop()', () => {
it('should stop emissions in the async interable', async () => {
const stream = new Stream<unknown, string>({});
const input = 'continuing';
const expected = 'cINTERRUPTEDontinuing';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('I');
stream.emit('N');
stream.emit('T');
stream.emit('E');
stream.emit('R');
stream.emit('R');
stream.emit('U');
stream.emit('P');
stream.emit('T');
stream.emit('E');
stream.emit('D');
}
});
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.stop());
for await (const letter of stream) {
emitted = emitted.concat(letter);
}
expect(emitted).to.equal(expected);
});

it('should resolve to `done` in an async iterator', async () => {
const stream = new Stream<unknown, string>({});
const input = 'stopiterator';
const expected = 'sAHEMtopiterator';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('A');
stream.emit('H');
stream.emit('E');
stream.emit('M');
}
});
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.stop());

const it = stream[Symbol.asyncIterator]();
let emission: IteratorResult<string, any>;
while (!(emission = await it.next()).done) {
emitted = emitted.concat(emission.value);
}

expect(emitted).to.equal(expected);
});
});
});
});
12 changes: 12 additions & 0 deletions review/api/signaling.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ export interface ISignal<T, U> {
disconnect(slot: Slot<T, U>, thisArg?: any): boolean;
}

// @public
export interface IStream<T, U> extends ISignal<T, U>, AsyncIterable<U> {
}

// @public
export class Signal<T, U> implements ISignal<T, U> {
constructor(sender: T);
block(fn: () => void): void;
protected blocked: number;
connect(slot: Slot<T, U>, thisArg?: unknown): boolean;
disconnect(slot: Slot<T, U>, thisArg?: unknown): boolean;
emit(args: U): void;
Expand All @@ -37,6 +42,13 @@ export namespace Signal {
// @public
export type Slot<T, U> = (sender: T, args: U) => void;

// @public
export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
[Symbol.asyncIterator](): AsyncIterableIterator<U>;
emit(args: U): void;
stop(): void;
}

// (No @packageDocumentation comment for this package)

```