Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(daemon): Expose cancellation context in caplets #2079

Merged
merged 3 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/daemon/src/daemon-node-powers.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export const makeSocketPowers = ({ net }) => {
);

/** @type {import('./types.js').SocketPowers['connectPort']} */
const connectPort = ({ port, host, cancelled }) =>
rekmarks marked this conversation as resolved.
Show resolved Hide resolved
const connectPort = ({ port, host }) =>
new Promise((resolve, reject) => {
const conn = net.connect(port, host, err => {
if (err) {
Expand Down
36 changes: 33 additions & 3 deletions packages/daemon/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ const makeInspector = (type, number, record) =>
list: () => Object.keys(record),
});

/**
* @param {import('./types.js').Context} context - The context to make far.
* @returns {import('./types.js').FarContext} The far context.
*/
const makeFarContext = context =>
Far('Context', {
cancel: context.cancel,
whenCancelled: () => context.cancelled,
whenDisposed: () => context.disposed,
addDisposalHook: context.onCancel,
});

/**
* @param {import('./types.js').DaemonicPowers} powers
* @param {Promise<number>} webletPortP
Expand Down Expand Up @@ -284,7 +296,12 @@ const makeDaemonCore = async (
// eslint-disable-next-line no-use-before-define
provideValueForFormulaIdentifier(guestFormulaIdentifier)
);
const external = E(workerDaemonFacet).makeUnconfined(specifier, guestP);
const external = E(workerDaemonFacet).makeUnconfined(
specifier,
guestP,
// TODO fix type
/** @type {any} */ (makeFarContext(context)),
rekmarks marked this conversation as resolved.
Show resolved Hide resolved
);
return { external, internal: undefined };
};

Expand Down Expand Up @@ -327,7 +344,12 @@ const makeDaemonCore = async (
// eslint-disable-next-line no-use-before-define
provideValueForFormulaIdentifier(guestFormulaIdentifier)
);
const external = E(workerDaemonFacet).makeBundle(readableBundleP, guestP);
const external = E(workerDaemonFacet).makeBundle(
readableBundleP,
guestP,
// TODO fix type
/** @type {any} */ (makeFarContext(context)),
);
return { external, internal: undefined };
};

Expand Down Expand Up @@ -630,6 +652,14 @@ const makeDaemonCore = async (
return controller;
};

/** @type {import('./types.js').CancelValue} */
const cancelValue = async (formulaIdentifier, reason) => {
await formulaGraphMutex.enqueue();
const controller = provideControllerForFormulaIdentifier(formulaIdentifier);
console.log('Cancelled:');
return controller.context.cancel(reason);
};

/** @type {import('./types.js').ProvideValueForFormulaIdentifier} */
const provideValueForFormulaIdentifier = formulaIdentifier => {
const controller = /** @type {import('./types.js').Controller<>} */ (
Expand Down Expand Up @@ -1065,8 +1095,8 @@ const makeDaemonCore = async (
const makeMailbox = makeMailboxMaker({
getFormulaIdentifierForRef,
provideValueForFormulaIdentifier,
provideControllerForFormulaIdentifier,
provideControllerForFormulaIdentifierAndResolveHandle,
cancelValue,
});

const makeIdentifiedGuestController = makeGuestMaker({
Expand Down
10 changes: 3 additions & 7 deletions packages/daemon/src/mail.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ const { quote: q } = assert;
/**
* @param {object} args
* @param {import('./types.js').ProvideValueForFormulaIdentifier} args.provideValueForFormulaIdentifier
* @param {import('./types.js').ProvideControllerForFormulaIdentifier} args.provideControllerForFormulaIdentifier
* @param {import('./types.js').GetFormulaIdentifierForRef} args.getFormulaIdentifierForRef
* @param {import('./types.js').ProvideControllerForFormulaIdentifierAndResolveHandle} args.provideControllerForFormulaIdentifierAndResolveHandle
* @param {import('./types.js').CancelValue} args.cancelValue
*/
export const makeMailboxMaker = ({
getFormulaIdentifierForRef,
provideValueForFormulaIdentifier,
provideControllerForFormulaIdentifier,
provideControllerForFormulaIdentifierAndResolveHandle,
cancelValue,
}) => {
/**
* @param {object} args
Expand Down Expand Up @@ -81,11 +81,7 @@ export const makeMailboxMaker = ({
if (formulaIdentifier === undefined) {
throw new TypeError(`Unknown pet name: ${q(petName)}`);
}
// Behold, recursion:
const controller =
provideControllerForFormulaIdentifier(formulaIdentifier);
console.log('Cancelled:');
return controller.context.cancel(reason);
return cancelValue(formulaIdentifier, reason);
};

/** @type {import('./types.js').Mail['list']} */
Expand Down
5 changes: 2 additions & 3 deletions packages/daemon/src/mutex.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { makeQueue } from '@endo/stream';

/**
* @returns {{ lock: () => Promise<void>, unlock: () => void, enqueue: (asyncFn: () => Promise<any>) => Promise<any> }}
* @returns {import('./types.js').Mutex}
*/
export const makeMutex = () => {
/** @type {import('@endo/stream').AsyncQueue<void>} */
Expand All @@ -17,8 +17,7 @@ export const makeMutex = () => {
return {
lock,
unlock,
// helper for correct usage
enqueue: async asyncFn => {
enqueue: async (asyncFn = /** @type {any} */ (async () => {})) => {
await lock();
try {
return await asyncFn();
Expand Down
55 changes: 52 additions & 3 deletions packages/daemon/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,58 @@ export interface Topic<
subscribe(): Stream<TRead, TWrite, TReadReturn, TWriteReturn>;
}

/**
* The cancellation context of a live value associated with a formula.
*/
export interface Context {
cancel: (reason?: unknown, logPrefix?: string) => Promise<void>;
/**
* Cancel the value, preparing it for garbage collection. Cancellation
* propagates to all values that depend on this value.
*
* @param reason - The reason for the cancellation.
* @param logPrefix - The prefix to use within the log.
* @returns A promise that is resolved when the value is cancelled and
* can be garbage collected.
*/
cancel: (reason?: Error, logPrefix?: string) => Promise<void>;

/**
* A promise that is rejected when the context is cancelled.
* Once rejected, the cancelled value may initiate any teardown procedures.
*/
cancelled: Promise<never>;

/**
* A promise that is resolved when the context is disposed. This occurs
* after the `cancelled` promise is rejected, and after all disposal hooks
* have been run.
* Once resolved, the value may be garbage collected at any time.
*/
disposed: Promise<void>;

/**
* @param formulaIdentifier - The formula identifier of the value whose
* cancellation should cause this value to be cancelled.
*/
thisDiesIfThatDies: (formulaIdentifier: string) => void;

/**
* @param formulaIdentifier - The formula identifier of the value that should
* be cancelled if this value is cancelled.
*/
thatDiesIfThisDies: (formulaIdentifier: string) => void;

/**
* @param hook - A hook to run when the value is cancelled.
*/
onCancel: (hook: () => void | Promise<void>) => void;
}

export interface FarContext {
cancel: (reason: string) => Promise<never>;
cancel: (reason: Error) => Promise<void>;
whenCancelled: () => Promise<never>;
whenDisposed: () => Promise<void>;
addDisposalHook: Context['onCancel'];
}

export interface InternalExternal<External = unknown, Internal = unknown> {
Expand All @@ -252,6 +291,10 @@ export type ProvideControllerForFormulaIdentifier = (
export type ProvideControllerForFormulaIdentifierAndResolveHandle = (
formulaIdentifier: string,
) => Promise<Controller>;
export type CancelValue = (
formulaIdentifier: string,
reason: Error,
) => Promise<void>;

/**
* A handle is used to create a pointer to a formula without exposing it directly.
Expand Down Expand Up @@ -322,7 +365,7 @@ export interface Mail {
listSpecial(): Array<string>;
listAll(): Array<string>;
reverseLookupFormulaIdentifier(formulaIdentifier: string): Array<string>;
cancel(petName: string, reason: unknown): Promise<void>;
cancel(petName: string, reason: Error): Promise<void>;
// Mail operations:
listMessages(): Promise<Array<Message>>;
followMessages(): Promise<FarRef<Reader<Message>>>;
Expand Down Expand Up @@ -593,3 +636,9 @@ export type DaemonicPowers = {
persistence: DaemonicPersistencePowers;
control: DaemonicControlPowers;
};

type Mutex = {
lock: () => Promise<void>;
unlock: () => void;
enqueue: <T>(asyncFn?: () => Promise<T>) => Promise<T>;
};
14 changes: 8 additions & 6 deletions packages/daemon/src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,24 @@ export const makeWorkerFacet = ({ cancel }) => {

/**
* @param {string} specifier
* @param {unknown} powersP
* @param {Promise<unknown>} powersP
* @param {Promise<unknown>} contextP
*/
makeUnconfined: async (specifier, powersP) => {
makeUnconfined: async (specifier, powersP, contextP) => {
// Windows absolute path includes drive letter which is confused for
// protocol specifier. So, we reformat the specifier to include the
// file protocol.
const specifierUrl = normalizeFilePath(specifier);
const namespace = await import(specifierUrl);
return namespace.make(powersP);
return namespace.make(powersP, contextP);
},

/**
* @param {import('@endo/eventual-send').ERef<import('./types.js').EndoReadable>} readableP
* @param {unknown} powersP
* @param {Promise<unknown>} powersP
* @param {Promise<unknown>} contextP
*/
makeBundle: async (readableP, powersP) => {
makeBundle: async (readableP, powersP, contextP) => {
const bundleText = await E(readableP).text();
const bundle = JSON.parse(bundleText);

Expand All @@ -90,7 +92,7 @@ export const makeWorkerFacet = ({ cancel }) => {
const namespace = await importBundle(bundle, {
endowments,
});
return namespace.make(powersP);
return namespace.make(powersP, contextP);
},
});
};
Expand Down
14 changes: 14 additions & 0 deletions packages/daemon/test/context-consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { E, Far } from '@endo/far';

export const make = async (_powers, context) => {
return Far('Context consumer', {
async awaitCancellation() {
try {
await E(context).whenCancelled();
} catch {
return 'cancelled';
}
throw new Error('should have been cancelled');
},
});
};
82 changes: 75 additions & 7 deletions packages/daemon/test/test-endo.js
Original file line number Diff line number Diff line change
Expand Up @@ -566,11 +566,11 @@ test('guest facet receives a message for host', async t => {
await stop(locator);
});

test('direct termination', async t => {
test('direct cancellation', async t => {
const { promise: cancelled, reject: cancel } = makePromiseKit();
t.teardown(() => cancel(Error('teardown')));

const locator = makeLocator('tmp', 'termination-direct');
const locator = makeLocator('tmp', 'cancellation-direct');

await start(locator);
t.teardown(() => stop(locator));
Expand Down Expand Up @@ -643,16 +643,14 @@ test('direct termination', async t => {
['counter'],
),
);

t.pass();
});

// See: https://github.com/endojs/endo/issues/2074
test.failing('indirect termination', async t => {
test.failing('indirect cancellation', async t => {
const { promise: cancelled, reject: cancel } = makePromiseKit();
t.teardown(() => cancel(Error('teardown')));

const locator = makeLocator('tmp', 'termination-indirect');
const locator = makeLocator('tmp', 'cancellation-indirect');

await start(locator);
t.teardown(() => stop(locator));
Expand Down Expand Up @@ -732,7 +730,7 @@ test('cancel because of requested capability', async t => {
const { promise: cancelled, reject: cancel } = makePromiseKit();
t.teardown(() => cancel(Error('teardown')));

const locator = makeLocator('tmp', 'termination-via-request');
const locator = makeLocator('tmp', 'cancellation-via-request');

await start(locator);
t.teardown(() => stop(locator));
Expand Down Expand Up @@ -816,6 +814,76 @@ test('cancel because of requested capability', async t => {
);
});

test('unconfined service can respond to cancellation', async t => {
const { promise: cancelled, reject: cancel } = makePromiseKit();
t.teardown(() => cancel(Error('teardown')));

const locator = makeLocator('tmp', 'cancellation-unconfined-response');

await start(locator);
t.teardown(() => stop(locator));

const { getBootstrap } = await makeEndoClient(
'client',
locator.sockPath,
cancelled,
);
const bootstrap = getBootstrap();
const host = E(bootstrap).host();
await E(host).provideWorker('worker');

const capletPath = path.join(dirname, 'test', 'context-consumer.js');
const capletLocation = url.pathToFileURL(capletPath).href;
await E(host).makeUnconfined(
'worker',
capletLocation,
'NONE',
'context-consumer',
);

const result = E(host).evaluate(
'worker',
'E(caplet).awaitCancellation()',
['caplet'],
['context-consumer'],
);
await E(host).cancel('context-consumer');
t.is(await result, 'cancelled');
});

test('confined service can respond to cancellation', async t => {
const { promise: cancelled, reject: cancel } = makePromiseKit();
t.teardown(() => cancel(Error('teardown')));

const locator = makeLocator('tmp', 'cancellation-confined-response');

await start(locator);
t.teardown(() => stop(locator));

const { getBootstrap } = await makeEndoClient(
'client',
locator.sockPath,
cancelled,
);
const bootstrap = getBootstrap();
const host = E(bootstrap).host();
await E(host).provideWorker('worker');

const capletPath = path.join(dirname, 'test', 'context-consumer.js');
await doMakeBundle(host, capletPath, bundleName =>
E(host).makeBundle('worker', bundleName, 'NONE', 'context-consumer'),
);

const result = E(host).evaluate(
'worker',
'E(caplet).awaitCancellation()',
['caplet'],
['context-consumer'],
);
await E(host).cancel('context-consumer');
t.is(await result, 'cancelled');
});

test('make a host', async t => {
const { promise: cancelled, reject: cancel } = makePromiseKit();
t.teardown(() => cancel(Error('teardown')));
Expand Down
Loading