Skip to content

Commit

Permalink
Reduces DB round trips on store_db.Get() calls
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzaldysanchez committed Oct 10, 2024
1 parent 816b25c commit a1b7108
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 80 deletions.
5 changes: 3 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ func generateExecutionID(workflowID, eventID string) (string, error) {

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
e.logger.With("event", event, eIDKey, executionID).Debug("executing on a trigger event")
lggr := e.logger.With("event", event, eIDKey, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
workflows.KeywordTrigger: {
Expand Down Expand Up @@ -544,7 +545,7 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *
})
if !added {
// skip this execution since there's already a stepUpdateLoop running for the execution ID
e.logger.With(eIDKey, executionID).Debugf("won't start execution for execution %s, execution was already started", executionID)
lggr.Debugf("won't start execution for execution %s, execution was already started", executionID)
return nil
}
e.wg.Add(1)
Expand Down
164 changes: 86 additions & 78 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,26 @@ type workflowStepRow struct {
UpdatedAt *time.Time `db:"updated_at"`
}

// workflowExecutionWithStep is a struct that represents a row from the join of the workflow_executions and workflow_steps tables.
type workflowExecutionWithStep struct {
// WorkflowExecutionStep fields
WSWorkflowExecutionID string `db:"ws_workflow_execution_id"`
WSRef string `db:"ws_ref"`
WSStatus string `db:"ws_status"`
WSInputs []byte `db:"ws_inputs"`
WSOutputErr *string `db:"ws_output_err"`
WSOutputValue []byte `db:"ws_output_value"`
WSUpdatedAt *time.Time `db:"ws_updated_at"`

// WorkflowExecution fields
WEID string `db:"we_id"`
WEWorkflowID *string `db:"we_workflow_id"`
WEStatus string `db:"we_status"`
WECreatedAt *time.Time `db:"we_created_at"`
WEUpdatedAt *time.Time `db:"we_updated_at"`
WEFinishedAt *time.Time `db:"we_finished_at"`
}

// `UpdateStatus` updates the status of the given workflow execution
func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error {
sql := `UPDATE workflow_executions SET status = $1, updated_at = $2 WHERE id = $3`
Expand Down Expand Up @@ -78,45 +98,80 @@ func (d *DBStore) UpsertStep(ctx context.Context, stepState *WorkflowExecutionSt
return d.Get(ctx, step.WorkflowExecutionID)
}

// `Get` fetches the ExecutionState from the database.
// Get fetches the ExecutionState from the database.
func (d *DBStore) Get(ctx context.Context, executionID string) (WorkflowExecution, error) {
wex := &workflowExecutionRow{}
err := d.db.GetContext(ctx, wex, `SELECT * FROM workflow_executions WHERE id = $1`, executionID)
sql := `
SELECT
workflow_executions.id AS we_id,
workflow_executions.workflow_id AS we_workflow_id,
workflow_executions.status AS we_status,
workflow_executions.created_at AS we_created_at,
workflow_executions.updated_at AS we_updated_at,
workflow_executions.finished_at AS we_finished_at,
workflow_steps.workflow_execution_id AS ws_workflow_execution_id,
workflow_steps.ref AS ws_ref,
workflow_steps.status AS ws_status,
workflow_steps.inputs AS ws_inputs,
workflow_steps.output_err AS ws_output_err,
workflow_steps.output_value AS ws_output_value,
workflow_steps.updated_at AS ws_updated_at
FROM workflow_executions JOIN workflow_steps
ON workflow_executions.id = workflow_steps.workflow_execution_id
WHERE workflow_executions.id = $1`

var records []workflowExecutionWithStep
err := d.db.SelectContext(ctx, &records, sql, executionID)
if err != nil {
return WorkflowExecution{}, err
}

ws := []workflowStepRow{}
err = d.db.SelectContext(ctx, &ws, `SELECT * FROM workflow_steps WHERE workflow_execution_id = $1`, wex.ID)
idToExecutionState, err := workflowExecutionsWithStepToWorkflowExecutions(records)
if err != nil {
return WorkflowExecution{}, err
}
state, ok := idToExecutionState[executionID]
if !ok {
return WorkflowExecution{}, fmt.Errorf("could not find workflow execution with id %s", executionID)
}
return *state, nil
}

refToStep := map[string]*WorkflowExecutionStep{}
for _, s := range ws {
ss, err := stepToState(s)
if err != nil {
return WorkflowExecution{}, err
func workflowExecutionsWithStepToWorkflowExecutions(wews []workflowExecutionWithStep) (map[string]*WorkflowExecution, error) {
idToExecutionState := map[string]*WorkflowExecution{}
for _, jr := range wews {
var wid string
if jr.WEWorkflowID != nil {
wid = *jr.WEWorkflowID
}
if _, ok := idToExecutionState[jr.WEID]; !ok {
idToExecutionState[jr.WEID] = &WorkflowExecution{
ExecutionID: jr.WEID,
WorkflowID: wid,
Status: jr.WEStatus,
Steps: map[string]*WorkflowExecutionStep{},
CreatedAt: jr.WECreatedAt,
UpdatedAt: jr.WEUpdatedAt,
FinishedAt: jr.WEFinishedAt,
}
}

refToStep[s.Ref] = ss
}
state, err := stepToState(workflowStepRow{
WorkflowExecutionID: jr.WSWorkflowExecutionID,
Ref: jr.WSRef,
OutputErr: jr.WSOutputErr,
OutputValue: jr.WSOutputValue,
Inputs: jr.WSInputs,
Status: jr.WSStatus,
UpdatedAt: jr.WSUpdatedAt,
})
if err != nil {
return nil, err
}

var workflowID string
if wex.WorkflowID != nil {
workflowID = *wex.WorkflowID
es := idToExecutionState[jr.WEID]
es.Steps[state.Ref] = state
}

es := WorkflowExecution{
ExecutionID: wex.ID,
WorkflowID: workflowID,
Status: wex.Status,
Steps: refToStep,
CreatedAt: wex.CreatedAt,
UpdatedAt: wex.UpdatedAt,
FinishedAt: wex.FinishedAt,
}
return es, nil
return idToExecutionState, nil
}

func stepToState(step workflowStepRow) (*WorkflowExecutionStep, error) {
Expand Down Expand Up @@ -331,65 +386,18 @@ func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]Workf
LIMIT $2
OFFSET $3
`
joinRecords := []struct {
// WorkflowExecutionStep fields
WSWorkflowExecutionID string `db:"ws_workflow_execution_id"`
WSRef string `db:"ws_ref"`
WSStatus string `db:"ws_status"`
WSInputs []byte `db:"ws_inputs"`
WSOutputErr *string `db:"ws_output_err"`
WSOutputValue []byte `db:"ws_output_value"`
WSUpdatedAt *time.Time `db:"ws_updated_at"`

// WorkflowExecution fields
WEID string `db:"we_id"`
WEWorkflowID *string `db:"we_workflow_id"`
WEStatus string `db:"we_status"`
WECreatedAt *time.Time `db:"we_created_at"`
WEUpdatedAt *time.Time `db:"we_updated_at"`
WEFinishedAt *time.Time `db:"we_finished_at"`
}{}
var joinRecords []workflowExecutionWithStep
err := d.db.SelectContext(ctx, &joinRecords, sql, StatusStarted, limit, offset)
if err != nil {
return []WorkflowExecution{}, err
}

idToExecutionState := map[string]*WorkflowExecution{}
for _, jr := range joinRecords {
var wid string
if jr.WEWorkflowID != nil {
wid = *jr.WEWorkflowID
}
if _, ok := idToExecutionState[jr.WEID]; !ok {
idToExecutionState[jr.WEID] = &WorkflowExecution{
ExecutionID: jr.WEID,
WorkflowID: wid,
Status: jr.WEStatus,
Steps: map[string]*WorkflowExecutionStep{},
CreatedAt: jr.WECreatedAt,
UpdatedAt: jr.WEUpdatedAt,
FinishedAt: jr.WEFinishedAt,
}
}

state, err := stepToState(workflowStepRow{
WorkflowExecutionID: jr.WSWorkflowExecutionID,
Ref: jr.WSRef,
OutputErr: jr.WSOutputErr,
OutputValue: jr.WSOutputValue,
Inputs: jr.WSInputs,
Status: jr.WSStatus,
UpdatedAt: jr.WSUpdatedAt,
})
if err != nil {
return nil, err
}

es := idToExecutionState[jr.WEID]
es.Steps[state.Ref] = state
idToExecutionState, err := workflowExecutionsWithStepToWorkflowExecutions(joinRecords)
if err != nil {
return []WorkflowExecution{}, err
}

states := []WorkflowExecution{}
var states []WorkflowExecution
for _, s := range idToExecutionState {
states = append(states, *s)
}
Expand Down

0 comments on commit a1b7108

Please sign in to comment.