Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async workflows #306

Merged
merged 30 commits into from
Mar 9, 2023
Merged

feat: async workflows #306

merged 30 commits into from
Mar 9, 2023

Conversation

thantos
Copy link
Contributor

@thantos thantos commented Mar 3, 2023

Complete re-implementation of the workflow re-entry interpreter.

Highlights:

  • Works with async/Promises, does not require a transformer
  • Abstracts the eventuals into a consistent contract (decoupled from the interepter)
  • Interpreter has been renamed to WorkflowExecutor, implemented as a class
  • Starting support for long running, in memory workflows. Bootstrap with history and then keep sending in new events until complete.

TODO:

  • More testing
    • Concurrency
    • Dangling promises
    • Runtime tests
  • Cancellation - promises currently do not cancel
  • Clean up and straighten out terms - disgusting right now
  • Look into turning logging and date hooks into context and plug-ins

packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
Comment on lines 171 to 172
return new Promise(async (resolve) => {
// start context with execution hook
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol new Promise(async(resolve) => is an interesting pattern. Was there a reason to not do (async () => {})()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I am using the resolve method to complete from other contexts.

      this.started = {
        // TODO, also cancel?
        resolve: (result) => {
          const newCommands = this.commandsToEmit;
          this.commandsToEmit = [];

          resolve({ commands: newCommands, result });
        },
      };

When something needs to complete the workflow with a forced value like a value, the this.forceComplete method is called which calls the resolve on the top promise.

So callers of start have a reference to the run being completed.

packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
Comment on lines 223 to 225
public async continue(
...history: HistoryEvent[]
): Promise<WorkflowResult<Output>> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! Looking FWD to this. Will be very useful in an environment like CF.

Add some docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also needs to be tested, don't think it works in the current state.

packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
Comment on lines 346 to 352
await new Promise((resolve) => {
setTimeout(() => {
this.tryCommitResultEvent(event);
resolve(undefined);
});
});
// TODO: do we need to use setTimeout here to go to the end of the event loop?
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. It comes down to how tryCommitResultEvent behaves. I'm leaning towards not needing it here. Calling tryCommitResultEvent will call resolve and reject which I think should exhaustively progress the promise chains before returning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like yes, without this, the next events will be added out of order.

    const wf = workflow(async () => {
      const result = await createExpectSignalCall(
        "MySignal",
        createAwaitTimerCall(Schedule.duration(100 * 1000, "seconds"))
      );

      return result ?? "done";
    });

In this case, without set timeout, the timer command it not in the output array when the workflow resolves. Likely because the events are all emitted before the calls are able to register their eventuals.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the promise allows this to be iterative instead of recursive.

I remember a fun bug in xstate where they used recursion and I hit some depth limits.

Comment on lines 398 to 400
eventual.resolve(result.value);
} else if (isFailed(result)) {
eventual.reject(result.error);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this will synchronously step in to and begin completing the promise chain.

Perhaps this means you don't need the setTimeout in drainHistoryEvents because this code will only return after all of the promises that can resume have resumed? Unless they do IO or something dumb, there shouldn't be anything outstanding on the event loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually ran into issues where I needed to add the setTimeout, the while look would drain all events before the promise resolves would return to the caller.

So I think the local syncronous thread is preferred over the .then or continuing from await.

That said, I am going to try to roll things back and see what breaks.

@thantos thantos marked this pull request as ready for review March 6, 2023 23:01
@thantos thantos requested a review from sam-goodwin March 7, 2023 03:39
Comment on lines +11 to +17
npx eventual start workflow sleepy

npx eventual start workflow sleepy -f

npx eventual list executions

npx eventual list executions --workflow parallel --json > .eventual/out.json
npx eventual list executions --workflow sleepy --json > .eventual/out.json
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was not using an execution that was started by the same script, which was confusing and could be faulty.

Comment on lines 57 to 81
return {
triggers: [
Trigger.workflowEvent(
WorkflowEventType.ChildWorkflowSucceeded,
(event) => Result.resolved(event.result)
),
Trigger.workflowEvent(WorkflowEventType.ChildWorkflowFailed, (event) =>
Result.failed(new EventualError(event.error, event.message))
),
call.timeout
? Trigger.promise(call.timeout, () =>
Result.failed("Child Workflow Timed Out")
)
: undefined,
],
generateCommands(seq) {
return {
kind: CommandType.StartWorkflow,
seq,
name: call.name,
input: call.input,
opts: call.opts,
};
},
};
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what's going on here. Seems like an over abstraction? It seems to be associating event types with callbacks? What problem was this solving?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pulls out all of the call specific behavior out of the interpreter.

Triggers - when the Eventual can be resolved and what to do (callback) - events, signal, every change (ex: condition), and another promise/eventual (ex: activity.timeout)

GenerateCommands - Replaces the callToCommand method

The triggers also allow us to create lookup tables for eventuals (via seq), events, and signals instead of searching for things to change.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did it need to be separated out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior of each call was spread around the interpreter.

Some things ran all of the time, some emitted commands, some didn't, some were waiting on other promises, etc.

Now the behavior is a well defined contract.

Should make adding new ones trivial as well. Potentially pluggable.

packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
/**
* Starts an execution.
*
* The execution will run until completion or until the events are exhausted.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious what happens if it returns while things are still running in the background?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, resolve on the top level promise can only be called once, so that works well.

The only thing I would be concerned about is the commandToEmit array which is stateful, the array get cleared on completion.

If you are asking about a workflow that has ongoing eventuals like dangling promises or signal handlers, I don't think we allow them right now... or we are inconsistent. I... will write more tests. I think at one point we said we want workflows to execute past completion, so maybe that is wrong right now. Our tests didn't cover that case.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It did used to work and we discussed it before. I guess we didn't add a test. It would be important for signal handlers and also for things like queries if we ever add them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated and tested, succeeded workflows will continue to accept events and emit events. Failed will not.

super(...args);
export function hookDate() {
if (!isDateHooked()) {
globalThis.Date = class extends Date {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to be able to restore it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a restoreDate method.

@thantos thantos requested a review from sam-goodwin March 9, 2023 04:29
Copy link
Owner

@sam-goodwin sam-goodwin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your best work yet. Amazing stuff. Let's go. Had some comments but I think it's ready.

if (isActivityCall(call)) {
return {
triggers: [
Trigger.workflowEvent(WorkflowEventType.ActivitySucceeded, (event) =>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean "onWorkflowEvent"? Would that be a better name? Or just onEvent?

seq: number;
}

export function createEventualPromise<R>(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document why this is needed

packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
commands: WorkflowCommand[];
}

export type Trigger<OwnRes> =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does OwnRes mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Own result as opposed to the result of the trigger. Will make this more clear.

packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
packages/@eventual/core-runtime/src/workflow-executor.ts Outdated Show resolved Hide resolved
@thantos
Copy link
Contributor Author

thantos commented Mar 9, 2023

Your best work yet. Amazing stuff. Let's go. Had some comments but I think it's ready.

takes a bow Great feedback, updated, will merge once the tests pass.

@thantos thantos merged commit f048ae9 into main Mar 9, 2023
@thantos thantos deleted the sussman/async_workflows branch March 9, 2023 17:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants