Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
issue#967: Task will not been disabled on consecutive failures if the…
Browse files Browse the repository at this point in the history
… maximum failures value is -1

Renamed failure to max_failure according to the code review
  • Loading branch information
Andy Chan committed Jun 23, 2016
1 parent ec2d493 commit 6552130
Show file tree
Hide file tree
Showing 17 changed files with 36 additions and 83 deletions.
2 changes: 1 addition & 1 deletion cmd/snapctl/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (
flTaskSchedDuration,
flTaskSchedNoStart,
flTaskDeadline,
flTaskFailures,
flTaskMaxFailures,
},
},
{
Expand Down
2 changes: 1 addition & 1 deletion cmd/snapctl/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ var (
Name: "deadline",
Usage: "The deadline for the task to be killed after started if the task runs too long (All tasks default to 5s)",
}
flTaskFailures = cli.IntFlag{
flTaskMaxFailures = cli.IntFlag{
Name: "max-failures",
Usage: "The number of consecutive failures before snap disable the task (Default 10 consective failures)",
Value: DefaultMaxFailures,
Expand Down
4 changes: 2 additions & 2 deletions cmd/snapctl/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func createTaskUsingWFManifest(ctx *cli.Context) error {

// Deadline for a task
dl := ctx.String("deadline")
failure := uint(ctx.Int("failure"))
maxFailures := ctx.Int("max-failures")

var sch *client.Schedule
// None of these mean it is a simple schedule
Expand Down Expand Up @@ -279,7 +279,7 @@ func createTaskUsingWFManifest(ctx *cli.Context) error {
}
}
// Create task
r := pClient.CreateTask(sch, wf, name, dl, !ctx.IsSet("no-start"), failure)
r := pClient.CreateTask(sch, wf, name, dl, !ctx.IsSet("no-start"), maxFailures)
if r.Err != nil {
errors := strings.Split(r.Err.Error(), " -- ")
fmt.Println("Error creating task:")
Expand Down
4 changes: 2 additions & 2 deletions core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type Task interface {
DeadlineDuration() time.Duration
SetDeadlineDuration(time.Duration)
SetTaskID(id string)
SetStopOnFailure(uint)
GetStopOnFailure() uint
SetStopOnFailure(int)
GetStopOnFailure() int
Option(...TaskOption) TaskOption
WMap() *wmap.WorkflowMap
Schedule() schedule.Schedule
Expand Down
7 changes: 4 additions & 3 deletions docs/TASKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ The schedule describes the schedule type and interval for running the task. The
```
More on cron expressions can be found here: https://godoc.org/github.com/robfig/cron
#### Failure
#### Max-Failures
By default, snap will disable a task if there is 10 consecutive errors from any plugins within the workflow. The configuration
can be changed by specifying the number of failures value in the task header.
can be changed by specifying the number of failures value in the task header. If the max-failures value is -1, snap will
not disable a task with consecutive failure. Instead, snap will sleep for 1 second for every 10 consective failures
and retry again.
For more on tasks, visit [`SNAPCTL.md`](SNAPCTL.md).
Expand Down
2 changes: 1 addition & 1 deletion examples/tasks/ceph-file.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"type": "simple",
"interval": "1s"
},
"max-failure": 10,
"max-failures": 10,
"workflow": {
"collect": {
"metrics": {
Expand Down
2 changes: 1 addition & 1 deletion examples/tasks/distributed-mock-file.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"type": "simple",
"interval": "1s"
},
"max-failure": 10,
"max-failures": 10,
"workflow": {
"collect": {
"metrics": {
Expand Down
2 changes: 1 addition & 1 deletion examples/tasks/mock-file.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"type": "simple",
"interval": "1s"
},
"max-failure": 10,
"max-failures": 10,
"workflow": {
"collect": {
"metrics": {
Expand Down
2 changes: 1 addition & 1 deletion examples/tasks/mock-file.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
schedule:
type: "simple"
interval: "1s"
max-failure: 10
max-failures: 10
workflow:
collect:
metrics:
Expand Down
2 changes: 1 addition & 1 deletion examples/tasks/psutil-file.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
schedule:
type: "simple"
interval: "1s"
max-failure: 10
max-failures: 10
workflow:
collect:
metrics:
Expand Down
2 changes: 1 addition & 1 deletion examples/tasks/psutil-file_no-processor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
schedule:
type: "simple"
interval: "1s"
max-failure: 10
max-failures: 10
workflow:
collect:
metrics:
Expand Down
2 changes: 1 addition & 1 deletion examples/tasks/psutil-influx.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"type": "simple",
"interval": "1s"
},
"max-failure": 10,
"max-failures": 10,
"workflow": {
"collect": {
"metrics": {
Expand Down
12 changes: 6 additions & 6 deletions mgmt/rest/client/client_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestSnapClient(t *testing.T) {
So(t1.Err.Error(), ShouldEqual, fmt.Sprintf("Task not found: ID(%s)", uuid))
})
Convey("invalid task (missing metric)", func() {
tt := c.CreateTask(sch, wf, "baron", "", true)
tt := c.CreateTask(sch, wf, "baron", "", true, rest.DefaultMaxFailures)
So(tt.Err, ShouldNotBeNil)
So(tt.Err.Error(), ShouldContainSubstring, "Metric not found: /intel/mock/foo")
})
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestSnapClient(t *testing.T) {
So(p.AvailablePlugins, ShouldBeEmpty)
})
Convey("invalid task (missing publisher)", func() {
tf := c.CreateTask(sch, wf, "baron", "", false)
tf := c.CreateTask(sch, wf, "baron", "", false, rest.DefaultMaxFailures)
So(tf.Err, ShouldNotBeNil)
So(tf.Err.Error(), ShouldContainSubstring, "Plugin not found: type(publisher) name(file)")
})
Expand Down Expand Up @@ -338,11 +338,11 @@ func TestSnapClient(t *testing.T) {
Convey("Tasks", func() {
Convey("Passing a bad task manifest", func() {
wfb := getWMFromSample("bad.json")
ttb := c.CreateTask(sch, wfb, "bad", "", true)
ttb := c.CreateTask(sch, wfb, "bad", "", true, rest.DefaultMaxFailures)
So(ttb.Err, ShouldNotBeNil)
})

tf := c.CreateTask(sch, wf, "baron", "", false)
tf := c.CreateTask(sch, wf, "baron", "", false, rest.DefaultMaxFailures)
Convey("valid task not started on creation", func() {
So(tf.Err, ShouldBeNil)
So(tf.Name, ShouldEqual, "baron")
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestSnapClient(t *testing.T) {
})
})

tt := c.CreateTask(sch, wf, "baron", "", true)
tt := c.CreateTask(sch, wf, "baron", "", true, rest.DefaultMaxFailures)
Convey("valid task started on creation", func() {
So(tt.Err, ShouldBeNil)
So(tt.Name, ShouldEqual, "baron")
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestSnapClient(t *testing.T) {
Convey("event stream", func() {
rest.StreamingBufferWindow = 0.01
sch := &Schedule{Type: "simple", Interval: "100ms"}
tf := c.CreateTask(sch, wf, "baron", "", false)
tf := c.CreateTask(sch, wf, "baron", "", false, rest.DefaultMaxFailures)

type ea struct {
events []string
Expand Down
4 changes: 4 additions & 0 deletions mgmt/rest/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
"github.com/codegangsta/cli"
)

const (
DefaultMaxFailures = 10
)

var (
flAPIDisabled = cli.BoolFlag{
Name: "disable-api, d",
Expand Down
54 changes: 0 additions & 54 deletions mgmt/rest/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,66 +43,12 @@ var (
ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started")
)

<<<<<<< 0995570155aeaa57088aedc64b522204046a9c3c
=======
type configItem struct {
Key string `json:"key"`
Value interface{} `json:"value"`
}

type task struct {
ID uint64 `json:"id"`
Config map[string][]configItem `json:"config"`
Name string `json:"name"`
Deadline string `json:"deadline"`
Workflow wmap.WorkflowMap `json:"workflow"`
Schedule cschedule.Schedule `json:"schedule"`
CreationTime time.Time `json:"creation_timestamp,omitempty"`
LastRunTime time.Time `json:"last_run_timestamp,omitempty"`
HitCount uint `json:"hit_count,omitempty"`
MissCount uint `json:"miss_count,omitempty"`
FailedCount uint `json:"failed_count,omitempty"`
LastFailureMessage string `json:"last_failure_message,omitempty"`
State string `json:"task_state"`
MaxFailure uint `json:"max_failure"`
}

>>>>>>> issue#967: Customizable number of failures before snap disable the task
func (s *Server) addTask(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
task, err := core.CreateTaskFromContent(r.Body, nil, s.mt.CreateTask)
if err != nil {
respond(500, rbody.FromError(err), w)
return
}
<<<<<<< 0995570155aeaa57088aedc64b522204046a9c3c
=======

var opts []core.TaskOption
if tr.Deadline != "" {
dl, err := time.ParseDuration(tr.Deadline)
if err != nil {
respond(500, rbody.FromError(err), w)
return
}
opts = append(opts, core.TaskDeadlineDuration(dl))
}

if tr.Name != "" {
opts = append(opts, core.SetTaskName(tr.Name))
}
opts = append(opts, core.OptionStopOnFailure(tr.MaxFailures))

task, errs := s.mt.CreateTask(sch, tr.Workflow, tr.Start, opts...)
if errs != nil && len(errs.Errors()) != 0 {
var errMsg string
for _, e := range errs.Errors() {
errMsg = errMsg + e.Error() + " -- "
}
respond(500, rbody.FromError(errors.New(errMsg[:len(errMsg)-4])), w)
return
}

>>>>>>> issue#967: Customizable number of failures before snap disable the task
taskB := rbody.AddSchedulerTaskFromTask(task)
taskB.Href = taskURI(r.Host, task)
respond(201, taskB, w)
Expand Down
5 changes: 3 additions & 2 deletions mgmt/tribe/tribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ func (t *mockTask) CreationTime() *time.Time { return nil }
func (t *mockTask) DeadlineDuration() time.Duration { return 0 }
func (t *mockTask) SetDeadlineDuration(time.Duration) { return }
func (t *mockTask) SetTaskID(id string) { return }
func (t *mockTask) SetStopOnFailure(uint) { return }
func (t *mockTask) GetStopOnFailure() uint { return 0 }
func (t *mockTask) SetStopOnFailure(int) { return }
func (t *mockTask) GetStopOnFailure() int { return 0 }
func (t *mockTask) Option(...core.TaskOption) core.TaskOption { return core.TaskDeadlineDuration(0) }
func (t *mockTask) WMap() *wmap.WorkflowMap { return nil }
func (t *mockTask) Schedule() schedule.Schedule { return nil }
func (t *mockTask) MaxFailures() int { return 10 }

func getTestConfig() *Config {
cfg := GetDefaultConfig()
Expand Down
11 changes: 6 additions & 5 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type task struct {
failedRuns uint
lastFailureMessage string
lastFailureTime time.Time
stopOnFailure uint
stopOnFailure int
eventEmitter gomit.Emitter
RemoteManagers managers
}
Expand Down Expand Up @@ -199,15 +199,15 @@ func (t *task) Status() WorkflowState {
return t.workflow.State()
}

func (t *task) SetStopOnFailure(v uint) {
func (t *task) SetStopOnFailure(v int) {
t.stopOnFailure = v
}

func (t *task) SetID(id string) {
t.id = id
}

func (t *task) GetStopOnFailure() uint {
func (t *task) GetStopOnFailure() int {
return t.stopOnFailure
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func (t *task) Schedule() schedule.Schedule {
}

func (t *task) spin() {
var consecutiveFailures uint
var consecutiveFailures int
for {
taskLogger.Debug("task spin loop")
// Start go routine to wait on schedule
Expand Down Expand Up @@ -301,7 +301,7 @@ func (t *task) spin() {
} else {
consecutiveFailures = 0
}
if consecutiveFailures >= t.stopOnFailure {
if t.stopOnFailure >= 0 && consecutiveFailures >= t.stopOnFailure {
taskLogger.WithFields(log.Fields{
"_block": "spin",
"task-id": t.id,
Expand All @@ -320,6 +320,7 @@ func (t *task) spin() {
defer t.eventEmitter.Emit(event)
return
}

// Schedule has ended
case schedule.Ended:
// You must lock task to change state
Expand Down

0 comments on commit 6552130

Please sign in to comment.