diff --git a/goldens/public-api/core/rxjs-interop/index.api.md b/goldens/public-api/core/rxjs-interop/index.api.md index ac9d9021a34b69..b487f39a244c6c 100644 --- a/goldens/public-api/core/rxjs-interop/index.api.md +++ b/goldens/public-api/core/rxjs-interop/index.api.md @@ -10,6 +10,9 @@ import { MonoTypeOperatorFunction } from 'rxjs'; import { Observable } from 'rxjs'; import { OutputOptions } from '@angular/core'; import { OutputRef } from '@angular/core'; +import { ResourceLoaderParams } from '@angular/core'; +import { ResourceOptions } from '@angular/core'; +import { ResourceRef } from '@angular/core'; import { Signal } from '@angular/core'; import { Subscribable } from 'rxjs'; import { ValueEqualityFn } from '@angular/core/primitives/signals'; @@ -20,6 +23,15 @@ export function outputFromObservable(observable: Observable, opts?: Output // @public export function outputToObservable(ref: OutputRef): Observable; +// @public +export function rxResource(opts: RxResourceOptions): ResourceRef; + +// @public +export interface RxResourceOptions extends Omit, 'loader'> { + // (undocumented) + loader: (params: ResourceLoaderParams) => Observable; +} + // @public export function takeUntilDestroyed(destroyRef?: DestroyRef): MonoTypeOperatorFunction; diff --git a/packages/core/rxjs-interop/src/index.ts b/packages/core/rxjs-interop/src/index.ts index e2b0b1b57fb639..364ac8233eef79 100644 --- a/packages/core/rxjs-interop/src/index.ts +++ b/packages/core/rxjs-interop/src/index.ts @@ -15,3 +15,4 @@ export { toObservableMicrotask as ɵtoObservableMicrotask, } from './to_observable'; export {toSignal, ToSignalOptions} from './to_signal'; +export {RxResourceOptions, rxResource} from './rx_resource'; diff --git a/packages/core/rxjs-interop/src/rx_resource.ts b/packages/core/rxjs-interop/src/rx_resource.ts new file mode 100644 index 00000000000000..3e307d34ef9b7c --- /dev/null +++ b/packages/core/rxjs-interop/src/rx_resource.ts @@ -0,0 +1,44 @@ +/** + * @license + * Copyright Google LLC All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.dev/license + */ + +import { + assertInInjectionContext, + ResourceOptions, + resource, + ResourceLoaderParams, + ResourceRef, +} from '@angular/core'; +import {firstValueFrom, Observable, Subject} from 'rxjs'; +import {takeUntil} from 'rxjs/operators'; + +/** + * Like `ResourceOptions` but uses an RxJS-based `loader`. + * + * @experimental + */ +export interface RxResourceOptions extends Omit, 'loader'> { + loader: (params: ResourceLoaderParams) => Observable; +} + +/** + * Like `resource` but uses an RxJS based `loader` which maps the request to an `Observable` of the + * resource's value. Like `firstValueFrom`, only the first emission of the Observable is considered. + * + * @experimental + */ +export function rxResource(opts: RxResourceOptions): ResourceRef { + opts?.injector || assertInInjectionContext(rxResource); + return resource({ + ...opts, + loader: (params) => { + const cancelled = new Subject(); + params.abortSignal.addEventListener('abort', () => cancelled.next()); + return firstValueFrom(opts.loader(params).pipe(takeUntil(cancelled))); + }, + }); +} diff --git a/packages/core/rxjs-interop/test/rx_resource_spec.ts b/packages/core/rxjs-interop/test/rx_resource_spec.ts new file mode 100644 index 00000000000000..5673c67203b471 --- /dev/null +++ b/packages/core/rxjs-interop/test/rx_resource_spec.ts @@ -0,0 +1,61 @@ +/** + * @license + * Copyright Google LLC All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.dev/license + */ + +import {of, Observable} from 'rxjs'; +import {TestBed} from '@angular/core/testing'; +import {ApplicationRef, Injector, signal} from '@angular/core'; +import {rxResource} from '@angular/core/rxjs-interop'; + +describe('rxResource()', () => { + it('should fetch data using an observable loader', async () => { + const injector = TestBed.inject(Injector); + const appRef = TestBed.inject(ApplicationRef); + const res = rxResource({ + loader: () => of(1), + injector, + }); + await appRef.whenStable(); + expect(res.value()).toBe(1); + }); + + it('should cancel the fetch when a new request comes in', async () => { + const injector = TestBed.inject(Injector); + const appRef = TestBed.inject(ApplicationRef); + let unsub = false; + const request = signal(1); + const res = rxResource({ + request, + loader: ({request}) => + new Observable((sub) => { + if (request === 2) { + sub.next(true); + } + return () => { + if (request === 1) { + unsub = true; + } + }; + }), + injector, + }); + + // Wait for the resource to reach loading state. + await waitFor(() => res.isLoading()); + + // Setting request = 2 should cancel request = 1 + request.set(2); + await appRef.whenStable(); + expect(unsub).toBe(true); + }); +}); + +async function waitFor(fn: () => boolean): Promise { + while (!fn()) { + await new Promise((resolve) => setTimeout(resolve, 1)); + } +}