Skip to content

Commit

Permalink
Implement concast.beforeNext as replacement for concast.cleanup.
Browse files Browse the repository at this point in the history
Doing this right would potentially resolve issue #9690.
  • Loading branch information
benjamn committed May 13, 2022
1 parent b4ffcb6 commit 2821b36
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 36 deletions.
4 changes: 2 additions & 2 deletions src/core/QueryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ export class QueryManager<TStore> {

byVariables.set(varJson, observable = concast);

concast.cleanup(() => {
concast.beforeNext(() => {
if (byVariables.delete(varJson) &&
byVariables.size < 1) {
inFlightLinkObservables.delete(serverQuery);
Expand Down Expand Up @@ -1161,7 +1161,7 @@ export class QueryManager<TStore> {
: fromVariables(normalized.variables!)
);

concast.cleanup(() => {
concast.beforeNext(() => {
this.fetchCancelFns.delete(queryId);

if (queryInfo.observableQuery) {
Expand Down
2 changes: 1 addition & 1 deletion src/react/hoc/__tests__/queries/skip.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ describe('[queries] skip', () => {
break;
case 4:
expect(this.props.skip).toBe(false);
expect(this.props.data!.loading).toBe(true);
expect(this.props.data!.loading).toBe(false);
expect(this.props.data.allPeople).toEqual(data.allPeople);
expect(ranQuery).toBe(2);
break;
Expand Down
78 changes: 47 additions & 31 deletions src/utilities/observables/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,30 +121,32 @@ export class Concast<T> extends Observable<T> {
}
}

// Note: cleanup observers do not count towards this total.
private addCount = 0;

public addObserver(observer: Observer<T>) {
if (!this.observers.has(observer)) {
// Immediately deliver the most recent message, so we can always
// be sure all observers have the latest information.
this.deliverLastMessage(observer);
this.observers.add(observer);
++this.addCount;
}
}

public removeObserver(
observer: Observer<T>,
quietly?: boolean,
) {
if (this.observers.delete(observer) &&
--this.addCount < 1 &&
!quietly) {
// In case there are still any cleanup observers in this.observers, and no
// error or completion has been broadcast yet, make sure those observers
// have a chance to run and then remove themselves from this.observers.
this.handlers.complete();
if (
this.observers.delete(observer) &&
this.observers.size < 1
) {
if (quietly) {
// In case there are still any listeners in this.nextResultListeners,
// and no error or completion has been broadcast yet, make sure those
// observers have a chance to run and then remove themselves from
// this.observers.
this.notify("complete");
} else {
this.handlers.complete();
}
}
}

Expand All @@ -160,14 +162,15 @@ export class Concast<T> extends Observable<T> {

// Name and argument of the most recently invoked observer method, used
// to deliver latest results immediately to new observers.
private latest?: ["next" | "error", any];
private latest?: ["next", T] | ["error", any];

// Bound handler functions that can be reused for every internal
// subscription.
private handlers = {
next: (result: T) => {
if (this.sub !== null) {
this.latest = ["next", result];
this.notify("next", result);
iterateObserversSafely(this.observers, "next", result);
}
},
Expand All @@ -182,6 +185,7 @@ export class Concast<T> extends Observable<T> {
this.sub = null;
this.latest = ["error", error];
this.reject(error);
this.notify("error", error);
iterateObserversSafely(this.observers, "error", error);
}
},
Expand All @@ -199,6 +203,7 @@ export class Concast<T> extends Observable<T> {
} else {
this.resolve();
}
this.notify("complete");
// We do not store this.latest = ["complete"], because doing so
// discards useful information about the previous next (or
// error) message. Instead, if new observers subscribe after
Expand All @@ -215,29 +220,35 @@ export class Concast<T> extends Observable<T> {
},
};

public cleanup(callback: () => any) {
private nextResultListeners = new Set<NextResultListener>();

private notify(
method: Parameters<NextResultListener>[0],
arg?: Parameters<NextResultListener>[1],
) {
const { nextResultListeners } = this;
if (nextResultListeners.size) {
// Replacing this.nextResultListeners first ensures it does not grow while
// we are iterating over it, potentially leading to infinite loops.
this.nextResultListeners = new Set;
nextResultListeners.forEach(listener => listener(method, arg));
}
}

// We need a way to run callbacks just *before* the next result (or error or
// completion) is delivered by this Concast, so we can be sure any code that
// runs as a result of delivering that result/error observes the effects of
// running the callback(s). It was tempting to reuse the Observer type instead
// of introducing NextResultListener, but that messes with the sizing and
// maintenance of this.observers, and ends up being more code overall.
beforeNext(callback: NextResultListener) {
let called = false;
const once = () => {
this.nextResultListeners.add((method, arg) => {
if (!called) {
called = true;
// Removing a cleanup observer should not unsubscribe from the
// underlying Observable, so the only removeObserver behavior we
// need here is to delete observer from this.observers.
this.observers.delete(observer);
callback();
callback(method, arg);
}
}
const observer = {
next: once,
error: once,
complete: once,
};
const count = this.addCount;
this.addObserver(observer);
// Normally addObserver increments this.addCount, but we can "hide"
// cleanup observers by restoring this.addCount to its previous value
// after adding any cleanup observer.
this.addCount = count;
});
}

// A public way to abort observation and broadcast.
Expand All @@ -248,6 +259,11 @@ export class Concast<T> extends Observable<T> {
}
}

type NextResultListener = (
method: "next" | "error" | "complete",
arg?: any,
) => any;

// Necessary because the Concast constructor has a different signature
// than the Observable constructor.
fixObservableSubclass(Concast);
4 changes: 2 additions & 2 deletions src/utilities/observables/__tests__/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ describe("Concast Observable (similar to Behavior Subject in RxJS)", () => {
second: 0,
};

concast.cleanup(() => {
concast.beforeNext(() => {
++cleanupCounts.first;
});

Expand All @@ -58,7 +58,7 @@ describe("Concast Observable (similar to Behavior Subject in RxJS)", () => {
},
});

concast.cleanup(() => {
concast.beforeNext(() => {
++cleanupCounts.second;
});

Expand Down

0 comments on commit 2821b36

Please sign in to comment.