From a1b71082d064d12506095ad59b6a4802b3f6bc67 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 10 Oct 2024 11:12:06 -0400 Subject: [PATCH 1/2] Reduces DB round trips on `store_db.Get()` calls --- core/services/workflows/engine.go | 5 +- core/services/workflows/store/store_db.go | 164 ++++++++++++---------- 2 files changed, 89 insertions(+), 80 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index ed584ba5aec..194c2895cc3 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -507,7 +507,8 @@ func generateExecutionID(workflowID, eventID string) (string, error) { // startExecution kicks off a new workflow execution when a trigger event is received. func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error { - e.logger.With("event", event, eIDKey, executionID).Debug("executing on a trigger event") + lggr := e.logger.With("event", event, eIDKey, executionID) + lggr.Debug("executing on a trigger event") ec := &store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ workflows.KeywordTrigger: { @@ -544,7 +545,7 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event * }) if !added { // skip this execution since there's already a stepUpdateLoop running for the execution ID - e.logger.With(eIDKey, executionID).Debugf("won't start execution for execution %s, execution was already started", executionID) + lggr.Debugf("won't start execution for execution %s, execution was already started", executionID) return nil } e.wg.Add(1) diff --git a/core/services/workflows/store/store_db.go b/core/services/workflows/store/store_db.go index 929fb5f377e..a070062aee6 100644 --- a/core/services/workflows/store/store_db.go +++ b/core/services/workflows/store/store_db.go @@ -50,6 +50,26 @@ type workflowStepRow struct { UpdatedAt *time.Time `db:"updated_at"` } +// workflowExecutionWithStep is a struct that represents a row from the join of the workflow_executions and workflow_steps tables. +type workflowExecutionWithStep struct { + // WorkflowExecutionStep fields + WSWorkflowExecutionID string `db:"ws_workflow_execution_id"` + WSRef string `db:"ws_ref"` + WSStatus string `db:"ws_status"` + WSInputs []byte `db:"ws_inputs"` + WSOutputErr *string `db:"ws_output_err"` + WSOutputValue []byte `db:"ws_output_value"` + WSUpdatedAt *time.Time `db:"ws_updated_at"` + + // WorkflowExecution fields + WEID string `db:"we_id"` + WEWorkflowID *string `db:"we_workflow_id"` + WEStatus string `db:"we_status"` + WECreatedAt *time.Time `db:"we_created_at"` + WEUpdatedAt *time.Time `db:"we_updated_at"` + WEFinishedAt *time.Time `db:"we_finished_at"` +} + // `UpdateStatus` updates the status of the given workflow execution func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error { sql := `UPDATE workflow_executions SET status = $1, updated_at = $2 WHERE id = $3` @@ -78,45 +98,80 @@ func (d *DBStore) UpsertStep(ctx context.Context, stepState *WorkflowExecutionSt return d.Get(ctx, step.WorkflowExecutionID) } -// `Get` fetches the ExecutionState from the database. +// Get fetches the ExecutionState from the database. func (d *DBStore) Get(ctx context.Context, executionID string) (WorkflowExecution, error) { - wex := &workflowExecutionRow{} - err := d.db.GetContext(ctx, wex, `SELECT * FROM workflow_executions WHERE id = $1`, executionID) + sql := ` + SELECT + workflow_executions.id AS we_id, + workflow_executions.workflow_id AS we_workflow_id, + workflow_executions.status AS we_status, + workflow_executions.created_at AS we_created_at, + workflow_executions.updated_at AS we_updated_at, + workflow_executions.finished_at AS we_finished_at, + workflow_steps.workflow_execution_id AS ws_workflow_execution_id, + workflow_steps.ref AS ws_ref, + workflow_steps.status AS ws_status, + workflow_steps.inputs AS ws_inputs, + workflow_steps.output_err AS ws_output_err, + workflow_steps.output_value AS ws_output_value, + workflow_steps.updated_at AS ws_updated_at + FROM workflow_executions JOIN workflow_steps + ON workflow_executions.id = workflow_steps.workflow_execution_id + WHERE workflow_executions.id = $1` + + var records []workflowExecutionWithStep + err := d.db.SelectContext(ctx, &records, sql, executionID) if err != nil { return WorkflowExecution{}, err } - - ws := []workflowStepRow{} - err = d.db.SelectContext(ctx, &ws, `SELECT * FROM workflow_steps WHERE workflow_execution_id = $1`, wex.ID) + idToExecutionState, err := workflowExecutionsWithStepToWorkflowExecutions(records) if err != nil { return WorkflowExecution{}, err } + state, ok := idToExecutionState[executionID] + if !ok { + return WorkflowExecution{}, fmt.Errorf("could not find workflow execution with id %s", executionID) + } + return *state, nil +} - refToStep := map[string]*WorkflowExecutionStep{} - for _, s := range ws { - ss, err := stepToState(s) - if err != nil { - return WorkflowExecution{}, err +func workflowExecutionsWithStepToWorkflowExecutions(wews []workflowExecutionWithStep) (map[string]*WorkflowExecution, error) { + idToExecutionState := map[string]*WorkflowExecution{} + for _, jr := range wews { + var wid string + if jr.WEWorkflowID != nil { + wid = *jr.WEWorkflowID + } + if _, ok := idToExecutionState[jr.WEID]; !ok { + idToExecutionState[jr.WEID] = &WorkflowExecution{ + ExecutionID: jr.WEID, + WorkflowID: wid, + Status: jr.WEStatus, + Steps: map[string]*WorkflowExecutionStep{}, + CreatedAt: jr.WECreatedAt, + UpdatedAt: jr.WEUpdatedAt, + FinishedAt: jr.WEFinishedAt, + } } - refToStep[s.Ref] = ss - } + state, err := stepToState(workflowStepRow{ + WorkflowExecutionID: jr.WSWorkflowExecutionID, + Ref: jr.WSRef, + OutputErr: jr.WSOutputErr, + OutputValue: jr.WSOutputValue, + Inputs: jr.WSInputs, + Status: jr.WSStatus, + UpdatedAt: jr.WSUpdatedAt, + }) + if err != nil { + return nil, err + } - var workflowID string - if wex.WorkflowID != nil { - workflowID = *wex.WorkflowID + es := idToExecutionState[jr.WEID] + es.Steps[state.Ref] = state } - es := WorkflowExecution{ - ExecutionID: wex.ID, - WorkflowID: workflowID, - Status: wex.Status, - Steps: refToStep, - CreatedAt: wex.CreatedAt, - UpdatedAt: wex.UpdatedAt, - FinishedAt: wex.FinishedAt, - } - return es, nil + return idToExecutionState, nil } func stepToState(step workflowStepRow) (*WorkflowExecutionStep, error) { @@ -331,65 +386,18 @@ func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]Workf LIMIT $2 OFFSET $3 ` - joinRecords := []struct { - // WorkflowExecutionStep fields - WSWorkflowExecutionID string `db:"ws_workflow_execution_id"` - WSRef string `db:"ws_ref"` - WSStatus string `db:"ws_status"` - WSInputs []byte `db:"ws_inputs"` - WSOutputErr *string `db:"ws_output_err"` - WSOutputValue []byte `db:"ws_output_value"` - WSUpdatedAt *time.Time `db:"ws_updated_at"` - - // WorkflowExecution fields - WEID string `db:"we_id"` - WEWorkflowID *string `db:"we_workflow_id"` - WEStatus string `db:"we_status"` - WECreatedAt *time.Time `db:"we_created_at"` - WEUpdatedAt *time.Time `db:"we_updated_at"` - WEFinishedAt *time.Time `db:"we_finished_at"` - }{} + var joinRecords []workflowExecutionWithStep err := d.db.SelectContext(ctx, &joinRecords, sql, StatusStarted, limit, offset) if err != nil { return []WorkflowExecution{}, err } - idToExecutionState := map[string]*WorkflowExecution{} - for _, jr := range joinRecords { - var wid string - if jr.WEWorkflowID != nil { - wid = *jr.WEWorkflowID - } - if _, ok := idToExecutionState[jr.WEID]; !ok { - idToExecutionState[jr.WEID] = &WorkflowExecution{ - ExecutionID: jr.WEID, - WorkflowID: wid, - Status: jr.WEStatus, - Steps: map[string]*WorkflowExecutionStep{}, - CreatedAt: jr.WECreatedAt, - UpdatedAt: jr.WEUpdatedAt, - FinishedAt: jr.WEFinishedAt, - } - } - - state, err := stepToState(workflowStepRow{ - WorkflowExecutionID: jr.WSWorkflowExecutionID, - Ref: jr.WSRef, - OutputErr: jr.WSOutputErr, - OutputValue: jr.WSOutputValue, - Inputs: jr.WSInputs, - Status: jr.WSStatus, - UpdatedAt: jr.WSUpdatedAt, - }) - if err != nil { - return nil, err - } - - es := idToExecutionState[jr.WEID] - es.Steps[state.Ref] = state + idToExecutionState, err := workflowExecutionsWithStepToWorkflowExecutions(joinRecords) + if err != nil { + return []WorkflowExecution{}, err } - states := []WorkflowExecution{} + var states []WorkflowExecution for _, s := range idToExecutionState { states = append(states, *s) } From 610cac2ba1d53b7c6d4370da2ca3687f6c21bfca Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 10 Oct 2024 11:17:35 -0400 Subject: [PATCH 2/2] Adds changeset --- .changeset/nice-cows-yell.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/nice-cows-yell.md diff --git a/.changeset/nice-cows-yell.md b/.changeset/nice-cows-yell.md new file mode 100644 index 00000000000..1e7f045b476 --- /dev/null +++ b/.changeset/nice-cows-yell.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#updated Refactors store_db