Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make iterators AsyncIterable, Closes #89 #102

Merged
merged 6 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
node: true,
},

globals: {
AsyncIterable: false,
},

parser: "@typescript-eslint/parser",

parserOptions: {
Expand Down
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,25 @@ numbers.setProperty('later', 'value');
// 'value'
```

### Consuming an AsyncIterator as EcmaScript-AsyncIterator
Due to the syntactical sugar [EcmaScript's AsyncIterator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncIterator)
provides, our iterators can also be consumed as such.
If high performance over large iterators is required, this method of consumption not recommended.

```JavaScript
const numbers = new IntegerIterator({ start: 1, end: 100 });

for await (const number of numbers)
console.log('number', number);
console.log('all done!');
```

Error events emitted within the iterator can be caught by wrapping the for-await-block in a try-catch.

In cases where the returned EcmaScript AsyncIterator will not be fully consumed,
it is recommended to manually listen for error events on the main AsyncIterator
to avoid uncaught error messages.

## License
The asynciterator library is copyrighted by [Ruben Verborgh](http://ruben.verborgh.org/)
and released under the [MIT License](http://opensource.org/licenses/MIT).
82 changes: 80 additions & 2 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ export const ENDED = 1 << 4;
*/
export const DESTROYED = 1 << 5;


/**
An asynchronous iterator provides pull-based access to a stream of objects.
@extends module:asynciterator.EventEmitter
*/
export class AsyncIterator<T> extends EventEmitter {
export class AsyncIterator<T> extends EventEmitter implements AsyncIterable<T> {
protected _state: number;
private _readable = false;
protected _properties?: { [name: string]: any };
Expand Down Expand Up @@ -568,6 +567,78 @@ export class AsyncIterator<T> extends EventEmitter {
clone(): ClonedIterator<T> {
return new ClonedIterator<T>(this);
}

/**
* An AsyncIterator is async iterable.
* This allows iterators to be used via the for-await syntax.
*
* In cases where the returned EcmaScript AsyncIterator will not be fully consumed,
* it is recommended to manually listen for error events on the main AsyncIterator
* to avoid uncaught error messages.
*
* @returns {ESAsyncIterator<T>} An EcmaScript AsyncIterator
*/
[Symbol.asyncIterator](): ESAsyncIterator<T> {
const it = this;
let currentResolve: null | Function = null;
let currentReject: null | Function = null;
let pendingError: null | Error = null;

it.addListener('readable', tryResolve);
it.addListener('end', tryResolve);
it.addListener('error', tryReject);

// Tries to emit an item or signal the end of the iterator
function tryResolve(): void {
if (currentResolve !== null) {
if (pendingError !== null) {
tryReject(pendingError);
}
else if (it.done) {
currentResolve({ done: true, value: undefined });
currentResolve = currentReject = null;
removeListeners();
}
else {
const value = it.read();
if (value !== null) {
currentResolve({ done: false, value });
currentResolve = currentReject = null;
}
}
}
}

// Tries to emit an error
function tryReject(error: Error) {
if (currentReject !== null) {
currentReject(error);
currentResolve = currentReject = pendingError = null;
removeListeners();
}
else if (pendingError === null) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 obvs I put that in on purpose to check your tests

pendingError = error;
}
}

// Cleans up all attached listeners
function removeListeners() {
it.removeListener('readable', tryResolve);
it.removeListener('end', tryResolve);
it.removeListener('error', tryReject);
}

// An EcmaScript AsyncIterator exposes the next() function that can be invoked repeatedly
return {
next(): Promise<IteratorResult<T>> {
return new Promise<IteratorResult<T>>((resolve, reject) => {
currentResolve = resolve;
currentReject = reject;
tryResolve();
});
},
};
}
}

// Starts emitting `data` events when `data` listeners are added
Expand Down Expand Up @@ -2245,6 +2316,13 @@ export interface MultiTransformOptions<S, D> extends TransformOptions<S, D> {
multiTransform?: (item: S) => AsyncIterator<D>;
}

/**
* Copy of the EcmaScript AsyncIterator interface, which we can not use directly due to the name conflict.
*/
interface ESAsyncIterator<T> {
next(value?: any): Promise<IteratorResult<T>>;
}

type MaybePromise<T> =
T |
Promise<T>;
Expand Down
141 changes: 141 additions & 0 deletions test/AsyncIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,147 @@ describe('AsyncIterator', () => {
});
});
});

describe('The AsyncIterator#[Symbol.asyncIterator] function', () => {
it('should be a function', () => {
expect(AsyncIterator.prototype[Symbol.asyncIterator]).to.be.a('function');
});

describe('called on an empty iterator', () => {
let iterator;
before(() => {
iterator = new AsyncIterator();
iterator.close();
});

it('should go through zero iterations', async () => {
let i = 0;
for await (const value of iterator) {
value.should.not.equal(undefined);
i++;
}
i.should.equal(0);
});
});

describe('called on an iterator with two items', () => {
let iterator;
before(() => {
let i = 0;
iterator = new AsyncIterator();
iterator.readable = true;
iterator.read = () => {
if (i++ < 2)
return i;
iterator.close();
return null;
};
});

it('should go through two iterations', async () => {
const values = [];
for await (const value of iterator)
values.push(value);
values.should.eql([1, 2]);
});
});

describe('called on an iterator with two slowly generated items', () => {
let iterator;
before(() => {
let i = 0;
let generate = false;
iterator = new AsyncIterator();
iterator.readable = false;
iterator.read = () => {
if (!generate) {
generate = true;
setImmediate(() => {
iterator.readable = true;
});
return null;
}
generate = false;
iterator.readable = false;

if (i++ < 2)
return i;
iterator.close();
return null;
};
});

it('should go through two iterations', async () => {
const values = [];
for await (const value of iterator)
values.push(value);
values.should.eql([1, 2]);
});
});

describe('called on an erroring iterator', () => {
let iterator;
before(() => {
let i = 0;
iterator = new AsyncIterator();
iterator.readable = true;
iterator.read = () => {
if (i++ < 2)
return i;
iterator.emit('error', new Error('AsyncIterator error'));
return null;
};
});

it('should go through two iterations and then throw', async () => {
const values = [];
let caughtError;
try {
for await (const value of iterator)
values.push(value);
}
catch (error) {
caughtError = error;
}
values.should.eql([1, 2]);
caughtError.message.should.eql('AsyncIterator error');
});
});

describe('called on an iterator that errors inbetween next() calls', () => {
let iterator;
before(() => {
let i = 0;
iterator = new AsyncIterator();
iterator.readable = true;
iterator.read = () => {
if (i++ < 2)
return i;
return null;
};
});

it('should throw errors that were emitted before next() was called', async () => {
const values = [];
let caughtError;
const esit = iterator[Symbol.asyncIterator]();

values.push(await esit.next());

iterator.emit('error', new Error('AsyncIterator error'));

try {
await esit.next();
}
catch (error) {
caughtError = error;
}

values.should.eql([{ done: false, value: 1 }]);
caughtError.message.should.eql('AsyncIterator error');
});
});
});
});

describe('Type-checking functions', () => {
Expand Down
17 changes: 17 additions & 0 deletions test/integration-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ describe('Integration tests', () => {
});
});

describe('A sequence of ArrayIterator, TransformIterator, and Unioniterator is AsyncIterable', () => {
let arrayIterator, transformIterator, unionIterator;

before(() => {
arrayIterator = new ArrayIterator([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], { autoStart: false });
transformIterator = new TransformIterator(arrayIterator, { autoStart: false });
unionIterator = new UnionIterator([transformIterator], { autoStart: false });
});

it('returns all values', async () => {
const values = [];
for await (const value of unionIterator)
values.push(value);
values.should.eql([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
});

describe('Cloning iterators', () => {
describe('A clone of an empty ArrayIterator without autoStart', () => {
let arrayIterator, clonedIterator;
Expand Down
Loading