From 08c9a5c1dedd2196ab70d7bfacdec533d8af35b8 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Sun, 8 Nov 2020 11:48:39 +0200 Subject: [PATCH] [parade] count failures in direct predecessors, pass them on tasks Count all failures in (direct) dependencies. When owning that task, return the number of its failed dependencies. --- parade/ddl.go | 11 +++++++---- parade/ddl.sql | 8 +++++--- parade/parade_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/parade/ddl.go b/parade/ddl.go index 0069362de8c..516d0496516 100644 --- a/parade/ddl.go +++ b/parade/ddl.go @@ -115,6 +115,7 @@ type TaskData struct { Status *string `db:"status"` StatusCode TaskStatusCodeValue `db:"status_code"` NumTries int `db:"num_tries"` + NumFailures int `db:"num_failures"` MaxTries *int `db:"max_tries"` NumSignals int `db:"num_signals"` // Internal; set (only) in tests TotalDependencies *int `db:"total_dependencies"` @@ -179,6 +180,7 @@ func (td *TaskDataIterator) Values() ([]interface{}, error) { }, nil } +// TaskDataColumnNames lists the names of columns in tasks as they appear in the database. var TaskDataColumnNames = []string{ "id", "action", "body", "status", "status_code", "num_tries", "max_tries", "num_signals", "total_dependencies", @@ -195,10 +197,11 @@ func InsertTasks(ctx context.Context, conn *pgxpool.Conn, source pgx.CopyFromSou // OwnedTaskData is a row returned from "SELECT * FROM own_tasks(...)". type OwnedTaskData struct { - ID TaskID `db:"task_id"` - Token PerformanceToken `db:"token"` - Action string `db:"action"` - Body *string + ID TaskID `db:"task_id"` + Token PerformanceToken `db:"token"` + NumSignalledFailures int `db:"num_failures"` + Action string `db:"action"` + Body *string } // OwnTasks owns for actor and returns up to maxTasks tasks for performing any of actions. 
diff --git a/parade/ddl.sql b/parade/ddl.sql index 3e3d37298c3..0b0e553e54f 100644 --- a/parade/ddl.sql +++ b/parade/ddl.sql @@ -17,6 +17,7 @@ CREATE TABLE IF NOT EXISTS tasks ( status_code task_status_code_value NOT NULL DEFAULT 'pending', -- internal status code, used by parade to issue tasks num_tries INTEGER NOT NULL DEFAULT 0, -- number of attempts actors have made on this task + num_failures INTEGER NOT NULL DEFAULT 0, -- number of tasks signalling this task that were aborted max_tries INTEGER, total_dependencies INTEGER, -- number of tasks that must signal this task @@ -48,7 +49,7 @@ $$; CREATE OR REPLACE FUNCTION own_tasks( max_tasks INTEGER, actions VARCHAR(128) ARRAY, owner_id VARCHAR(64), max_duration INTERVAL ) -RETURNS TABLE(task_id VARCHAR(64), token UUID, action VARCHAR(128), body TEXT) +RETURNS TABLE(task_id VARCHAR(64), token UUID, num_failures INTEGER, action VARCHAR(128), body TEXT) LANGUAGE sql VOLATILE AS $$ UPDATE tasks SET actor_id = owner_id, @@ -67,7 +68,7 @@ LANGUAGE sql VOLATILE AS $$ ORDER BY random() FOR UPDATE SKIP LOCKED LIMIT max_tasks) - RETURNING id, performance_token, action, body + RETURNING id, performance_token, num_failures, action, body $$; -- Extends ownership of task id by an extra max_duration, if it is still locked with performance @@ -126,7 +127,8 @@ BEGIN GET DIAGNOSTICS num_updated := ROW_COUNT; UPDATE tasks - SET num_signals = num_signals+1 + SET num_signals = num_signals+1, + num_failures = num_failures + CASE WHEN result_status_code = 'aborted'::task_status_code_value THEN 1 ELSE 0 END WHERE id = ANY(to_signal); IF channel IS NOT NULL THEN diff --git a/parade/parade_test.go b/parade/parade_test.go index df2db67bba0..d40445ef4b2 100644 --- a/parade/parade_test.go +++ b/parade/parade_test.go @@ -451,6 +451,48 @@ func TestReturnTask_DirectlyAndRetry(t *testing.T) { if len(moreTasks) != 1 || moreTasks[0].ID != parade.TaskID("123") { t.Errorf("expected to receive only task 123 but got tasks %+v", moreTasks) } + if 
moreTasks[0].NumSignalledFailures != 0 { + t.Errorf("expected task 123 to have no signalled failures but got task %+v", moreTasks[0]) + } +} + +func TestReturnTask_CountsFailures(t *testing.T) { + ctx := context.Background() + pp := makeParadePrefix(t) + + two := 2 + + tasks := []parade.TaskData{ + {ID: "success", Action: "succeed", ToSignalAfter: []parade.TaskID{"end"}}, + {ID: "failure", Action: "fail", ToSignalAfter: []parade.TaskID{"end"}}, + {ID: "end", Action: "done", TotalDependencies: &two}, + } + + testutil.MustDo(t, "InsertTasks", pp.InsertTasks(ctx, tasks)) + defer makeCleanup(t, ctx, pp, tasks)() + + ownedTasks, err := pp.OwnTasks("foo", 1, []string{"succeed"}, nil) + testutil.MustDo(t, "OwnTasks succeed", err) + if len(ownedTasks) != 1 { + t.Fatalf("expected single task \"succeed\" but got %+v", ownedTasks) + } + testutil.MustDo(t, "ReturnTask succeed", pp.ReturnTask(ownedTasks[0].ID, ownedTasks[0].Token, "ok", parade.TaskCompleted)) + + ownedTasks, err = pp.OwnTasks("foo", 1, []string{"fail"}, nil) + testutil.MustDo(t, "OwnTasks fail", err) + if len(ownedTasks) != 1 { + t.Fatalf("expected single task \"fail\" but got %+v", ownedTasks) + } + testutil.MustDo(t, "ReturnTask fail", pp.ReturnTask(ownedTasks[0].ID, ownedTasks[0].Token, "ok", parade.TaskAborted)) + + ownedTasks, err = pp.OwnTasks("foo", 1, []string{"done"}, nil) + testutil.MustDo(t, "OwnTasks done", err) + if len(ownedTasks) != 1 { + t.Fatalf("expected single task \"done\" but got %+v", ownedTasks) + } + if ownedTasks[0].NumSignalledFailures != 1 { + t.Errorf("expected 1 failure signalled on \"done\" but got %d", ownedTasks[0].NumSignalledFailures) + } } func TestReturnTask_RetryUntilFailed(t *testing.T) {