Skip to content

Commit

Permalink
feat(fetch): Making fetch happen (ReactiveX#4702)
Browse files Browse the repository at this point in the history
* feat(fetch): Making fetch happen

- Adds `fromFetch` implementation that uses native `fetch`
- Adds tests and basic documentation
- DOES NOT polyfill fetch

* fixup! feat(fetch): Making fetch happen
  • Loading branch information
benlesh authored and BioPhoton committed May 15, 2019
1 parent 7e8e4cc commit b05d3f4
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs_app/tools/transforms/angular-api-package/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ module.exports = new Package('angular-api', [basePackage, typeScriptPackage])
'index.ts',
'operators/index.ts',
'ajax/index.ts',
'fetch/index.ts',
'webSocket/index.ts',
'testing/index.ts'
];
Expand Down
2 changes: 2 additions & 0 deletions integration/systemjs/systemjs-compatibility-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ System.config({
packages: {
'rxjs': {main: 'index.js', defaultExtension: 'js' },
'rxjs/ajax': {main: 'index.js', defaultExtension: 'js' },
'rxjs/fetch': {main: 'index.js', defaultExtension: 'js' },
'rxjs/operators': {main: 'index.js', defaultExtension: 'js' },
'rxjs/testing': {main: 'index.js', defaultExtension: 'js' },
'rxjs/webSocket': {main: 'index.js', defaultExtension: 'js' }
Expand All @@ -15,6 +16,7 @@ System.config({
Promise.all([
System.import('rxjs'),
System.import('rxjs/ajax'),
System.import('rxjs/fetch'),
System.import('rxjs/operators'),
System.import('rxjs/testing'),
System.import('rxjs/webSocket'),
Expand Down
196 changes: 196 additions & 0 deletions spec/observables/dom/fetch-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { fromFetch } from 'rxjs/fetch';
import { expect } from 'chai';
import { root } from '../../../src/internal/util/root';

const OK_RESPONSE = {
ok: true,
} as Response;

function mockFetchImpl(input: string | Request, init?: RequestInit): Promise<Response> {
(mockFetchImpl as MockFetch).calls.push({ input, init });
return new Promise<any>((resolve, reject) => {
if (init.signal) {
init.signal.addEventListener('abort', () => {
reject(new MockDOMException());
});
}
return Promise.resolve(null).then(() => {
resolve((mockFetchImpl as any).respondWith);
});
});
}
(mockFetchImpl as MockFetch).reset = function (this: any) {
this.calls = [] as any[];
this.respondWith = OK_RESPONSE;
};
(mockFetchImpl as MockFetch).reset();

const mockFetch: MockFetch = mockFetchImpl as MockFetch;

class MockDOMException {}

class MockAbortController {
readonly signal = new MockAbortSignal();

abort() {
this.signal._signal();
}

constructor() {
MockAbortController.created++;
}

static created = 0;

static reset() {
MockAbortController.created = 0;
}
}

class MockAbortSignal {
private _listeners: Function[] = [];

aborted = false;

addEventListener(name: 'abort', handler: Function) {
this._listeners.push(handler);
}

removeEventListener(name: 'abort', handler: Function) {
const index = this._listeners.indexOf(handler);
if (index >= 0) {
this._listeners.splice(index, 1);
}
}

_signal() {
this.aborted = true;
while (this._listeners.length > 0) {
this._listeners.shift()();
}
}
}

interface MockFetch {
(input: string | Request, init?: RequestInit): Promise<Response>;
calls: { input: string | Request, init: RequestInit | undefined }[];
reset(): void;
respondWith: Response;
}

describe('fromFetch', () => {
let _fetch: typeof fetch;
let _AbortController: AbortController;

beforeEach(() => {
mockFetch.reset();
if (root.fetch) {
_fetch = root.fetch;
}
root.fetch = mockFetch;

MockAbortController.reset();
if (root.AbortController) {
_AbortController = root.AbortController;
}
root.AbortController = MockAbortController;
});

afterEach(() => {
root.fetch = _fetch;
root.AbortController = _AbortController;
});

it('should exist', () => {
expect(fromFetch).to.be.a('function');
});

it('should fetch', done => {
const fetch$ = fromFetch('/foo');
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);

fetch$.subscribe({
next: response => {
expect(response).to.equal(OK_RESPONSE);
},
error: done,
complete: done,
});

expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init.signal.aborted).to.be.false;
});

it('should handle Response that is not `ok`', done => {
mockFetch.respondWith = {
ok: false,
status: 400,
body: 'Bad stuff here'
} as any as Response;

const fetch$ = fromFetch('/foo');
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);

fetch$.subscribe({
next: response => {
expect(response).to.equal(mockFetch.respondWith);
},
complete: done,
error: done
});

expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init.signal.aborted).to.be.false;
});

it('should abort when unsubscribed', () => {
const fetch$ = fromFetch('/foo');
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);
const subscription = fetch$.subscribe();

expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init.signal.aborted).to.be.false;

subscription.unsubscribe();
expect(mockFetch.calls[0].init.signal.aborted).to.be.true;
});

it('should allow passing of init object', done => {
const myInit = {};
const fetch$ = fromFetch('/foo', myInit);
fetch$.subscribe({
error: done,
complete: done,
});
expect(mockFetch.calls[0].init).to.equal(myInit);
expect(mockFetch.calls[0].init.signal).not.to.be.undefined;
});

it('should treat passed signals as a cancellation token which triggers an error', done => {
const controller = new MockAbortController();
const signal = controller.signal as any;
const fetch$ = fromFetch('/foo', { signal });
const subscription = fetch$.subscribe({
error: err => {
expect(err).to.be.instanceof(MockDOMException);
done();
}
});
controller.abort();
expect(mockFetch.calls[0].init.signal.aborted).to.be.true;
// The subscription will not be closed until the error fires when the promise resolves.
expect(subscription.closed).to.be.false;
});
});
2 changes: 1 addition & 1 deletion spec/support/default.opts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
--reporter dot

--check-leaks
--globals WebSocket,FormData,XDomainRequest,ActiveXObject
--globals WebSocket,FormData,XDomainRequest,ActiveXObject,fetch,AbortController

--recursive
--timeout 5000
1 change: 1 addition & 0 deletions src/fetch/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { fromFetch } from '../internal/observable/dom/fetch';
8 changes: 8 additions & 0 deletions src/fetch/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "rxjs/fetch",
"typings": "./index.d.ts",
"main": "./index.js",
"module": "../_esm5/fetch/index.js",
"es2015": "../_esm2015/fetch/index.js",
"sideEffects": false
}
90 changes: 90 additions & 0 deletions src/internal/observable/dom/fetch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { Observable } from '../../Observable';

/**
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
* make an HTTP request.
*
* **WARNING** Parts of the fetch API are still experimental. `AbortController` is
* required for this implementation to work and use cancellation appropriately.
*
* Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController)
* in order to teardown the internal `fetch` when the subscription tears down.
*
* If a `signal` is provided via the `init` argument, it will behave like it usually does with
* `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with
* in that scenario will be emitted as an error from the observable.
*
* ### Basic Use
*
* ```ts
* import { of } from 'rxjs';
* import { fetch } from 'rxjs/fetch';
* import { switchMap, catchError } from 'rxjs/operators';
*
* const data$ = fetch('https://api.github.com/users?per_page=5').pipe(
* switchMap(response => {
* if(responose.ok) {
* // OK return data
* return response.json();
* } else {
* // Server is returning a status requiring the client to try something else.
* return of({ error: true, message: `Error ${response.status}` });
* }
* }),
* catchError(err => {
* // Network or other error, handle appropriately
* console.error(err);
* return of({ error: true, message: error.message })
* })
* );
*
* data$.subscribe({
* next: result => console.log(result),
* complete: () => console.log('done')
* })
* ```
*
* @param input The resource you would like to fetch. Can be a url or a request object.
* @param init A configuration object for the fetch.
* [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters)
* @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch`
* function. The {@link Subscription} is tied to an `AbortController` for the the fetch.
*/
export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response> {
return new Observable<Response>(subscriber => {
const controller = new AbortController();
const signal = controller.signal;
let outerSignalHandler: () => void;
let unsubscribed = false;

if (init) {
// If we a signal is provided, just have it teardown. It's a cancellation token, basically.
if (init.signal) {
outerSignalHandler = () => {
if (!signal.aborted) {
controller.abort();
}
};
init.signal.addEventListener('abort', outerSignalHandler);
}
init.signal = signal;
} else {
init = { signal };
}

fetch(input, init).then(response => {
subscriber.next(response);
subscriber.complete();
}).catch(err => {
if (!unsubscribed) {
// Only forward the error if it wasn't an abort.
subscriber.error(err);
}
});

return () => {
unsubscribed = true;
controller.abort();
};
});
}
1 change: 1 addition & 0 deletions tools/make-umd-compat-bundle.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rollupBundle({
'rxjs/operators': 'dist-compat/esm5_for_rollup/src/operators/index.js',
'rxjs/webSocket': 'dist-compat/esm5_for_rollup/src/webSocket/index.js',
'rxjs/ajax': 'dist-compat/esm5_for_rollup/src/ajax/index.js',
'rxjs/fetch': 'dist-compat/esm5_for_rollup/src/fetch/index.js',
'rxjs/internal-compatibility': 'dist-compat/esm5_for_rollup/src/internal-compatibility/index.js',
'rxjs': 'dist-compat/esm5_for_rollup/src/index.js',
},
Expand Down
1 change: 1 addition & 0 deletions tsconfig/tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// entry-points
"../src/index.ts",
"../src/ajax/index.ts",
"../src/fetch/index.ts",
"../src/operators/index.ts",
"../src/testing/index.ts",
"../src/webSocket/index.ts",
Expand Down
1 change: 1 addition & 0 deletions tsconfig/tsconfig.legacy-reexport.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"rxjs/internal-compatibility": ["../dist/typings/internal-compatibility/index"],
"rxjs/testing": ["../dist/typings/testing/index"],
"rxjs/ajax": ["../dist/typings/ajax/index"],
"rxjs/fetch": ["../dist/typings/fetch/index"],
"rxjs/operators": ["../dist/typings/operators/index"],
"rxjs/webSocket": ["../dist/typings/webSocket/index"],
"rxjs-compat": ["../dist-compat/typings/compat/Rx"],
Expand Down

0 comments on commit b05d3f4

Please sign in to comment.