From 8f43e71f5692119e57a7acc5817c146d0b288e8c Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 15 Jun 2020 17:04:33 -0500 Subject: [PATCH] feat: remove async iteration (#5492) * feat: remove async iteration Removes experimental async iteration from Observable. Users that want this feature should really checkout https://github.com/benlesh/rxjs-for-await. There are too many edge cases for this, and also some uncertainty around cancellation behaviors we could provide. We may add this back in another iteration, but for now it is best in a separate library BREAKING CHANGE: Removes support for `for await`. Use https://github.com/benlesh/rxjs-for-await instead. * chore: remove unused file, fix lint * chore: update side-effect golden files --- .../side-effects/snapshots/esm/ajax.js | 2 +- .../side-effects/snapshots/esm/testing.js | 2 - .../side-effects/snapshots/esm/websocket.js | 2 +- spec/Observable-spec.ts | 74 +++---------------- src/internal/Observable.ts | 22 ------ src/internal/asyncIteratorFrom.ts | 62 ---------------- 6 files changed, 12 insertions(+), 152 deletions(-) delete mode 100644 src/internal/asyncIteratorFrom.ts diff --git a/integration/side-effects/snapshots/esm/ajax.js b/integration/side-effects/snapshots/esm/ajax.js index 5d63e0559f..8b13789179 100644 --- a/integration/side-effects/snapshots/esm/ajax.js +++ b/integration/side-effects/snapshots/esm/ajax.js @@ -1 +1 @@ -import "tslib"; + diff --git a/integration/side-effects/snapshots/esm/testing.js b/integration/side-effects/snapshots/esm/testing.js index 70c2dab83c..792fba30c0 100644 --- a/integration/side-effects/snapshots/esm/testing.js +++ b/integration/side-effects/snapshots/esm/testing.js @@ -1,5 +1,3 @@ -import "tslib"; - var NotificationKind; (function(NotificationKind) { diff --git a/integration/side-effects/snapshots/esm/websocket.js b/integration/side-effects/snapshots/esm/websocket.js index 5d63e0559f..8b13789179 100644 --- a/integration/side-effects/snapshots/esm/websocket.js +++ b/integration/side-effects/snapshots/esm/websocket.js @@ -1 +1 @@ -import "tslib"; + diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 4e111024d5..0db58ba81f 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -2,8 +2,8 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { Observer, TeardownLogic } from '../src/internal/types'; import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing'; -import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty, interval } from 'rxjs'; -import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, take, finalize } from 'rxjs/operators'; +import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs'; +import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip } from 'rxjs/operators'; declare const rxTestScheduler: any; @@ -36,7 +36,7 @@ describe('Observable', () => { }); it('should send errors thrown in the constructor down the error path', (done) => { - new Observable((observer) => { + new Observable(() => { throw new Error('this should be handled'); }) .subscribe({ @@ -68,7 +68,7 @@ describe('Observable', () => { }); it('should reject promise when in error', (done) => { - throwError('bad').forEach((x) => { + throwError('bad').forEach(() => { done(new Error('should not be called')); }, Promise).then(() => { done(new Error('should not complete')); @@ -265,7 +265,7 @@ describe('Observable', () => { }); source.subscribe({ - error(err) { + error() { /* noop: expected error */ } }); @@ -655,7 +655,7 @@ describe('Observable.create', () => { }); it('should send errors thrown in the passed function down the error path', (done) => { - Observable.create((observer: Observer) => { + Observable.create(() => { throw new Error('this should be handled'); }) .subscribe({ @@ -719,7 +719,7 @@ describe('Observable.lift', () => { result.subscribe( function (x) { expect(x).to.equal(expected.shift()); - }, (x) => { + }, () => { done(new Error('should not be called')); }, () => { done(); @@ -745,7 +745,7 @@ describe('Observable.lift', () => { result.subscribe( function (x) { expect(x).to.equal(expected.shift()); - }, (x) => { + }, () => { done(new Error('should not be called')); }, () => { done(); @@ -769,7 +769,7 @@ describe('Observable.lift', () => { result.subscribe( function (x) { expect(x).to.equal(expected.shift()); - }, (x) => { + }, () => { done(new Error('should not be called')); }, () => { done(); @@ -909,7 +909,7 @@ describe('Observable.lift', () => { function (x) { expect(x).to.equal(expected.shift()); }, - (x) => { + () => { done(new Error('should not be called')); }, () => { expect(log).to.deep.equal([ @@ -939,57 +939,3 @@ describe('Observable.lift', () => { } }); }); - -if (Symbol && Symbol.asyncIterator) { - describe('async iterator support', () => { - it('should work for sync observables', async () => { - const source = of(1, 2, 3); - const results: number[] = []; - for await (const value of source) { - results.push(value); - } - expect(results).to.deep.equal([1, 2, 3]); - }); - - it('should throw if the observable errors', async () => { - const source = throwError(new Error('bad')); - let error: any; - try { - for await (const _ of source) { - // do nothing - } - } catch (err) { - error = err; - } - expect(error).to.be.an.instanceOf(Error); - expect(error.message).to.equal('bad'); - }); - - it('should support async observables', async () => { - const source = interval(10).pipe(take(3)); - const results: number[] = []; - for await (const value of source) { - results.push(value); - } - expect(results).to.deep.equal([0, 1, 2]); - }); - - it('should do something clever if the loop exits', async () => { - let finalized = false; - const source = interval(10).pipe(take(10), finalize(() => finalized = true)); - const results: number[] = []; - try { - for await (const value of source) { - results.push(value); - if (value === 1) { - throw new Error('bad'); - } - } - } catch (err) { - // ignore - } - expect(results).to.deep.equal([0, 1]); - expect(finalized).to.be.true; - }); - }); -} \ No newline at end of file diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 5e0ec9bb8c..d1c314991e 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -9,7 +9,6 @@ import { throwError } from './observable/throwError'; import { observable as Symbol_observable } from './symbol/observable'; import { pipeFromArray } from './util/pipe'; import { config } from './config'; -import { asyncIteratorFrom } from './asyncIteratorFrom'; /** * A representation of any set of values over any amount of time. This is the most basic building block @@ -396,24 +395,3 @@ function getPromiseCtor(promiseCtor: PromiseConstructorLike | undefined) { return promiseCtor; } - -export interface Observable { - [Symbol.asyncIterator](): AsyncIterableIterator; -} - -(function () { - /** - * We only add this symbol if the runtime supports it. - * Adding this adds support for subscribing to observables - * via `for await(const value of source$) {}` - * - * This passes muster in Node 9, which does not support - * async iterators. As well as working in Node 12, which does - * support the symbol. - */ - if (Symbol && Symbol.asyncIterator) { - Observable.prototype[Symbol.asyncIterator] = function () { - return asyncIteratorFrom(this); - }; - } -})(); diff --git a/src/internal/asyncIteratorFrom.ts b/src/internal/asyncIteratorFrom.ts deleted file mode 100644 index 581fa6fe76..0000000000 --- a/src/internal/asyncIteratorFrom.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { Observable } from './Observable'; -import { Deferred } from './util/deferred'; - -export function asyncIteratorFrom(source: Observable) { - return coroutine(source); -} - -async function* coroutine(source: Observable) { - const deferreds: Deferred>[] = []; - const values: T[] = []; - let hasError = false; - let error: any = null; - let completed = false; - - const subs = source.subscribe({ - next: value => { - if (deferreds.length > 0) { - deferreds.shift()!.resolve({ value, done: false }); - } else { - values.push(value); - } - }, - error: err => { - hasError = true; - error = err; - while (deferreds.length > 0) { - deferreds.shift()!.reject(err); - } - }, - complete: () => { - completed = true; - while (deferreds.length > 0) { - deferreds.shift()!.resolve({ value: undefined, done: true }); - } - }, - }); - - try { - while (true) { - if (values.length > 0) { - yield values.shift(); - } else if (completed) { - return; - } else if (hasError) { - throw error; - } else { - const d = new Deferred>(); - deferreds.push(d); - const result = await d.promise; - if (result.done) { - return; - } else { - yield result.value; - } - } - } - } catch (err) { - throw err; - } finally { - subs.unsubscribe(); - } -} \ No newline at end of file