diff --git a/src/helpers/promise.d.ts b/src/helpers/promise.d.ts index e109e5a7b..cd6b722de 100644 --- a/src/helpers/promise.d.ts +++ b/src/helpers/promise.d.ts @@ -32,6 +32,7 @@ export function incrementExpectedAwaits(): number; export function decrementExpectedAwaits(sourceTaskId?: number): void; export function beginMicroTickScope(): boolean; export function endMicroTickScope(): void; +export function execInGlobalContext(cb: Function): void; export declare var DexiePromise : DexiePromiseConstructor; export default DexiePromise; diff --git a/src/helpers/promise.js b/src/helpers/promise.js index a7f4a7dc3..757a974c8 100644 --- a/src/helpers/promise.js +++ b/src/helpers/promise.js @@ -640,7 +640,7 @@ export function wrap (fn, errorCatcher) { const task = { awaits: 0, echoes: 0, id: 0}; // The ongoing macro-task when using zone-echoing. var taskCounter = 0; // ID counter for macro tasks. var zoneStack = []; // Stack of left zones to restore asynchronically. -var zoneEchoes = 0; // zoneEchoes is a must in order to persist zones between native await expressions. +var zoneEchoes = 0; // When > 0, zoneLeaveEcho is queued. When 0 and task.echoes is also 0, nothing is queued. var totalEchoes = 0; // ID counter for micro-tasks. Used to detect possible native await in our Promise.prototype.then. @@ -724,8 +724,9 @@ export function onPossibleParallellAsync (possiblePromise) { function zoneEnterEcho(targetZone) { ++totalEchoes; //console.log("Total echoes ", totalEchoes); + //if (task.echoes === 1) console.warn("Cancelling echoing of async context."); if (!task.echoes || --task.echoes === 0) { - task.echoes = task.id = 0; // Cancel zone echoing. + task.echoes = task.awaits = task.id = 0; // Cancel echoing. } zoneStack.push(PSD); @@ -829,6 +830,19 @@ function getPatchedPromiseThen (origThen, zone) { }; } +/** Execute callback in global context */ +export function execInGlobalContext(cb) { + if (Promise === NativePromise && task.echoes === 0) { + if (zoneEchoes === 0) { + cb(); + } else { + enqueueNativeMicroTask(cb); + } + } else { + setTimeout(cb, 0); + } +} + export var rejection = DexiePromise.reject; export {DexiePromise}; diff --git a/src/live-query/live-query.ts b/src/live-query/live-query.ts index d47d76062..cef1232d4 100644 --- a/src/live-query/live-query.ts +++ b/src/live-query/live-query.ts @@ -1,4 +1,4 @@ -import { isAsyncFunction, keys, objectIsEmpty } from '../functions/utils'; +import { _global, isAsyncFunction, keys, objectIsEmpty } from '../functions/utils'; import { globalEvents, DEXIE_STORAGE_MUTATED_EVENT_NAME, @@ -7,6 +7,7 @@ import { beginMicroTickScope, decrementExpectedAwaits, endMicroTickScope, + execInGlobalContext, incrementExpectedAwaits, NativePromise, newScope, @@ -33,6 +34,8 @@ export interface LiveQueryContext { querier: Function; // For debugging purposes and Error messages } +let liveQueryCounter = 0; + export function liveQuery(querier: () => T | Promise): IObservable { let hasValue = false; let currentValue: T; @@ -41,6 +44,7 @@ export function liveQuery(querier: () => T | Promise): IObservable { function execute(ctx: LiveQueryContext) { const wasRootExec = beginMicroTickScope(); // Performance: Avoid starting a new microtick scope within the async context. try { + console.log("LiveQuery ID: " + ++liveQueryCounter); if (scopeFuncIsAsync) { incrementExpectedAwaits(); } @@ -50,21 +54,7 @@ export function liveQuery(querier: () => T | Promise): IObservable { // This fixes zone leaking issue that the liveQuery zone can leak to observer's next microtask. rv = (rv as Promise).finally(decrementExpectedAwaits); } - return Promise.resolve(rv).finally(()=>{ - if (PSD.subscr === ctx.subscr) { - // Querier did not await all code paths. We must wait for the next macrotask to run in order to - // escape from zone echoing. Warn to console so that app code can be corrected. liveQuery callbacks - // shall be pure functions and should never spawn side effects - so there is never a need to call - // other async functions or generated promises without awaiting them. - console.warn(`Dexie liveQuery()'s querier callback did'nt await all of its spawned promises. Querier source: ${querier}`); - return new NativePromise(resolve => setTimeout(resolve, 0)); // Wait for the next macrotask to run. - // @ts-ignore - } else if (Promise.PSD) { - // Still in async context (zone echoing) from another task. Wait for the next macrotask to run. - // @ts-ignore - return new NativePromise(resolve => setTimeout(resolve, 0)); // Wait for the next macrotask to run. - } - }); + return rv; } finally { wasRootExec && endMicroTickScope(); // Given that we created the microtick scope, we must also end it. } @@ -92,6 +82,8 @@ export function liveQuery(querier: () => T | Promise): IObservable { let startedListening = false; + const doQuery = () => execInGlobalContext(_doQuery); + function shouldNotify() { return obsSetsOverlap(currentObs, accumMuts); } @@ -132,7 +124,7 @@ export function liveQuery(querier: () => T | Promise): IObservable { (result) => { hasValue = true; currentValue = result; - if (closed || ctx.signal.aborted) { + if (closed || ctx.signal.aborted) { // closed - no subscriber anymore. // signal.aborted - new query was made before this one completed and // the querier might have catched AbortError and return successful result. @@ -148,25 +140,29 @@ export function liveQuery(querier: () => T | Promise): IObservable { globalEvents(DEXIE_STORAGE_MUTATED_EVENT_NAME, mutationListener); startedListening = true; } - observer.next && observer.next(result); + execInGlobalContext(()=>!closed && observer.next && observer.next(result)); }, (err) => { hasValue = false; if (!['DatabaseClosedError', 'AbortError'].includes(err?.name)) { - if (closed) return; - observer.error && observer.error(err); + if (!closed) execInGlobalContext(()=>{ + if (closed) return; + observer.error && observer.error(err); + }); } } ); }; - const doQuery = () => { - // @ts-ignore - if (PSD.global && !Promise.PSD) _doQuery(); - else setTimeout(_doQuery, 0); - } - - doQuery(); + // Use setTimeot here to guarantee execution in a private macro task before and + // after. The helper executeInGlobalContext(_doQuery) is not enough here because + // caller of `subscribe()` could be anything, such as a frontend framework that will + // continue in the same tick after subscribe() is called and call other + // eftects, that could involve dexie operations such as writing to the DB. + // If that happens, the private zone echoes from a live query tast started here + // could still be ongoing when the other operations start and make them inherit + // the async context from a live query. + setTimeout(doQuery, 0); return subscription; }); observable.hasValue = () => hasValue;