Skip to content

Commit

Permalink
[parade] count failures in direct predecessors, pass them on tasks
Browse files Browse the repository at this point in the history
Count all failures in (direct) dependencies.  When owning that task, return the number of its
failed dependencies.
  • Loading branch information
arielshaqed committed Nov 8, 2020
1 parent f7c0dbc commit 08c9a5c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 7 deletions.
11 changes: 7 additions & 4 deletions parade/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type TaskData struct {
Status *string `db:"status"`
StatusCode TaskStatusCodeValue `db:"status_code"`
NumTries int `db:"num_tries"`
NumFailures int `db:"num_failures"`
MaxTries *int `db:"max_tries"`
NumSignals int `db:"num_signals"` // Internal; set (only) in tests
TotalDependencies *int `db:"total_dependencies"`
Expand Down Expand Up @@ -179,6 +180,7 @@ func (td *TaskDataIterator) Values() ([]interface{}, error) {
}, nil
}

// Names of columns in tasks as they appear in the database.
var TaskDataColumnNames = []string{
"id", "action", "body", "status", "status_code", "num_tries", "max_tries",
"num_signals", "total_dependencies",
Expand All @@ -195,10 +197,11 @@ func InsertTasks(ctx context.Context, conn *pgxpool.Conn, source pgx.CopyFromSou

// OwnedTaskData is a row returned from "SELECT * FROM own_tasks(...)".
type OwnedTaskData struct {
ID TaskID `db:"task_id"`
Token PerformanceToken `db:"token"`
Action string `db:"action"`
Body *string
ID TaskID `db:"task_id"`
Token PerformanceToken `db:"token"`
NumSignalledFailures int `db:"num_failures"`
Action string `db:"action"`
Body *string
}

// OwnTasks owns for actor and returns up to maxTasks tasks for performing any of actions.
Expand Down
8 changes: 5 additions & 3 deletions parade/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ CREATE TABLE IF NOT EXISTS tasks (
status_code task_status_code_value NOT NULL DEFAULT 'pending', -- internal status code, used by parade to issue tasks

num_tries INTEGER NOT NULL DEFAULT 0, -- number of attempts actors have made on this task
num_failures INTEGER NOT NULL DEFAULT 0, -- number of those attempts that failed
max_tries INTEGER,

total_dependencies INTEGER, -- number of tasks that must signal this task
Expand Down Expand Up @@ -48,7 +49,7 @@ $$;
CREATE OR REPLACE FUNCTION own_tasks(
max_tasks INTEGER, actions VARCHAR(128) ARRAY, owner_id VARCHAR(64), max_duration INTERVAL
)
RETURNS TABLE(task_id VARCHAR(64), token UUID, action VARCHAR(128), body TEXT)
RETURNS TABLE(task_id VARCHAR(64), token UUID, num_failures INTEGER, action VARCHAR(128), body TEXT)
LANGUAGE sql VOLATILE AS $$
UPDATE tasks
SET actor_id = owner_id,
Expand All @@ -67,7 +68,7 @@ LANGUAGE sql VOLATILE AS $$
ORDER BY random()
FOR UPDATE SKIP LOCKED
LIMIT max_tasks)
RETURNING id, performance_token, action, body
RETURNING id, performance_token, num_failures, action, body
$$;

-- Extends ownership of task id by an extra max_duration, if it is still locked with performance
Expand Down Expand Up @@ -126,7 +127,8 @@ BEGIN
GET DIAGNOSTICS num_updated := ROW_COUNT;

UPDATE tasks
SET num_signals = num_signals+1
SET num_signals = num_signals+1,
num_failures = num_failures + CASE WHEN result_status_code = 'aborted'::task_status_code_value THEN 1 ELSE 0 END
WHERE id = ANY(to_signal);

IF channel IS NOT NULL THEN
Expand Down
42 changes: 42 additions & 0 deletions parade/parade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,48 @@ func TestReturnTask_DirectlyAndRetry(t *testing.T) {
if len(moreTasks) != 1 || moreTasks[0].ID != parade.TaskID("123") {
t.Errorf("expected to receive only task 123 but got tasks %+v", moreTasks)
}
if moreTasks[0].NumSignalledFailures != 0 {
t.Errorf("expected task 123 to have no signalled failures but got task %+v", moreTasks[0])
}
}

func TestReturnTask_CountsFailures(t *testing.T) {
ctx := context.Background()
pp := makeParadePrefix(t)

two := 2

tasks := []parade.TaskData{
{ID: "success", Action: "succeed", ToSignalAfter: []parade.TaskID{"end"}},
{ID: "failure", Action: "fail", ToSignalAfter: []parade.TaskID{"end"}},
{ID: "end", Action: "done", TotalDependencies: &two},
}

testutil.MustDo(t, "InsertTasks", pp.InsertTasks(ctx, tasks))
defer makeCleanup(t, ctx, pp, tasks)()

ownedTasks, err := pp.OwnTasks("foo", 1, []string{"succeed"}, nil)
testutil.MustDo(t, "OwnTasks succeed", err)
if len(ownedTasks) != 1 {
t.Fatalf("expected single task \"succeed\" but got %+v", ownedTasks)
}
testutil.MustDo(t, "ReturnTask succeed", pp.ReturnTask(ownedTasks[0].ID, ownedTasks[0].Token, "ok", parade.TaskCompleted))

ownedTasks, err = pp.OwnTasks("foo", 1, []string{"fail"}, nil)
testutil.MustDo(t, "OwnTasks fail", err)
if len(ownedTasks) != 1 {
t.Fatalf("expected single task \"fail\" but got %+v", ownedTasks)
}
testutil.MustDo(t, "ReturnTask fail", pp.ReturnTask(ownedTasks[0].ID, ownedTasks[0].Token, "ok", parade.TaskAborted))

ownedTasks, err = pp.OwnTasks("foo", 1, []string{"done"}, nil)
testutil.MustDo(t, "OwnTasks succeed", err)
if len(ownedTasks) != 1 {
t.Fatalf("expected single task \"done\" but got %+v", ownedTasks)
}
if ownedTasks[0].NumSignalledFailures != 1 {
t.Errorf("expected 1 failure signalled on \"done\" but got %d", ownedTasks[0].NumSignalledFailures)
}
}

func TestReturnTask_RetryUntilFailed(t *testing.T) {
Expand Down

0 comments on commit 08c9a5c

Please sign in to comment.