Skip to content

Commit

Permalink
Keep listeners attached between next calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh committed Oct 27, 2023
1 parent f60cecc commit 0948d9c
Showing 1 changed file with 51 additions and 25 deletions.
76 changes: 51 additions & 25 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -575,35 +575,61 @@ export class AsyncIterator<T> extends EventEmitter implements AsyncIterable<T> {
*/
[Symbol.asyncIterator](): ESAsyncIterator<T> {
const it = this;
let currentResolve: null | Function = null;
let currentReject: null | Function = null;
let pendingError: null | Error = null;

it.addListener('readable', tryResolve);
it.addListener('end', tryResolve);
it.addListener('error', tryReject);

// Tries to emit an item or signal the end of the iterator
function tryResolve(): void {
if (currentResolve !== null) {
if (pendingError !== null) {
tryReject(pendingError);
}
else if (it.done) {
currentResolve({ done: true, value: undefined });
currentResolve = currentReject = null;
removeListeners();
}
else {
const value = it.read();
if (value !== null) {
currentResolve({ done: false, value });
currentResolve = currentReject = null;
}
}
}
}

// Tries to emit en error
function tryReject(error: Error) {
if (currentReject !== null) {
currentReject(error);
currentResolve = currentReject = pendingError = null;
removeListeners();
}
else if (pendingError !== null) {
pendingError = error;
}
}

// Cleans up all attached listeners
function removeListeners() {
it.removeListener('readable', tryResolve);
it.removeListener('end', tryResolve);
it.removeListener('error', tryReject);
}

// An EcmaScript AsyncIterator exposes the next() function that can be invoked repeatedly
return {
next(): Promise<IteratorResult<T>> {
return new Promise<IteratorResult<T>>((resolve, reject) => {
let hasDataListeners = false;
it.on('error', reject);

// Tries to read the next item and passes it through the callback
function readNext(): void {
// Try to read the next item
const value = it.read();
if (value !== null || it.done) {
it.removeListener('error', reject);
if (hasDataListeners) {
it.removeListener('end', readNext);
it.removeListener('readable', readNext);
}
resolve(value !== null ?
{ done: false, value } :
{ done: true, value: undefined });
}
// If none available, listen for new items
else if (!hasDataListeners) {
hasDataListeners = true;
it.on('end', readNext);
it.on('readable', readNext);
}
}
readNext();
currentResolve = resolve;
currentReject = reject;
tryResolve();
});
},
};
Expand Down

0 comments on commit 0948d9c

Please sign in to comment.