feat: async workflows #306
return new Promise(async (resolve) => {
  // start context with execution hook
lol, new Promise(async (resolve) => ...) is an interesting pattern. Was there a reason not to use (async () => {})()?
Because I am using the resolve method to complete from other contexts.
this.started = {
  // TODO, also cancel?
  resolve: (result) => {
    const newCommands = this.commandsToEmit;
    this.commandsToEmit = [];
    resolve({ commands: newCommands, result });
  },
};
When something needs to complete the workflow with a forced value, the this.forceComplete method is called, which calls resolve on the top-level promise.
So callers of start have a reference to the run being completed.
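For anyone following along, here is a minimal sketch of the pattern described above: the promise returned by start captures its resolve callback so that another method can complete it later. MiniExecutor, Command, RunResult, and emit are hypothetical names for illustration only, not the PR's actual classes.

```typescript
// Hypothetical, simplified stand-ins for the executor's types.
type Command = { kind: string; seq: number };
interface RunResult<T> {
  commands: Command[];
  result: T;
}

class MiniExecutor<T> {
  private commandsToEmit: Command[] = [];
  private started?: { resolve: (result: T) => void };

  // The promise executor runs synchronously, so `started` is set before start() returns.
  start(): Promise<RunResult<T>> {
    return new Promise<RunResult<T>>((resolve) => {
      this.started = {
        resolve: (result) => {
          const newCommands = this.commandsToEmit;
          this.commandsToEmit = [];
          resolve({ commands: newCommands, result });
        },
      };
    });
  }

  emit(command: Command): void {
    this.commandsToEmit.push(command);
  }

  // Completes the run from any other context that holds the executor.
  forceComplete(result: T): void {
    if (!this.started) {
      throw new Error("start() has not been called");
    }
    this.started.resolve(result);
  }
}

const executor = new MiniExecutor<string>();
const pendingRun = executor.start();
executor.emit({ kind: "StartTimer", seq: 0 });
executor.forceComplete("timed out");
```

An immediately invoked async function could not be completed this way, since its promise only settles when the function body returns or throws.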
public async continue(
  ...history: HistoryEvent[]
): Promise<WorkflowResult<Output>> {
nice! Looking FWD to this. Will be very useful in an environment like CF.
Add some docs?
Also needs to be tested, don't think it works in the current state.
await new Promise((resolve) => {
  setTimeout(() => {
    this.tryCommitResultEvent(event);
    resolve(undefined);
  });
});
// TODO: do we need to use setTimeout here to go to the end of the event loop?
Good question. It comes down to how tryCommitResultEvent behaves. I'm leaning towards not needing it here. Calling tryCommitResultEvent will call resolve and reject, which I think should exhaustively progress the promise chains before returning.
Looks like yes, without this, the next events will be added out of order.
const wf = workflow(async () => {
  const result = await createExpectSignalCall(
    "MySignal",
    createAwaitTimerCall(Schedule.duration(100 * 1000, "seconds"))
  );
  return result ?? "done";
});
In this case, without setTimeout, the timer command is not in the output array when the workflow resolves. Likely because the events are all emitted before the calls are able to register their eventuals.
And the promise allows this to be iterative instead of recursive.
I remember a fun bug in xstate where they used recursion and I hit some depth limits.
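A toy reproduction of the ordering problem discussed in this thread, under the assumption that the cause is the usual microtask timing: a continuation after await only runs once the current synchronous code finishes, so a loop that applies events back to back gets to the second event before the workflow has registered its second call. The simulate function and its handler map are hypothetical and do not mirror the real interpreter.

```typescript
// Hypothetical model: "calls" register a handler by sequence number,
// and a driver loop applies events to those handlers.
async function simulate(yieldBetweenEvents: boolean): Promise<string[]> {
  const log: string[] = [];
  const handlers = new Map<number, (value: string) => void>();
  const call = (seq: number) =>
    new Promise<string>((resolve) => handlers.set(seq, resolve));

  // The "workflow": the second call is only registered after the first await resumes.
  void (async () => {
    log.push(`got ${await call(0)}`);
    log.push(`got ${await call(1)}`);
  })();

  const events: [number, string][] = [
    [0, "a"],
    [1, "b"],
  ];
  for (const [seq, value] of events) {
    const handler = handlers.get(seq);
    if (handler) {
      handler(value);
    } else {
      log.push(`dropped seq ${seq}`);
    }
    if (yieldBetweenEvents) {
      // Yield to the event loop so pending continuations run before the next event.
      await new Promise<void>((resolve) => setTimeout(resolve));
    }
  }
  // Let any remaining continuations run before reading the log.
  await new Promise<void>((resolve) => setTimeout(resolve));
  return log;
}
```

In this model, simulate(false) finds no handler for the second event, while simulate(true) delivers both events in order. The loop also stays iterative, since each step awaits a fresh promise instead of recursing.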
  eventual.resolve(result.value);
} else if (isFailed(result)) {
  eventual.reject(result.error);
I assume this will synchronously step into and begin completing the promise chain.
Perhaps this means you don't need the setTimeout in drainHistoryEvents, because this code will only return after all of the promises that can resume have resumed? Unless they do IO or something dumb, there shouldn't be anything outstanding on the event loop?
I actually ran into issues where I needed to add the setTimeout: the while loop would drain all events before the promise resolutions returned to the caller.
So I think the local synchronous thread is preferred over the .then or continuing from await.
That said, I am going to try to roll things back and see what breaks.
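The behavior described here matches standard promise semantics: calling resolve does not run awaiting code synchronously, it only queues the continuation as a microtask. A small standalone check (all names are illustrative):

```typescript
const order: string[] = [];

// Capture resolve so it can be called from outside the executor.
let resolveEventual!: (value: string) => void;
const eventual = new Promise<string>((resolve) => {
  resolveEventual = resolve;
});

// The code after `await` is a continuation that runs as a microtask.
const resumed = (async () => {
  const value = await eventual;
  order.push(`continuation saw ${value}`);
})();

resolveEventual("result");
// This line runs before the continuation above, even though resolve was already called.
order.push("code after resolve()");
```

Once resumed settles, order holds "code after resolve()" first and the continuation entry second.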
npx eventual start workflow sleepy

npx eventual start workflow sleepy -f

npx eventual list executions

npx eventual list executions --workflow parallel --json > .eventual/out.json
npx eventual list executions --workflow sleepy --json > .eventual/out.json
This test was not using an execution that was started by the same script, which was confusing and could be faulty.
return {
  triggers: [
    Trigger.workflowEvent(
      WorkflowEventType.ChildWorkflowSucceeded,
      (event) => Result.resolved(event.result)
    ),
    Trigger.workflowEvent(WorkflowEventType.ChildWorkflowFailed, (event) =>
      Result.failed(new EventualError(event.error, event.message))
    ),
    call.timeout
      ? Trigger.promise(call.timeout, () =>
          Result.failed("Child Workflow Timed Out")
        )
      : undefined,
  ],
  generateCommands(seq) {
    return {
      kind: CommandType.StartWorkflow,
      seq,
      name: call.name,
      input: call.input,
      opts: call.opts,
    };
  },
};
I don't know what's going on here. Seems like an over abstraction? It seems to be associating event types with callbacks? What problem was this solving?
This pulls all of the call-specific behavior out of the interpreter.
Triggers: when the Eventual can be resolved and what to do (callback). Sources are events, signals, every change (ex: condition), and another promise/eventual (ex: activity.timeout).
GenerateCommands: replaces the callToCommand method.
The triggers also allow us to create lookup tables for eventuals (via seq), events, and signals instead of searching for things to change.
Why did it need to be separated out?
The behavior of each call was spread around the interpreter.
Some things ran all of the time, some emitted commands, some didn't, some were waiting on other promises, etc.
Now the behavior is a well-defined contract.
Should make adding new ones trivial as well. Potentially pluggable.
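To make the contract idea concrete, here is a rough sketch of how a per-call definition with triggers and generateCommands could feed a lookup table keyed by seq. Every type and name below (EventualDefinition, EventualRegistry, the simplified Result and WorkflowEvent shapes) is a hypothetical simplification, not the PR's actual API.

```typescript
// Hypothetical, simplified shapes.
type Result<T> =
  | { kind: "resolved"; value: T }
  | { kind: "failed"; error: unknown };

interface WorkflowEvent {
  type: string;
  seq: number;
  payload?: unknown;
}

interface Command {
  kind: string;
  seq: number;
}

// The contract: which events settle the eventual, and which command starts it.
interface EventualDefinition<T> {
  triggers: Record<string, (event: WorkflowEvent) => Result<T>>;
  generateCommands(seq: number): Command;
}

class EventualRegistry {
  private readonly bySeq = new Map<number, EventualDefinition<unknown>>();
  private nextSeq = 0;

  register(definition: EventualDefinition<unknown>): Command {
    const seq = this.nextSeq++;
    this.bySeq.set(seq, definition);
    return definition.generateCommands(seq);
  }

  // Direct lookup by seq and event type, with no scan over active eventuals.
  dispatch(event: WorkflowEvent): Result<unknown> | undefined {
    const handler = this.bySeq.get(event.seq)?.triggers[event.type];
    return handler ? handler(event) : undefined;
  }
}

const registry = new EventualRegistry();
const command = registry.register({
  triggers: {
    ActivitySucceeded: (event) => ({ kind: "resolved", value: event.payload }),
    ActivityFailed: (event) => ({ kind: "failed", error: event.payload }),
  },
  generateCommands: (seq) => ({ kind: "StartActivity", seq }),
});
const outcome = registry.dispatch({
  type: "ActivitySucceeded",
  seq: command.seq,
  payload: 42,
});
```

In this sketch, adding a new call type means writing one more definition object, and the dispatcher itself stays unchanged.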
/**
 * Starts an execution.
 *
 * The execution will run until completion or until the events are exhausted.
Curious what happens if it returns while things are still running in the background?
Good question. resolve on the top-level promise can only take effect once, so that works well.
The only thing I would be concerned about is the commandsToEmit array, which is stateful: the array gets cleared on completion.
If you are asking about a workflow that has ongoing eventuals like dangling promises or signal handlers, I don't think we allow them right now... or we are inconsistent. I... will write more tests. I think at one point we said we want workflows to execute past completion, so maybe that is wrong right now. Our tests didn't cover that case.
It used to work, and we discussed it before. I guess we didn't add a test. It would be important for signal handlers and also for things like queries, if we ever add them.
Updated and tested, succeeded workflows will continue to accept events and emit events. Failed will not.
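On the point about the top-level promise: a promise settles at most once, so later resolve calls are ignored. A tiny standalone illustration with hypothetical names:

```typescript
// Capture resolve from the executor so it can be called more than once from outside.
let resolveTopLevel!: (value: string) => void;
const topLevel = new Promise<string>((resolve) => {
  resolveTopLevel = resolve;
});

resolveTopLevel("first completion");
// Ignored: the promise is already resolved with the first value.
resolveTopLevel("second completion");
```

Any background work that continues after completion therefore cannot change the value the caller of start already received, though it can still touch shared state such as a command buffer.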
super(...args);
export function hookDate() {
  if (!isDateHooked()) {
    globalThis.Date = class extends Date {
Do you need to be able to restore it?
There is a restoreDate method.
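For reference, a hook/restore pair along these lines could look roughly like the sketch below. It reuses the names hookDate, isDateHooked, and restoreDate from the diff, but the bodies and the currentTime parameter are assumptions for illustration, not the PR's implementation.

```typescript
// Keep a reference to the real constructor so it can be restored later.
const OriginalDate = globalThis.Date;
let dateHooked = false;

function isDateHooked(): boolean {
  return dateHooked;
}

// `currentTime` is a hypothetical callback supplying the time to report (epoch ms).
function hookDate(currentTime: () => number | undefined): void {
  if (isDateHooked()) {
    return;
  }
  dateHooked = true;
  globalThis.Date = class extends OriginalDate {
    constructor(...args: unknown[]) {
      // With no arguments, substitute the supplied time; otherwise pass arguments through.
      super(
        ...((args.length === 0
          ? [currentTime() ?? OriginalDate.now()]
          : args) as [number])
      );
    }

    static now(): number {
      return currentTime() ?? OriginalDate.now();
    }
  } as unknown as DateConstructor;
}

function restoreDate(): void {
  globalThis.Date = OriginalDate;
  dateHooked = false;
}
```

Calling restoreDate in test teardown keeps the patched global from leaking into unrelated tests.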
Your best work yet. Amazing stuff. Let's go. Had some comments but I think it's ready.
if (isActivityCall(call)) {
  return {
    triggers: [
      Trigger.workflowEvent(WorkflowEventType.ActivitySucceeded, (event) =>
does this mean "onWorkflowEvent"? Would that be a better name? Or just onEvent?
  seq: number;
}

export function createEventualPromise<R>(
document why this is needed
  commands: WorkflowCommand[];
}

export type Trigger<OwnRes> =
What does OwnRes mean?
Own result as opposed to the result of the trigger. Will make this more clear.
takes a bow Great feedback, updated, will merge once the tests pass.
Complete re-implementation of the workflow re-entry interpreter.
Highlights:
WorkflowExecutor, implemented as a class
TODO: