-
Notifications
You must be signed in to change notification settings - Fork 222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(asyncFlow): E
support
#9322
Changes from all commits
84272e2
5641b7c
a112e78
893f32f
2b1ae04
fbac9e9
0f1fbf0
b7c661a
ecd8a2e
b1796b5
a461f24
084be19
afdc771
b667373
ba9972f
942da78
2d9ccb9
290ded5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,9 +8,8 @@ import { makeConvertKit } from './convert.js'; | |
|
||
/** | ||
* @import {PromiseKit} from '@endo/promise-kit' | ||
* @import {Zone} from '@agoric/base-zone'; | ||
* @import {Vow, VowTools} from '@agoric/vow' | ||
* @import {AsyncFlow} from '../src/async-flow.js' | ||
* @import {Passable, PassableCap} from '@endo/pass-style' | ||
* @import {Vow, VowTools, VowKit} from '@agoric/vow' | ||
* @import {LogStore} from '../src/log-store.js'; | ||
* @import {Bijection} from '../src/bijection.js'; | ||
* @import {Host, HostVow, LogEntry, Outcome} from '../src/types.js'; | ||
|
@@ -32,7 +31,7 @@ export const makeReplayMembrane = ( | |
watchWake, | ||
panic, | ||
) => { | ||
const { when } = vowTools; | ||
const { when, makeVowKit } = vowTools; | ||
|
||
const equate = makeEquate(bijection); | ||
|
||
|
@@ -214,12 +213,111 @@ export const makeReplayMembrane = ( | |
|
||
// //////////////// Eventual Send //////////////////////////////////////////// | ||
|
||
/** | ||
* @param {PassableCap} hostTarget | ||
* @param {string | undefined} optVerb | ||
* @param {Passable[]} hostArgs | ||
* @param {number} callIndex | ||
* @param {VowKit} hostResultKit | ||
* @param {Promise} guestReturnedP | ||
* @returns {Outcome} | ||
*/ | ||
const performSend = ( | ||
hostTarget, | ||
optVerb, | ||
hostArgs, | ||
callIndex, | ||
hostResultKit, | ||
guestReturnedP, | ||
) => { | ||
const { vow, resolver } = hostResultKit; | ||
try { | ||
const hostPromise = optVerb | ||
? E(hostTarget)[optVerb](...hostArgs) | ||
: E(hostTarget)(...hostArgs); | ||
resolver.resolve(hostPromise); // TODO does this always work? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah thinking about this, if |
||
} catch (hostProblem) { | ||
throw Fail`internal: eventual send synchrously failed ${hostProblem}`; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since I didn't understand this right originally, let's add a comment: Since a well behaved eventual-send should not synchronously fail (it doesn't synchronously interact with the target), a synchronous throw does not represent a failure we should commit to, but instead is a panic. |
||
} | ||
try { | ||
const entry = harden(['doReturn', callIndex, vow]); | ||
log.pushEntry(entry); | ||
const guestPromise = makeGuestForHostVow(vow, guestReturnedP); | ||
// Note that `guestPromise` is not registered in the bijection since | ||
// guestReturnedP is already the guest for vow. Rather, the handler | ||
// returns guestPromise to resolve guestReturnedP to guestPromise. | ||
const { kind } = doReturn(callIndex, vow); | ||
kind === 'return' || Fail`internal: "return" kind expected ${q(kind)}`; | ||
return harden({ | ||
kind: 'return', | ||
result: guestPromise, | ||
}); | ||
} catch (problem) { | ||
throw panic(problem); | ||
} | ||
Comment on lines
+255
to
+257
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't the caller of |
||
}; | ||
|
||
const guestHandler = harden({ | ||
applyMethod(guestTarget, optVerb, guestArgs, guestReturnedP) { | ||
if (optVerb === undefined) { | ||
throw Panic`guest eventual call not yet supported: ${guestTarget}(${b(guestArgs)}) -> ${b(guestReturnedP)}`; | ||
} else { | ||
throw Panic`guest eventual send not yet supported: ${guestTarget}.${b(optVerb)}(${b(guestArgs)}) -> ${b(guestReturnedP)}`; | ||
const callIndex = log.getIndex(); | ||
if (stopped || !bijection.hasGuest(guestTarget)) { | ||
Fail`Sent from a previous run: ${guestTarget}`; | ||
} | ||
// TODO FIX BUG this is not quite right. When guestResultP is returned | ||
// as the resolution of guestResultP, it create a visious cycle error. | ||
const hostResultKit = makeVowKit(); | ||
bijection.init(guestReturnedP, hostResultKit.vow); | ||
Comment on lines
+266
to
+269
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you clarify how this cycle could actually happen?
Comment on lines
+268
to
+269
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if we're replaying we need to use the I also don't see how we're rewiring the |
||
/** @type {Outcome} */ | ||
let outcome; | ||
try { | ||
const guestEntry = harden([ | ||
'checkSend', | ||
guestTarget, | ||
optVerb, | ||
guestArgs, | ||
callIndex, | ||
]); | ||
if (log.isReplaying()) { | ||
const entry = log.nextEntry(); | ||
equate( | ||
guestEntry, | ||
entry, | ||
`replay ${callIndex}: | ||
${q(guestEntry)} | ||
vs ${q(entry)} | ||
`, | ||
); | ||
outcome = /** @type {Outcome} */ (nestInterpreter(callIndex)); | ||
} else { | ||
const entry = guestToHost(guestEntry); | ||
log.pushEntry(entry); | ||
const [_op, hostTarget, _optVerb, hostArgs, _callIndex] = entry; | ||
nestInterpreter(callIndex); | ||
outcome = performSend( | ||
hostTarget, | ||
optVerb, | ||
hostArgs, | ||
callIndex, | ||
hostResultKit, | ||
guestReturnedP, | ||
); | ||
} | ||
} catch (fatalError) { | ||
throw panic(fatalError); | ||
} | ||
|
||
switch (outcome.kind) { | ||
case 'return': { | ||
return outcome.result; | ||
} | ||
case 'throw': { | ||
throw outcome.problem; | ||
} | ||
Comment on lines
+313
to
+315
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this can ever happen, can it? |
||
default: { | ||
// @ts-expect-error TS correctly knows this case would be outside | ||
// the type. But that's what we want to check. | ||
throw Panic`unexpected outcome kind ${q(outcome.kind)}`; | ||
} | ||
} | ||
}, | ||
applyFunction(guestTarget, guestArgs, guestReturnedP) { | ||
|
@@ -321,11 +419,19 @@ export const makeReplayMembrane = ( | |
|
||
/** | ||
* @param {Vow} hVow | ||
* @param {Promise} [promiseKey] | ||
* If provided, use this promise as the key in the guestPromiseMap | ||
* rather than the returned promise. This only happens when the | ||
* promiseKey ends up forwarded to the returned promise anyway, so | ||
* associating it with this resolve/reject pair is not incorrect. | ||
* It is needed when `promiseKey` is also entered into the bijection | ||
* paired with hVow. | ||
* @returns {Promise} | ||
*/ | ||
const makeGuestForHostVow = hVow => { | ||
const makeGuestForHostVow = (hVow, promiseKey = undefined) => { | ||
const { promise, resolve, reject } = makeGuestPromiseKit(); | ||
guestPromiseMap.set(promise, harden({ resolve, reject })); | ||
promiseKey ??= promise; | ||
guestPromiseMap.set(promiseKey, harden({ resolve, reject })); | ||
|
||
watchWake(hVow); | ||
|
||
|
@@ -349,7 +455,7 @@ export const makeReplayMembrane = ( | |
hVow, | ||
async hostFulfillment => { | ||
await log.promiseReplayDone(); // should never reject | ||
if (!stopped && guestPromiseMap.get(promise) !== 'settled') { | ||
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') { | ||
/** @type {LogEntry} */ | ||
const entry = harden(['doFulfill', hVow, hostFulfillment]); | ||
log.pushEntry(entry); | ||
|
@@ -364,7 +470,7 @@ export const makeReplayMembrane = ( | |
}, | ||
async hostReason => { | ||
await log.promiseReplayDone(); // should never reject | ||
if (!stopped && guestPromiseMap.get(promise) !== 'settled') { | ||
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') { | ||
/** @type {LogEntry} */ | ||
const entry = harden(['doReject', hVow, hostReason]); | ||
log.pushEntry(entry); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of host vows, we should be using the
V
helper instead ofE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After talking with @michaelfig we clarified that
V
is heap only, and does not queue sends durably. We need to structure the membrane handling of sends to vows such that they are only sent after the vow has settled. We can rely on the fact that the membrane has replayed (no need to durably store the pending sends, instead use the guest promise).