Skip to content

Commit

Permalink
feat(core): support multiple responses in resource()
Browse files Browse the repository at this point in the history
This commit adds support in `resource()` for loaders to update the value of
the resource over time. This comes in the form of `set`/`error` functions
which get passed to the loader, which can be used to update the resource's
value as needed (for example, if new values are delivered from the server).

At the same time, the loader is no longer required to return a `Promise` as
it can exclusively update the resource through `set`.

`rxResource()` is updated to leverage this new functionality to handle
multiple responses from the underlying Observable.
  • Loading branch information
alxhub committed Dec 3, 2024
1 parent c9d2703 commit fe688d5
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 43 deletions.
8 changes: 6 additions & 2 deletions goldens/public-api/core/index.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1586,18 +1586,22 @@ export interface Resource<T> {
export function resource<T, R>(options: ResourceOptions<T, R>): ResourceRef<T | undefined>;

// @public
export type ResourceLoader<T, R> = (param: ResourceLoaderParams<R>) => PromiseLike<T>;
export type ResourceLoader<T, R> = (param: ResourceLoaderParams<T, R>) => PromiseLike<T> | void;

// @public
export interface ResourceLoaderParams<R> {
export interface ResourceLoaderParams<T, R> {
// (undocumented)
abortSignal: AbortSignal;
// (undocumented)
error: (error: unknown) => void;
// (undocumented)
previous: {
status: ResourceStatus;
};
// (undocumented)
request: Exclude<NoInfer<R>, undefined>;
// (undocumented)
set: (value: T) => void;
}

// @public
Expand Down
2 changes: 1 addition & 1 deletion goldens/public-api/core/rxjs-interop/index.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T |
// @public
export interface RxResourceOptions<T, R> extends Omit<ResourceOptions<T, R>, 'loader'> {
// (undocumented)
loader: (params: ResourceLoaderParams<R>) => Observable<T>;
loader: (params: Omit<ResourceLoaderParams<T, R>, 'set' | 'error'>) => Observable<T>;
}

// @public
Expand Down
38 changes: 23 additions & 15 deletions packages/core/rxjs-interop/src/rx_resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
ResourceLoaderParams,
ResourceRef,
} from '@angular/core';
import {Observable, Subject} from 'rxjs';
import {Observable, Subject, Subscription} from 'rxjs';
import {take, takeUntil} from 'rxjs/operators';

/**
Expand All @@ -22,7 +22,7 @@ import {take, takeUntil} from 'rxjs/operators';
* @experimental
*/
export interface RxResourceOptions<T, R> extends Omit<ResourceOptions<T, R>, 'loader'> {
loader: (params: ResourceLoaderParams<R>) => Observable<T>;
loader: (params: Omit<ResourceLoaderParams<T, R>, 'set' | 'error'>) => Observable<T>;
}

/**
Expand All @@ -36,20 +36,28 @@ export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T |
return resource<T, R>({
...opts,
loader: (params) => {
const cancelled = new Subject<void>();
params.abortSignal.addEventListener('abort', () => cancelled.next());
// We want to error if the Observable completes before producing at least one value, so we
// track that here.
let gotValue = false;
let sub: Subscription;

// Note: this is identical to `firstValueFrom` which we can't use,
// because at the time of writing, `core` still supports rxjs 6.x.
return new Promise<T>((resolve, reject) => {
opts
.loader(params)
.pipe(take(1), takeUntil(cancelled))
.subscribe({
next: resolve,
error: reject,
complete: () => reject(new Error('Resource completed before producing a value')),
});
// Track the abort listener so it can be removed if the Observable completes (as a memory
// optimization).
const onAbort = () => sub.unsubscribe();
params.abortSignal.addEventListener('abort', onAbort);

sub = opts.loader(params).subscribe({
next: (value) => {
gotValue = true;
params.set(value);
},
error: params.error,
complete: () => {
if (!gotValue) {
params.error(new Error('Resource completed before producing a value'));
}
params.abortSignal.removeEventListener('abort', onAbort);
},
});
},
});
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/resource/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ export interface ResourceRef<T> extends WritableResource<T> {
*
* @experimental
*/
export interface ResourceLoaderParams<R> {
export interface ResourceLoaderParams<T, R> {
request: Exclude<NoInfer<R>, undefined>;
set: (value: T) => void;
error: (error: unknown) => void;
abortSignal: AbortSignal;
previous: {
status: ResourceStatus;
Expand All @@ -158,7 +160,7 @@ export interface ResourceLoaderParams<R> {
*
* @experimental
*/
export type ResourceLoader<T, R> = (param: ResourceLoaderParams<R>) => PromiseLike<T>;
export type ResourceLoader<T, R> = (param: ResourceLoaderParams<T, R>) => PromiseLike<T> | void;

/**
* Options to the `resource` function, for creating a resource.
Expand Down
61 changes: 38 additions & 23 deletions packages/core/src/resource/resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class ResourceImpl<T, R> extends BaseWritableResource<T> implements ResourceRef<
});
}

private async loadEffect(): Promise<void> {
private loadEffect(): void {
// Capture the previous status before any state transitions. Note that this is `untracked` since
// we do not want the effect to depend on the state of the resource, only on the request.
const {status: previousStatus} = untracked(this.state);
Expand Down Expand Up @@ -268,51 +268,66 @@ class ResourceImpl<T, R> extends BaseWritableResource<T> implements ResourceRef<
// After the loading operation is cancelled, `this.resolvePendingTask` no longer represents this
// particular task, but this `await` may eventually resolve/reject. Thus, when we cancel in
// response to (1) below, we need to cancel the locally saved task.
const resolvePendingTask = (this.resolvePendingTask = this.pendingTasks.add());
let resolvePendingTask: (() => void) | undefined = (this.resolvePendingTask =
this.pendingTasks.add());

const {signal: abortSignal} = (this.pendingController = new AbortController());
try {
// The actual loading is run through `untracked` - only the request side of `resource` is
// reactive. This avoids any confusion with signals tracking or not tracking depending on
// which side of the `await` they are.
const result = await untracked(() =>
this.loaderFn({
abortSignal,
request: request as Exclude<R, undefined>,
previous: {
status: previousStatus,
},
}),
);

const set = (value: T): void => {
if (abortSignal.aborted) {
// This load operation was cancelled.
return;
}
// Success :)

this.state.set({
status: ResourceStatus.Resolved,
previousStatus: ResourceStatus.Resolved,
value: result,
value,
error: undefined,
});
} catch (err) {

// Resolve the pending task now that the resource has a value.
resolvePendingTask?.();
resolvePendingTask = undefined;
};

const error = (err: unknown): void => {
if (abortSignal.aborted) {
// This load operation was cancelled.
return;
}
// Fail :(

this.state.set({
status: ResourceStatus.Error,
previousStatus: ResourceStatus.Error,
value: this.defaultValue,
error: err,
});
} finally {

// Resolve the pending task now that loading is done.
resolvePendingTask();
resolvePendingTask?.();
resolvePendingTask = undefined;
};

// Free the abort controller to drop any registered 'abort' callbacks.
this.pendingController = undefined;
try {
// The actual loading is run through `untracked` - only the request side of `resource` is
// reactive. This avoids any confusion with signals tracking or not tracking depending on
// which side of the `await` they are.
const res = untracked(() =>
this.loaderFn({
abortSignal,
request: request as Exclude<R, undefined>,
set,
error,
previous: {
status: previousStatus,
},
}),
);
// If the loader returns a `Promise`, track it.
res?.then(set, error);
} catch (err) {
error(err);
}
}

Expand Down

0 comments on commit fe688d5

Please sign in to comment.