diff --git a/docs_app/tools/transforms/angular-api-package/index.js b/docs_app/tools/transforms/angular-api-package/index.js index fafc7aed4d..75a7762657 100644 --- a/docs_app/tools/transforms/angular-api-package/index.js +++ b/docs_app/tools/transforms/angular-api-package/index.js @@ -70,6 +70,7 @@ module.exports = new Package('angular-api', [basePackage, typeScriptPackage]) 'index.ts', 'operators/index.ts', 'ajax/index.ts', + 'fetch/index.ts', 'webSocket/index.ts', 'testing/index.ts' ]; diff --git a/integration/systemjs/systemjs-compatibility-spec.js b/integration/systemjs/systemjs-compatibility-spec.js index 4c672575a1..cb1713c2c5 100644 --- a/integration/systemjs/systemjs-compatibility-spec.js +++ b/integration/systemjs/systemjs-compatibility-spec.js @@ -6,6 +6,7 @@ System.config({ packages: { 'rxjs': {main: 'index.js', defaultExtension: 'js' }, 'rxjs/ajax': {main: 'index.js', defaultExtension: 'js' }, + 'rxjs/fetch': {main: 'index.js', defaultExtension: 'js' }, 'rxjs/operators': {main: 'index.js', defaultExtension: 'js' }, 'rxjs/testing': {main: 'index.js', defaultExtension: 'js' }, 'rxjs/webSocket': {main: 'index.js', defaultExtension: 'js' } @@ -15,6 +16,7 @@ System.config({ Promise.all([ System.import('rxjs'), System.import('rxjs/ajax'), + System.import('rxjs/fetch'), System.import('rxjs/operators'), System.import('rxjs/testing'), System.import('rxjs/webSocket'), diff --git a/spec/observables/dom/fetch-spec.ts b/spec/observables/dom/fetch-spec.ts new file mode 100644 index 0000000000..93923c7914 --- /dev/null +++ b/spec/observables/dom/fetch-spec.ts @@ -0,0 +1,196 @@ +import { fromFetch } from 'rxjs/fetch'; +import { expect } from 'chai'; +import { root } from '../../../src/internal/util/root'; + +const OK_RESPONSE = { + ok: true, +} as Response; + +function mockFetchImpl(input: string | Request, init?: RequestInit): Promise { + (mockFetchImpl as MockFetch).calls.push({ input, init }); + return new Promise((resolve, reject) => { + if (init.signal) { + init.signal.addEventListener('abort', () => { + reject(new MockDOMException()); + }); + } + return Promise.resolve(null).then(() => { + resolve((mockFetchImpl as any).respondWith); + }); + }); +} +(mockFetchImpl as MockFetch).reset = function (this: any) { + this.calls = [] as any[]; + this.respondWith = OK_RESPONSE; +}; +(mockFetchImpl as MockFetch).reset(); + +const mockFetch: MockFetch = mockFetchImpl as MockFetch; + +class MockDOMException {} + +class MockAbortController { + readonly signal = new MockAbortSignal(); + + abort() { + this.signal._signal(); + } + + constructor() { + MockAbortController.created++; + } + + static created = 0; + + static reset() { + MockAbortController.created = 0; + } +} + +class MockAbortSignal { + private _listeners: Function[] = []; + + aborted = false; + + addEventListener(name: 'abort', handler: Function) { + this._listeners.push(handler); + } + + removeEventListener(name: 'abort', handler: Function) { + const index = this._listeners.indexOf(handler); + if (index >= 0) { + this._listeners.splice(index, 1); + } + } + + _signal() { + this.aborted = true; + while (this._listeners.length > 0) { + this._listeners.shift()(); + } + } +} + +interface MockFetch { + (input: string | Request, init?: RequestInit): Promise; + calls: { input: string | Request, init: RequestInit | undefined }[]; + reset(): void; + respondWith: Response; +} + +describe('fromFetch', () => { + let _fetch: typeof fetch; + let _AbortController: AbortController; + + beforeEach(() => { + mockFetch.reset(); + if (root.fetch) { + _fetch = root.fetch; + } + root.fetch = mockFetch; + + MockAbortController.reset(); + if (root.AbortController) { + _AbortController = root.AbortController; + } + root.AbortController = MockAbortController; + }); + + afterEach(() => { + root.fetch = _fetch; + root.AbortController = _AbortController; + }); + + it('should exist', () => { + expect(fromFetch).to.be.a('function'); + }); + + it('should fetch', done => { + const fetch$ = fromFetch('/foo'); + expect(mockFetch.calls.length).to.equal(0); + expect(MockAbortController.created).to.equal(0); + + fetch$.subscribe({ + next: response => { + expect(response).to.equal(OK_RESPONSE); + }, + error: done, + complete: done, + }); + + expect(MockAbortController.created).to.equal(1); + expect(mockFetch.calls.length).to.equal(1); + expect(mockFetch.calls[0].input).to.equal('/foo'); + expect(mockFetch.calls[0].init.signal).not.to.be.undefined; + expect(mockFetch.calls[0].init.signal.aborted).to.be.false; + }); + + it('should handle Response that is not `ok`', done => { + mockFetch.respondWith = { + ok: false, + status: 400, + body: 'Bad stuff here' + } as any as Response; + + const fetch$ = fromFetch('/foo'); + expect(mockFetch.calls.length).to.equal(0); + expect(MockAbortController.created).to.equal(0); + + fetch$.subscribe({ + next: response => { + expect(response).to.equal(mockFetch.respondWith); + }, + complete: done, + error: done + }); + + expect(MockAbortController.created).to.equal(1); + expect(mockFetch.calls.length).to.equal(1); + expect(mockFetch.calls[0].input).to.equal('/foo'); + expect(mockFetch.calls[0].init.signal).not.to.be.undefined; + expect(mockFetch.calls[0].init.signal.aborted).to.be.false; + }); + + it('should abort when unsubscribed', () => { + const fetch$ = fromFetch('/foo'); + expect(mockFetch.calls.length).to.equal(0); + expect(MockAbortController.created).to.equal(0); + const subscription = fetch$.subscribe(); + + expect(MockAbortController.created).to.equal(1); + expect(mockFetch.calls.length).to.equal(1); + expect(mockFetch.calls[0].input).to.equal('/foo'); + expect(mockFetch.calls[0].init.signal).not.to.be.undefined; + expect(mockFetch.calls[0].init.signal.aborted).to.be.false; + + subscription.unsubscribe(); + expect(mockFetch.calls[0].init.signal.aborted).to.be.true; + }); + + it('should allow passing of init object', done => { + const myInit = {}; + const fetch$ = fromFetch('/foo', myInit); + fetch$.subscribe({ + error: done, + complete: done, + }); + expect(mockFetch.calls[0].init).to.equal(myInit); + expect(mockFetch.calls[0].init.signal).not.to.be.undefined; + }); + + it('should treat passed signals as a cancellation token which triggers an error', done => { + const controller = new MockAbortController(); + const signal = controller.signal as any; + const fetch$ = fromFetch('/foo', { signal }); + const subscription = fetch$.subscribe({ + error: err => { + expect(err).to.be.instanceof(MockDOMException); + done(); + } + }); + controller.abort(); + expect(mockFetch.calls[0].init.signal.aborted).to.be.true; + // The subscription will not be closed until the error fires when the promise resolves. + expect(subscription.closed).to.be.false; + }); +}); diff --git a/spec/support/default.opts b/spec/support/default.opts index ead4feb92d..32264362e9 100644 --- a/spec/support/default.opts +++ b/spec/support/default.opts @@ -7,7 +7,7 @@ --reporter dot --check-leaks ---globals WebSocket,FormData,XDomainRequest,ActiveXObject +--globals WebSocket,FormData,XDomainRequest,ActiveXObject,fetch,AbortController --recursive --timeout 5000 diff --git a/src/fetch/index.ts b/src/fetch/index.ts new file mode 100644 index 0000000000..e6ff01da27 --- /dev/null +++ b/src/fetch/index.ts @@ -0,0 +1 @@ +export { fromFetch } from '../internal/observable/dom/fetch'; diff --git a/src/fetch/package.json b/src/fetch/package.json new file mode 100644 index 0000000000..dff5519633 --- /dev/null +++ b/src/fetch/package.json @@ -0,0 +1,8 @@ +{ + "name": "rxjs/fetch", + "typings": "./index.d.ts", + "main": "./index.js", + "module": "../_esm5/fetch/index.js", + "es2015": "../_esm2015/fetch/index.js", + "sideEffects": false +} diff --git a/src/internal/observable/dom/fetch.ts b/src/internal/observable/dom/fetch.ts new file mode 100644 index 0000000000..05551c2d21 --- /dev/null +++ b/src/internal/observable/dom/fetch.ts @@ -0,0 +1,90 @@ +import { Observable } from '../../Observable'; + +/** + * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to + * make an HTTP request. + * + * **WARNING** Parts of the fetch API are still experimental. `AbortController` is + * required for this implementation to work and use cancellation appropriately. + * + * Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController) + * in order to teardown the internal `fetch` when the subscription tears down. + * + * If a `signal` is provided via the `init` argument, it will behave like it usually does with + * `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with + * in that scenario will be emitted as an error from the observable. + * + * ### Basic Use + * + * ```ts + * import { of } from 'rxjs'; + * import { fetch } from 'rxjs/fetch'; + * import { switchMap, catchError } from 'rxjs/operators'; + * + * const data$ = fetch('https://api.github.com/users?per_page=5').pipe( + * switchMap(response => { + * if(responose.ok) { + * // OK return data + * return response.json(); + * } else { + * // Server is returning a status requiring the client to try something else. + * return of({ error: true, message: `Error ${response.status}` }); + * } + * }), + * catchError(err => { + * // Network or other error, handle appropriately + * console.error(err); + * return of({ error: true, message: error.message }) + * }) + * ); + * + * data$.subscribe({ + * next: result => console.log(result), + * complete: () => console.log('done') + * }) + * ``` + * + * @param input The resource you would like to fetch. Can be a url or a request object. + * @param init A configuration object for the fetch. + * [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters) + * @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch` + * function. The {@link Subscription} is tied to an `AbortController` for the the fetch. + */ +export function fromFetch(input: string | Request, init?: RequestInit): Observable { + return new Observable(subscriber => { + const controller = new AbortController(); + const signal = controller.signal; + let outerSignalHandler: () => void; + let unsubscribed = false; + + if (init) { + // If we a signal is provided, just have it teardown. It's a cancellation token, basically. + if (init.signal) { + outerSignalHandler = () => { + if (!signal.aborted) { + controller.abort(); + } + }; + init.signal.addEventListener('abort', outerSignalHandler); + } + init.signal = signal; + } else { + init = { signal }; + } + + fetch(input, init).then(response => { + subscriber.next(response); + subscriber.complete(); + }).catch(err => { + if (!unsubscribed) { + // Only forward the error if it wasn't an abort. + subscriber.error(err); + } + }); + + return () => { + unsubscribed = true; + controller.abort(); + }; + }); +} diff --git a/tools/make-umd-compat-bundle.js b/tools/make-umd-compat-bundle.js index 7ba4826303..83e2eb4f3d 100644 --- a/tools/make-umd-compat-bundle.js +++ b/tools/make-umd-compat-bundle.js @@ -8,6 +8,7 @@ rollupBundle({ 'rxjs/operators': 'dist-compat/esm5_for_rollup/src/operators/index.js', 'rxjs/webSocket': 'dist-compat/esm5_for_rollup/src/webSocket/index.js', 'rxjs/ajax': 'dist-compat/esm5_for_rollup/src/ajax/index.js', + 'rxjs/fetch': 'dist-compat/esm5_for_rollup/src/fetch/index.js', 'rxjs/internal-compatibility': 'dist-compat/esm5_for_rollup/src/internal-compatibility/index.js', 'rxjs': 'dist-compat/esm5_for_rollup/src/index.js', }, diff --git a/tsconfig/tsconfig.base.json b/tsconfig/tsconfig.base.json index 7bf2b8ea11..9f53b1292d 100644 --- a/tsconfig/tsconfig.base.json +++ b/tsconfig/tsconfig.base.json @@ -13,6 +13,7 @@ // entry-points "../src/index.ts", "../src/ajax/index.ts", + "../src/fetch/index.ts", "../src/operators/index.ts", "../src/testing/index.ts", "../src/webSocket/index.ts", diff --git a/tsconfig/tsconfig.legacy-reexport.json b/tsconfig/tsconfig.legacy-reexport.json index 3a134b2576..0768fa43de 100644 --- a/tsconfig/tsconfig.legacy-reexport.json +++ b/tsconfig/tsconfig.legacy-reexport.json @@ -16,6 +16,7 @@ "rxjs/internal-compatibility": ["../dist/typings/internal-compatibility/index"], "rxjs/testing": ["../dist/typings/testing/index"], "rxjs/ajax": ["../dist/typings/ajax/index"], + "rxjs/fetch": ["../dist/typings/fetch/index"], "rxjs/operators": ["../dist/typings/operators/index"], "rxjs/webSocket": ["../dist/typings/webSocket/index"], "rxjs-compat": ["../dist-compat/typings/compat/Rx"],