Skip to content

Commit

Permalink
Add Stream#stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
afshin committed Nov 9, 2022
1 parent 41a4041 commit d1f9d04
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 14 deletions.
20 changes: 17 additions & 3 deletions packages/signaling/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,16 @@ export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
async *[Symbol.asyncIterator](): AsyncIterableIterator<U> {
let pending = this.pending;
while (true) {
const resolved = await pending.promise;
pending = resolved.next;
yield resolved.args;
try {
const { args, next } = await pending.promise;
pending = next;
yield args;
} catch (reason) {
if (reason === 'stop') {
return;
}
throw 'unreachable';
}
}
}

Expand All @@ -377,6 +384,13 @@ export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
}
}

/**
* Stop the stream's async iteration.
*/
stop(): void {
this.pending.reject('stop');
}

/**
* A promise that resolves the currently pending iteration.
*/
Expand Down
89 changes: 78 additions & 11 deletions packages/signaling/tests/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,10 @@ describe('@lumino/signaling', () => {
const stream = new Stream<unknown, string>({});
const input = 'async';
const expected = 'aINTERRUPTEDsync';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect((_, emitted) => {
stream.connect(() => {
if (once) {
once = false;
stream.emit('I');
Expand All @@ -607,21 +608,87 @@ describe('@lumino/signaling', () => {
stream.emit('D');
}
});
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
input.split('').forEach(x => setTimeout(() => stream.emit(x)));
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
wait.then(() => stream.stop());
for await (const letter of stream) {
emitted = emitted.concat(letter);
if (emitted === expected) break;
}
expect(emitted).to.equal(expected);
});

it('should return an async iterator', async () => {
const stream = new Stream<unknown, string>({});
const input = 'iterator';
const expected = 'iAHEMterator';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('A');
stream.emit('H');
stream.emit('E');
stream.emit('M');
}
});
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
wait.then(() => stream.stop());
let it = stream[Symbol.asyncIterator]();
// eslint-disable-next-line no-constant-condition
while (true) {
const emission = await it.next();
emitted = emitted.concat(emission.value || '');
if (emission.done) break;
}
expect(emitted).to.equal(expected);
});
});

describe('#stop()', () => {
it('should stop emissions in the async interable', async () => {
const stream = new Stream<unknown, string>({});
const input = 'continuing';
const expected = 'cINTERRUPTEDontinuing';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('I');
stream.emit('N');
stream.emit('T');
stream.emit('E');
stream.emit('R');
stream.emit('R');
stream.emit('U');
stream.emit('P');
stream.emit('T');
stream.emit('E');
stream.emit('D');
}
});
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.stop());
for await (const letter of stream) {
emitted = emitted.concat(letter);
}
expect(emitted).to.equal(expected);
});

it('should resolve to `done` in an async iterator', async () => {
const stream = new Stream<unknown, string>({});
const input = 'stopiterator';
const expected = 'sAHEMtopiterator';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect((_, emitted) => {
stream.connect(() => {
if (once) {
once = false;
stream.emit('A');
Expand All @@ -630,16 +697,16 @@ describe('@lumino/signaling', () => {
stream.emit('M');
}
});
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
input.split('').forEach(x => setTimeout(() => stream.emit(x)));
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.stop());
let it = stream[Symbol.asyncIterator]();
// eslint-disable-next-line no-constant-condition
while (true) {
const emission = await it.next();
emitted = emitted.concat(emission.value);
if (emitted === expected) break;
emitted = emitted.concat(emission.value || '');
if (emission.done) break;
}
expect(emitted).to.equal(expected);
});
});
});
Expand Down
1 change: 1 addition & 0 deletions review/api/signaling.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
emit(args: U): void;
// Warning: (ae-forgotten-export) The symbol "Private" needs to be exported by the entry point index.d.ts
protected pending: Private.Pending<U>;
stop(): void;
}

// (No @packageDocumentation comment for this package)
Expand Down

0 comments on commit d1f9d04

Please sign in to comment.