Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(asyncFlow): E support #9322

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 118 additions & 12 deletions packages/async-flow/src/replay-membrane.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import { makeConvertKit } from './convert.js';

/**
* @import {PromiseKit} from '@endo/promise-kit'
* @import {Zone} from '@agoric/base-zone';
* @import {Vow, VowTools} from '@agoric/vow'
* @import {AsyncFlow} from '../src/async-flow.js'
* @import {Passable, PassableCap} from '@endo/pass-style'
* @import {Vow, VowTools, VowKit} from '@agoric/vow'
* @import {LogStore} from '../src/log-store.js';
* @import {Bijection} from '../src/bijection.js';
* @import {Host, HostVow, LogEntry, Outcome} from '../src/types.js';
Expand All @@ -32,7 +31,7 @@ export const makeReplayMembrane = (
watchWake,
panic,
) => {
const { when } = vowTools;
const { when, makeVowKit } = vowTools;

const equate = makeEquate(bijection);

Expand Down Expand Up @@ -214,12 +213,111 @@ export const makeReplayMembrane = (

// //////////////// Eventual Send ////////////////////////////////////////////

/**
* @param {PassableCap} hostTarget
* @param {string | undefined} optVerb
* @param {Passable[]} hostArgs
* @param {number} callIndex
* @param {VowKit} hostResultKit
* @param {Promise} guestReturnedP
* @returns {Outcome}
*/
const performSend = (
hostTarget,
optVerb,
hostArgs,
callIndex,
hostResultKit,
guestReturnedP,
) => {
const { vow, resolver } = hostResultKit;
try {
const hostPromise = optVerb
? E(hostTarget)[optVerb](...hostArgs)
: E(hostTarget)(...hostArgs);
Comment on lines +236 to +237
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of host vows, we should be using the V helper instead of E

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After talking with @michaelfig we clarified that V is heap only, and does not queue sends durably. We need to structure the membrane handling of sends to vows such that they are only sent after the vow has settled. We can rely on the fact that the membrane has replayed (no need to durably store the pending sends, instead use the guest promise).

resolver.resolve(hostPromise); // TODO does this always work?
Copy link
Member

@mhofman mhofman May 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah thinking about this, if hostTarget is a local object for which call returns a promise, we will fail during an upgrade. Unlike the checkCall case, we're not in a position to do any enforcment and fail unconditionally, unless we can somehow sniff the type of the hostTarget (if it has an eventual handler or not), and bypass the E call if it doesn't. If the hostTarget object does not return a promise, then the hostPromise will be fulfilled in the same crank, so it's safe.

} catch (hostProblem) {
throw Fail`internal: eventual send synchrously failed ${hostProblem}`;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I didn't understand this right originally, let's add a comment: Since a well behaved eventual-send should not synchronously fail (it doesn't synchronously interact with the target), a synchronous throw does not represent a failure we should commit to, but instead is a panic.

}
try {
const entry = harden(['doReturn', callIndex, vow]);
log.pushEntry(entry);
const guestPromise = makeGuestForHostVow(vow, guestReturnedP);
// Note that `guestPromise` is not registered in the bijection since
// guestReturnedP is already the guest for vow. Rather, the handler
// returns guestPromise to resolve guestReturnedP to guestPromise.
const { kind } = doReturn(callIndex, vow);
kind === 'return' || Fail`internal: "return" kind expected ${q(kind)}`;
return harden({
kind: 'return',
result: guestPromise,
});
} catch (problem) {
throw panic(problem);
}
Comment on lines +255 to +257
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't the caller of performSend already panic on exceptions?

};

const guestHandler = harden({
applyMethod(guestTarget, optVerb, guestArgs, guestReturnedP) {
if (optVerb === undefined) {
throw Panic`guest eventual call not yet supported: ${guestTarget}(${b(guestArgs)}) -> ${b(guestReturnedP)}`;
} else {
throw Panic`guest eventual send not yet supported: ${guestTarget}.${b(optVerb)}(${b(guestArgs)}) -> ${b(guestReturnedP)}`;
const callIndex = log.getIndex();
if (stopped || !bijection.hasGuest(guestTarget)) {
Fail`Sent from a previous run: ${guestTarget}`;
}
// TODO FIX BUG this is not quite right. When guestResultP is returned
// as the resolution of guestResultP, it create a visious cycle error.
const hostResultKit = makeVowKit();
bijection.init(guestReturnedP, hostResultKit.vow);
Comment on lines +266 to +269
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify how this cycle could actually happen?

Comment on lines +268 to +269
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we're replaying we need to use the vow that was previously created for the send, no?

I also don't see how we're rewiring the guestReturnedP to the watched vow on replay.

/** @type {Outcome} */
let outcome;
try {
const guestEntry = harden([
'checkSend',
guestTarget,
optVerb,
guestArgs,
callIndex,
]);
if (log.isReplaying()) {
const entry = log.nextEntry();
equate(
guestEntry,
entry,
`replay ${callIndex}:
${q(guestEntry)}
vs ${q(entry)}
`,
);
outcome = /** @type {Outcome} */ (nestInterpreter(callIndex));
} else {
const entry = guestToHost(guestEntry);
log.pushEntry(entry);
const [_op, hostTarget, _optVerb, hostArgs, _callIndex] = entry;
nestInterpreter(callIndex);
outcome = performSend(
hostTarget,
optVerb,
hostArgs,
callIndex,
hostResultKit,
guestReturnedP,
);
}
} catch (fatalError) {
throw panic(fatalError);
}

switch (outcome.kind) {
case 'return': {
return outcome.result;
}
case 'throw': {
throw outcome.problem;
}
Comment on lines +313 to +315
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this can ever happen, can it?

default: {
// @ts-expect-error TS correctly knows this case would be outside
// the type. But that's what we want to check.
throw Panic`unexpected outcome kind ${q(outcome.kind)}`;
}
}
},
applyFunction(guestTarget, guestArgs, guestReturnedP) {
Expand Down Expand Up @@ -321,11 +419,19 @@ export const makeReplayMembrane = (

/**
* @param {Vow} hVow
* @param {Promise} [promiseKey]
* If provided, use this promise as the key in the guestPromiseMap
* rather than the returned promise. This only happens when the
* promiseKey ends up forwarded to the returned promise anyway, so
* associating it with this resolve/reject pair is not incorrect.
* It is needed when `promiseKey` is also entered into the bijection
* paired with hVow.
* @returns {Promise}
*/
const makeGuestForHostVow = hVow => {
const makeGuestForHostVow = (hVow, promiseKey = undefined) => {
const { promise, resolve, reject } = makeGuestPromiseKit();
guestPromiseMap.set(promise, harden({ resolve, reject }));
promiseKey ??= promise;
guestPromiseMap.set(promiseKey, harden({ resolve, reject }));

watchWake(hVow);

Expand All @@ -349,7 +455,7 @@ export const makeReplayMembrane = (
hVow,
async hostFulfillment => {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doFulfill', hVow, hostFulfillment]);
log.pushEntry(entry);
Expand All @@ -364,7 +470,7 @@ export const makeReplayMembrane = (
},
async hostReason => {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doReject', hVow, hostReason]);
log.pushEntry(entry);
Expand Down
14 changes: 7 additions & 7 deletions packages/async-flow/src/type-guards.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ export const LogEntryShape = M.or(
M.arrayOf(M.any()),
M.number(),
],
// [
// 'checkSend',
// M.or(M.remotable('host target'), VowShape),
// M.opt(PropertyKeyShape),
// M.arrayOf(M.any()),
// M.number(),
// ],
[
'checkSend',
M.or(M.remotable('host target'), VowShape),
M.opt(PropertyKeyShape),
M.arrayOf(M.any()),
M.number(),
],
// ['checkReturn', M.number(), M.any()],
// ['checkThrow', M.number(), M.any()],
);
6 changes: 6 additions & 0 deletions packages/async-flow/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ export {};
* optVerb: PropertyKey|undefined,
* args: Host[],
* callIndex: number
* ] | [
* op: 'checkSend',
* target: Host,
* optVerb: PropertyKey|undefined,
* args: Host[],
* callIndex: number
* ]} LogEntry
*/

Expand Down
108 changes: 102 additions & 6 deletions packages/async-flow/test/replay-membrane-eventual.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from './prepare-test-env-ava.js';

import { Fail } from '@endo/errors';
import { eventLoopIteration } from '@agoric/internal/src/testing-utils.js';
import { prepareVowTools } from '@agoric/vow';
import { E } from '@endo/eventual-send';
// import E from '@agoric/vow/src/E.js';
Expand Down Expand Up @@ -46,15 +47,19 @@ const preparePingee = zone =>
*/
const testFirstPlay = async (t, zone) => {
const vowTools = prepareVowTools(zone);
const { makeVowKit } = vowTools;
const makeLogStore = prepareLogStore(zone);
const makeBijection = prepareBijection(zone);
const makePingee = preparePingee(zone);
const { vow: v1, resolver: r1 } = zone.makeOnce('v1', () => makeVowKit());
const { vow: _v2, resolver: _r2 } = zone.makeOnce('v2', () => makeVowKit());

const log = zone.makeOnce('log', () => makeLogStore());
const bij = zone.makeOnce('bij', makeBijection);

const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic);

const p1 = mem.hostToGuest(v1);
t.deepEqual(log.dump(), []);

/** @type {Pingee} */
Expand All @@ -63,18 +68,105 @@ const testFirstPlay = async (t, zone) => {
const guestPingee = mem.hostToGuest(pingee);
t.deepEqual(log.dump(), []);

const pingTestSendResult = t.throwsAsync(() => E(guestPingee).ping('send'), {
message:
'panic over "[Error: guest eventual send not yet supported: \\"[Alleged: Pingee guest wrapper]\\".ping([\\"send\\"]) -> \\"[Promise]\\"]"',
});
const p = E(guestPingee).ping('send');

guestPingee.ping('call');

t.is(await p, undefined);
const dump = log.dump();
const v3 = dump[3][2];
t.deepEqual(dump, [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
]);

r1.resolve('x');
t.is(await p1, 'x');

t.deepEqual(log.dump(), [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
['doFulfill', v1, 'x'],
]);
};

/**
* @param {any} t
* @param {Zone} zone
*/
const testReplay = async (t, zone) => {
const vowTools = prepareVowTools(zone);
prepareLogStore(zone);
prepareBijection(zone);
preparePingee(zone);
const { vow: v1 } = zone.makeOnce('v1', () => Fail`need v1`);
const { vow: v2, resolver: r2 } = zone.makeOnce('v2', () => Fail`need v2`);

const log = /** @type {LogStore} */ (
zone.makeOnce('log', () => Fail`need log`)
);
const bij = /** @type {Bijection} */ (
zone.makeOnce('bij', () => Fail`need bij`)
);

const pingee = zone.makeOnce('pingee', () => Fail`need pingee`);

const dump = log.dump();
const v3 = dump[3][2];
t.deepEqual(dump, [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
['doFulfill', v1, 'x'],
]);

const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic);
t.true(log.isReplaying());
t.is(log.getIndex(), 0);

const guestPingee = mem.hostToGuest(pingee);
const p2 = mem.hostToGuest(v2);
// @ts-expect-error TS doesn't know that r2 is a resolver
r2.resolve('y');
await eventLoopIteration();

const p1 = mem.hostToGuest(v1);
mem.wake();
t.true(log.isReplaying());
t.is(log.getIndex(), 0);
t.deepEqual(log.dump(), [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
['doFulfill', v1, 'x'],
]);

E(guestPingee).ping('send');

guestPingee.ping('call');

await pingTestSendResult;
t.is(await p1, 'x');
t.is(await p2, 'y');
t.false(log.isReplaying());

t.deepEqual(log.dump(), [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
['doFulfill', v1, 'x'],
['doFulfill', v2, 'y'],
]);
};

Expand All @@ -94,5 +186,9 @@ test.serial('test durable replay-membrane settlement', async t => {

nextLife();
const zone1 = makeDurableZone(getBaggage(), 'durableRoot');
return testFirstPlay(t, zone1);
await testFirstPlay(t, zone1);

nextLife();
const zone3 = makeDurableZone(getBaggage(), 'durableRoot');
return testReplay(t, zone3);
});
Loading