Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce DB round trips on store_db.Get() calls #14720

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nice-cows-yell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#updated Refactors store_db
5 changes: 3 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,8 @@ func generateExecutionID(workflowID, eventID string) (string, error) {

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
e.logger.With("event", event, eIDKey, executionID).Debug("executing on a trigger event")
lggr := e.logger.With("event", event, eIDKey, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
workflows.KeywordTrigger: {
Expand Down Expand Up @@ -543,7 +544,7 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *
})
if !added {
// skip this execution since there's already a stepUpdateLoop running for the execution ID
e.logger.With(eIDKey, executionID).Debugf("won't start execution for execution %s, execution was already started", executionID)
lggr.Debugf("won't start execution for execution %s, execution was already started", executionID)
return nil
}
e.wg.Add(1)
Expand Down
164 changes: 86 additions & 78 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,26 @@ type workflowStepRow struct {
UpdatedAt *time.Time `db:"updated_at"`
}

// workflowExecutionWithStep is a struct that represents a row from the join of the workflow_executions and workflow_steps tables.
type workflowExecutionWithStep struct {
// WorkflowExecutionStep fields
WSWorkflowExecutionID string `db:"ws_workflow_execution_id"`
WSRef string `db:"ws_ref"`
WSStatus string `db:"ws_status"`
WSInputs []byte `db:"ws_inputs"`
WSOutputErr *string `db:"ws_output_err"`
WSOutputValue []byte `db:"ws_output_value"`
WSUpdatedAt *time.Time `db:"ws_updated_at"`

// WorkflowExecution fields
WEID string `db:"we_id"`
WEWorkflowID *string `db:"we_workflow_id"`
WEStatus string `db:"we_status"`
WECreatedAt *time.Time `db:"we_created_at"`
WEUpdatedAt *time.Time `db:"we_updated_at"`
WEFinishedAt *time.Time `db:"we_finished_at"`
}

// `UpdateStatus` updates the status of the given workflow execution
func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error {
sql := `UPDATE workflow_executions SET status = $1, updated_at = $2 WHERE id = $3`
Expand Down Expand Up @@ -78,45 +98,80 @@ func (d *DBStore) UpsertStep(ctx context.Context, stepState *WorkflowExecutionSt
return d.Get(ctx, step.WorkflowExecutionID)
}

// `Get` fetches the ExecutionState from the database.
// Get fetches the ExecutionState from the database.
func (d *DBStore) Get(ctx context.Context, executionID string) (WorkflowExecution, error) {
wex := &workflowExecutionRow{}
err := d.db.GetContext(ctx, wex, `SELECT * FROM workflow_executions WHERE id = $1`, executionID)
sql := `
SELECT
workflow_executions.id AS we_id,
workflow_executions.workflow_id AS we_workflow_id,
workflow_executions.status AS we_status,
workflow_executions.created_at AS we_created_at,
workflow_executions.updated_at AS we_updated_at,
workflow_executions.finished_at AS we_finished_at,
workflow_steps.workflow_execution_id AS ws_workflow_execution_id,
workflow_steps.ref AS ws_ref,
workflow_steps.status AS ws_status,
workflow_steps.inputs AS ws_inputs,
workflow_steps.output_err AS ws_output_err,
workflow_steps.output_value AS ws_output_value,
workflow_steps.updated_at AS ws_updated_at
FROM workflow_executions JOIN workflow_steps
ON workflow_executions.id = workflow_steps.workflow_execution_id
WHERE workflow_executions.id = $1`

var records []workflowExecutionWithStep
err := d.db.SelectContext(ctx, &records, sql, executionID)
if err != nil {
return WorkflowExecution{}, err
}

ws := []workflowStepRow{}
err = d.db.SelectContext(ctx, &ws, `SELECT * FROM workflow_steps WHERE workflow_execution_id = $1`, wex.ID)
idToExecutionState, err := workflowExecutionsWithStepToWorkflowExecutions(records)
if err != nil {
return WorkflowExecution{}, err
}
state, ok := idToExecutionState[executionID]
if !ok {
return WorkflowExecution{}, fmt.Errorf("could not find workflow execution with id %s", executionID)
}
return *state, nil
}

refToStep := map[string]*WorkflowExecutionStep{}
for _, s := range ws {
ss, err := stepToState(s)
if err != nil {
return WorkflowExecution{}, err
func workflowExecutionsWithStepToWorkflowExecutions(wews []workflowExecutionWithStep) (map[string]*WorkflowExecution, error) {
idToExecutionState := map[string]*WorkflowExecution{}
for _, jr := range wews {
var wid string
if jr.WEWorkflowID != nil {
wid = *jr.WEWorkflowID
}
if _, ok := idToExecutionState[jr.WEID]; !ok {
idToExecutionState[jr.WEID] = &WorkflowExecution{
ExecutionID: jr.WEID,
WorkflowID: wid,
Status: jr.WEStatus,
Steps: map[string]*WorkflowExecutionStep{},
CreatedAt: jr.WECreatedAt,
UpdatedAt: jr.WEUpdatedAt,
FinishedAt: jr.WEFinishedAt,
}
}

refToStep[s.Ref] = ss
}
state, err := stepToState(workflowStepRow{
WorkflowExecutionID: jr.WSWorkflowExecutionID,
Ref: jr.WSRef,
OutputErr: jr.WSOutputErr,
OutputValue: jr.WSOutputValue,
Inputs: jr.WSInputs,
Status: jr.WSStatus,
UpdatedAt: jr.WSUpdatedAt,
})
if err != nil {
return nil, err
}

var workflowID string
if wex.WorkflowID != nil {
workflowID = *wex.WorkflowID
es := idToExecutionState[jr.WEID]
es.Steps[state.Ref] = state
}

es := WorkflowExecution{
ExecutionID: wex.ID,
WorkflowID: workflowID,
Status: wex.Status,
Steps: refToStep,
CreatedAt: wex.CreatedAt,
UpdatedAt: wex.UpdatedAt,
FinishedAt: wex.FinishedAt,
}
return es, nil
return idToExecutionState, nil
}

func stepToState(step workflowStepRow) (*WorkflowExecutionStep, error) {
Expand Down Expand Up @@ -331,65 +386,18 @@ func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]Workf
LIMIT $2
OFFSET $3
`
joinRecords := []struct {
// WorkflowExecutionStep fields
WSWorkflowExecutionID string `db:"ws_workflow_execution_id"`
WSRef string `db:"ws_ref"`
WSStatus string `db:"ws_status"`
WSInputs []byte `db:"ws_inputs"`
WSOutputErr *string `db:"ws_output_err"`
WSOutputValue []byte `db:"ws_output_value"`
WSUpdatedAt *time.Time `db:"ws_updated_at"`

// WorkflowExecution fields
WEID string `db:"we_id"`
WEWorkflowID *string `db:"we_workflow_id"`
WEStatus string `db:"we_status"`
WECreatedAt *time.Time `db:"we_created_at"`
WEUpdatedAt *time.Time `db:"we_updated_at"`
WEFinishedAt *time.Time `db:"we_finished_at"`
}{}
var joinRecords []workflowExecutionWithStep
err := d.db.SelectContext(ctx, &joinRecords, sql, StatusStarted, limit, offset)
if err != nil {
return []WorkflowExecution{}, err
}

idToExecutionState := map[string]*WorkflowExecution{}
for _, jr := range joinRecords {
var wid string
if jr.WEWorkflowID != nil {
wid = *jr.WEWorkflowID
}
if _, ok := idToExecutionState[jr.WEID]; !ok {
idToExecutionState[jr.WEID] = &WorkflowExecution{
ExecutionID: jr.WEID,
WorkflowID: wid,
Status: jr.WEStatus,
Steps: map[string]*WorkflowExecutionStep{},
CreatedAt: jr.WECreatedAt,
UpdatedAt: jr.WEUpdatedAt,
FinishedAt: jr.WEFinishedAt,
}
}

state, err := stepToState(workflowStepRow{
WorkflowExecutionID: jr.WSWorkflowExecutionID,
Ref: jr.WSRef,
OutputErr: jr.WSOutputErr,
OutputValue: jr.WSOutputValue,
Inputs: jr.WSInputs,
Status: jr.WSStatus,
UpdatedAt: jr.WSUpdatedAt,
})
if err != nil {
return nil, err
}

es := idToExecutionState[jr.WEID]
es.Steps[state.Ref] = state
idToExecutionState, err := workflowExecutionsWithStepToWorkflowExecutions(joinRecords)
if err != nil {
return []WorkflowExecution{}, err
}

states := []WorkflowExecution{}
var states []WorkflowExecution
for _, s := range idToExecutionState {
states = append(states, *s)
}
Expand Down
Loading