diff --git a/asynciterator.ts b/asynciterator.ts index 7b0e886..eeb2c8f 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -575,35 +575,61 @@ export class AsyncIterator extends EventEmitter implements AsyncIterable { */ [Symbol.asyncIterator](): ESAsyncIterator { const it = this; + let currentResolve: null | Function = null; + let currentReject: null | Function = null; + let pendingError: null | Error = null; + + it.addListener('readable', tryResolve); + it.addListener('end', tryResolve); + it.addListener('error', tryReject); + + // Tries to emit an item or signal the end of the iterator + function tryResolve(): void { + if (currentResolve !== null) { + if (pendingError !== null) { + tryReject(pendingError); + } + else if (it.done) { + currentResolve({ done: true, value: undefined }); + currentResolve = currentReject = null; + removeListeners(); + } + else { + const value = it.read(); + if (value !== null) { + currentResolve({ done: false, value }); + currentResolve = currentReject = null; + } + } + } + } + + // Tries to emit en error + function tryReject(error: Error) { + if (currentReject !== null) { + currentReject(error); + currentResolve = currentReject = pendingError = null; + removeListeners(); + } + else if (pendingError !== null) { + pendingError = error; + } + } + + // Cleans up all attached listeners + function removeListeners() { + it.removeListener('readable', tryResolve); + it.removeListener('end', tryResolve); + it.removeListener('error', tryReject); + } + // An EcmaScript AsyncIterator exposes the next() function that can be invoked repeatedly return { next(): Promise> { return new Promise>((resolve, reject) => { - let hasDataListeners = false; - it.on('error', reject); - - // Tries to read the next item and passes it through the callback - function readNext(): void { - // Try to read the next item - const value = it.read(); - if (value !== null || it.done) { - it.removeListener('error', reject); - if (hasDataListeners) { - it.removeListener('end', readNext); - it.removeListener('readable', readNext); - } - resolve(value !== null ? - { done: false, value } : - { done: true, value: undefined }); - } - // If none available, listen for new items - else if (!hasDataListeners) { - hasDataListeners = true; - it.on('end', readNext); - it.on('readable', readNext); - } - } - readNext(); + currentResolve = resolve; + currentReject = reject; + tryResolve(); }); }, };