diff --git a/cmd/snapctl/flags.go b/cmd/snapctl/flags.go index 3f1c9b33b..03970673a 100644 --- a/cmd/snapctl/flags.go +++ b/cmd/snapctl/flags.go @@ -21,10 +21,6 @@ package main import "github.com/codegangsta/cli" -const ( - DefaultMaxFailures = 10 -) - var ( // Main flags @@ -126,10 +122,9 @@ var ( Name: "deadline", Usage: "The deadline for the task to be killed after started if the task runs too long (All tasks default to 5s)", } - flTaskMaxFailures = cli.IntFlag{ + flTaskMaxFailures = cli.StringFlag{ Name: "max-failures", - Usage: "The number of consecutive failures before snap disable the task (Default 10 consective failures)", - Value: DefaultMaxFailures, + Usage: "The number of consecutive failures before snap disables the task", } // metric diff --git a/cmd/snapctl/task.go b/cmd/snapctl/task.go index b0818fe96..2bfb27b33 100644 --- a/cmd/snapctl/task.go +++ b/cmd/snapctl/task.go @@ -23,6 +23,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "log" "os" "os/signal" "path/filepath" @@ -100,7 +101,204 @@ func createTask(ctx *cli.Context) error { return err } +func stringValToInt(val string) int { + // parse the input (string) value as an integer (log a Fatal + // error if we cannot parse the value as an integer) + parsedField, err := strconv.Atoi(val) + if err != nil { + splitErr := strings.Split(err.Error(), ": ") + errStr := splitErr[len(splitErr)-1] + log.Fatal(fmt.Sprintf("Value '%v' cannot be parsed as an integer (%v)", val, errStr)) + } + return int(parsedField) +} + +// Parses the command-line parameters and uses them to override the underlying schedule +// in this task (if any) or set a schedule for that task (if one is not already defined) +func (t *task) setWindowedSchedule(start *time.Time, stop *time.Time, duration *time.Duration) error { + // ir this task does not have an existing schedule, create an empty + // windowed schedule + if t.Schedule == nil { + t.Schedule = &client.Schedule{ + Type: "windowed", + } + } else if t.Schedule.Type == "" { + // else if there is an empty schedule already defined for this task, + // set the type for that schedule to 'windowed' + t.Schedule.Type = "windowed" + } else if t.Schedule.Type != "windowed" { + // else if the task's existing schedule is not a 'windowed' schedule, + // then return an error + return fmt.Errorf("Usage error (schedule type mismatch); cannot replace existing schedule of type '%v' with a new, 'windowed' schedule", t.Schedule.Type) + } + // grab the task schedule's start/stop time (will use these later to fill in a + // missing boundary value, if we find one missing in our input arguments) + scheduleStart := t.Schedule.StartTime + scheduleStop := t.Schedule.StopTime + // if a duration was passed in, determine the start and stop times for our new + // 'windowed' schedule from the input parameters + if duration != nil { + // if start and stop were both defined, then return an error (since all three parameters cannot be used + // to define the 'windowed' schedule) + if start != nil && stop != nil { + return fmt.Errorf("Usage error (too many parameters); only two of the parameters that define the window (start time, stop time and duration) can be specified for a 'windowed' schedule") + } + // if start is set and stop is not then use duration to create stop + if start != nil && stop == nil { + newStop := start.Add(*duration) + t.Schedule.StartTime = start + t.Schedule.StopTime = &newStop + } else if stop != nil && start == nil { + // else if stop is set and start is not then use duration to create start + newStart := stop.Add(*duration * -1) + t.Schedule.StartTime = &newStart + t.Schedule.StopTime = stop + } else { + // else, the start and stop are both undefined but a duration was passed in, + // so use the current date/time (plus the 'createTaskNowPad' value) as the + // start date/time and construct a stop date/time from that start date/time + // and the duration + newStart := time.Now().Add(createTaskNowPad) + newStop := newStart.Add(*duration) + t.Schedule.StartTime = &newStart + t.Schedule.StopTime = &newStop + } + // since we've filled in the parameters for the new 'windowed' schedule successfully, + // return nil (no error) + return nil + } else if (start == nil && scheduleStart == nil) || (stop == nil && scheduleStop == nil) { + // else if the duration is undefined and either the start or stop date/time is missing (and the + // corresponding start/stop date/time the schedule for this task is also undefined) then throw + // an error (since we can't construct a complete window) + return fmt.Errorf("Usage error (missing boundary); both boundaries (the start and stop date-time) must be specified for a 'windowed' schedule when no duration is specified") + } + // if new start and stop date/times were both specified, then use them to define a new 'windowed' + // schedule (provided the start is less than the stop) + if start != nil && stop != nil { + if start.After(*stop) { + return fmt.Errorf("Usage error (start after stop); the start date-time (%v) must be before the stop date-time (%v)", start, stop) + } + t.Schedule.StartTime = start + t.Schedule.StopTime = stop + } else if start != nil { + // otherwise, if only the start date/time was specified, use it to replace the current schedule's + // start date/time (provided it is before the current schedule's stop date/time) + if !start.Before(*(t.Schedule.StopTime)) { + return fmt.Errorf("Usage error (start after existing stop); the new start date-time (%v) must be before the existing stop date-time (%v)", start, t.Schedule.StopTime) + } + t.Schedule.StartTime = start + } else if stop != nil { + // otherwise, if only the stop date/time was specified, use it to replace the current schedule's + // stop date/time (provided it is after the current schedule's start date/time) + if !stop.After(*(t.Schedule.StartTime)) { + return fmt.Errorf("Usage error (stop before existing start); the new stop date-time (%v) must be after the existing start date-time (%v)", stop, t.Schedule.StartTime) + } + t.Schedule.StopTime = stop + } + return nil +} + +// parse the command-line options and use them to setup a new schedule for this task +func (t *task) setScheduleFromCliOptions(ctx *cli.Context) error { + // if there is no schedule associated with this task, create an empty one + if t.Schedule == nil { + t.Schedule = &client.Schedule{} + } + // check the start, stop, and duration values to see if we're looking at a windowed schedule (or not) + // first, get the parameters that define the windowed schedule + start := mergeDateTime( + strings.ToUpper(ctx.String("start-time")), + strings.ToUpper(ctx.String("start-date")), + ) + stop := mergeDateTime( + strings.ToUpper(ctx.String("stop-time")), + strings.ToUpper(ctx.String("stop-date")), + ) + // Use duration to create missing start or stop + durationStr := ctx.String("duration") + // and, in either case, we'll use the input interval (if one was provided) + // to update the interval of our schedule + interval := ctx.String("interval") + if !ctx.IsSet("interval") && interval == "" && t.Schedule.Interval == "" { + return fmt.Errorf("Usage error (missing interval value); when constructing a new task schedule, an interval must be provided") + } + // parse the duration (if one was passed in) + var duration *time.Duration + if ctx.IsSet("duration") || durationStr != "" { + d, err := time.ParseDuration(durationStr) + if err != nil { + return fmt.Errorf("Bad duration format; %v", err) + } + duration = &d + } + fmt.Printf("start => '%v'; stop => '%v'; duration => '%v'\n", start, stop, duration) + // ic a start, stop, or duration value was entered, then the CLI options are being used to + // specify a new windowed schedule for this task, so use them to replace the windowed schedule + // in the current task (if one exists); note that it is an error to try to replace an existing + // sechedule with a new schedule of a different type, so throw an error if that is the case + if start != nil || stop != nil || duration != nil { + t.setWindowedSchedule(start, stop, duration) + } else { + // otherwise, we're looking at either a simple schedule or a cron schedule + // Get the interval + isCron := false + _, err := time.ParseDuration(interval) + if err != nil { + // try interpreting interval as cron entry + _, e := cron.Parse(interval) + if e != nil { + return fmt.Errorf("Bad interval format: cannot parse interval value '%v' either as a duration or a cron entry\n", interval) + } + isCron = true + } + // No start or stop and no duration == either a simple schedule or a cron schedule + schedType := "simple" + if isCron { + // make sure the current schedule type (if there is one) matches + if t.Schedule.Type != "" && t.Schedule.Type != "cron" { + return fmt.Errorf("Usage error; cannot replace existing schedule of type '%v' with a new, 'cron' schedule", t.Schedule.Type) + } + schedType = "cron" + } else if t.Schedule.Type != "" && t.Schedule.Type != "simple" { + // make sure the current schedule type (if there is one) matches + return fmt.Errorf("Usage error; cannot replace existing schedule of type '%v' with a new, 'simple' schedule", t.Schedule.Type) + } + t.Schedule.Type = schedType + t.Schedule.Interval = interval + } + // since we got this far, return a nil (indicating success + return nil +} + +// merge the command-line options into the current task +func (t *task) mergeCliOptions(ctx *cli.Context) error { + // set the name of the task (if a 'name' was provided in the CLI options) + name := ctx.String("name") + if ctx.IsSet("name") || name != "" { + t.Name = name + } + // set the deadline of the task (if a 'deadline' was provided in the CLI options) + deadline := ctx.String("deadline") + if ctx.IsSet("deadline") || deadline != "" { + t.Deadline = deadline + } + // set the MaxFailures for the task (if a 'max-failures' value was provided in the CLI options) + maxFailuresStrVal := ctx.String("max-failures") + if ctx.IsSet("max-failures") || maxFailuresStrVal != "" { + t.MaxFailures = stringValToInt(maxFailuresStrVal) + } + // shouldn't ever happen, but... + if t.Version != 1 { + return fmt.Errorf("Invalid version provided") + } + // set the schedule for the task from the CLI options (and return the results + // of that method call, indicating whether or not an error was encountered while + // setting up that schedule) + return t.setScheduleFromCliOptions(ctx) +} + func createTaskUsingTaskManifest(ctx *cli.Context) error { + // get the task manifest file to use path := ctx.String("task-manifest") ext := filepath.Ext(path) file, e := ioutil.ReadFile(path) @@ -108,6 +306,7 @@ func createTaskUsingTaskManifest(ctx *cli.Context) error { return fmt.Errorf("File error [%s]- %v\n", ext, e) } + // create an empty task struct and unmarshal the contents of the file into that object t := task{} switch ext { case ".yaml", ".yml": @@ -124,17 +323,14 @@ func createTaskUsingTaskManifest(ctx *cli.Context) error { return fmt.Errorf("Unsupported file type %s\n", ext) } - t.Name = ctx.String("name") - if t.Version != 1 { - return fmt.Errorf("Invalid version provided") - } - - // If the number of failures does not specific, default value is 10 - if t.MaxFailures == 0 { - fmt.Println("If the number of maximum failures is not specified, use default value of", DefaultMaxFailures) - t.MaxFailures = DefaultMaxFailures + // merge any CLI optiones specified by the user (if any) into the current task + fmt.Printf("while constructing task from task manifest (before merge); task schedule => %+v\n", t.Schedule) + if err := t.mergeCliOptions(ctx); err != nil { + return err } + fmt.Printf("while constructing task from task manifest (after merge); task schedule => %+v\n", t.Schedule) + // and use the resulting struct to create a new task r := pClient.CreateTask(t.Schedule, t.Workflow, t.Name, t.Deadline, !ctx.IsSet("no-start"), t.MaxFailures) if r.Err != nil { @@ -154,18 +350,21 @@ func createTaskUsingTaskManifest(ctx *cli.Context) error { } func createTaskUsingWFManifest(ctx *cli.Context) error { - // Get the workflow + // Get the workflow manifest filename from the command-line path := ctx.String("workflow-manifest") ext := filepath.Ext(path) file, e := ioutil.ReadFile(path) - if !ctx.IsSet("interval") && !ctx.IsSet("i") { - return fmt.Errorf("Workflow manifest requires interval to be set via flag.") + // check to make sure that an interval was specified using the appropriate command-line flag + interval := ctx.String("interval") + if !ctx.IsSet("interval") && interval != "" { + return fmt.Errorf("Workflow manifest requires that an interval be set via a command-line flag.") } if e != nil { return fmt.Errorf("File error [%s]- %v\n", ext, e) } + // and unmarshal the contents of the workflow manifest file into a local workflow map var wf *wmap.WorkflowMap switch ext { case ".yaml", ".yml": @@ -181,91 +380,19 @@ func createTaskUsingWFManifest(ctx *cli.Context) error { return fmt.Errorf("Error parsing JSON file input - %v\n", e) } } - // Get the task name - name := ctx.String("name") - // Get the interval - isCron := false - i := ctx.String("interval") - _, err := time.ParseDuration(i) - if err != nil { - // try interpreting interval as cron entry - _, e := cron.Parse(i) - if e != nil { - return fmt.Errorf("Bad interval format:\nfor simple schedule: %v\nfor cron schedule: %v\n", err, e) - } - isCron = true - } - - // Deadline for a task - dl := ctx.String("deadline") - maxFailures := ctx.Int("max-failures") - var sch *client.Schedule - // None of these mean it is a simple schedule - if !ctx.IsSet("start-date") && !ctx.IsSet("start-time") && !ctx.IsSet("stop-date") && !ctx.IsSet("stop-time") { - // Check if duration was set - if ctx.IsSet("duration") && !isCron { - d, err := time.ParseDuration(ctx.String("duration")) - if err != nil { - return fmt.Errorf("Bad duration format:\n%v\n", err) - } - start := time.Now().Add(createTaskNowPad) - stop := start.Add(d) - sch = &client.Schedule{ - Type: "windowed", - Interval: i, - StartTime: &start, - StopTime: &stop, - } - } else { - // No start or stop and no duration == simple schedule - t := "simple" - if isCron { - // It's a cron schedule, ignore "duration" if set - t = "cron" - } - sch = &client.Schedule{ - Type: t, - Interval: i, - } - } - } else { - // We have some form of windowed schedule - start := mergeDateTime( - strings.ToUpper(ctx.String("start-time")), - strings.ToUpper(ctx.String("start-date")), - ) - stop := mergeDateTime( - strings.ToUpper(ctx.String("stop-time")), - strings.ToUpper(ctx.String("stop-date")), - ) + // create a dummy task + t := task{} - // Use duration to create missing start or stop - if ctx.IsSet("duration") { - d, err := time.ParseDuration(ctx.String("duration")) - if err != nil { - return fmt.Errorf("Bad duration format:\n%v\n", err) - } - // if start is set and stop is not then use duration to create stop - if start != nil && stop == nil { - t := start.Add(d) - stop = &t - } - // if stop is set and start is not then use duration to create start - if stop != nil && start == nil { - t := stop.Add(d * -1) - start = &t - } - } - sch = &client.Schedule{ - Type: "windowed", - Interval: i, - StartTime: start, - StopTime: stop, - } + // fill in the details for that task from the command-line arbuments passed in by the user + fmt.Printf("while constructing task from workflow manifest (before merge); task schedule => %+v\n", t.Schedule) + if err := t.mergeCliOptions(ctx); err != nil { + return err } - // Create task - r := pClient.CreateTask(sch, wf, name, dl, !ctx.IsSet("no-start"), maxFailures) + fmt.Printf("while constructing task from workflow manifest (after merge); task schedule => %+v\n", t.Schedule) + + // and use the resulting struct (along with the workflow map we constructed, above) to create a new task + r := pClient.CreateTask(t.Schedule, wf, t.Name, t.Deadline, !ctx.IsSet("no-start"), t.MaxFailures) if r.Err != nil { errors := strings.Split(r.Err.Error(), " -- ") errString := "Error creating task:" diff --git a/core/task.go b/core/task.go index b5a1d212b..bdc7d9dbb 100644 --- a/core/task.go +++ b/core/task.go @@ -197,7 +197,12 @@ func CreateTaskFromContent(body io.ReadCloser, if tr.Name != "" { opts = append(opts, SetTaskName(tr.Name)) } - opts = append(opts, OptionStopOnFailure(10)) + + // if a MaxFailures value is included as part of the task creation request + if tr.MaxFailures > 0 { + // then set the appropriate value in the opts + opts = append(opts, OptionStopOnFailure(tr.MaxFailures)) + } if mode == nil { mode = &tr.Start diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index d0c725d37..1c6920dca 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -160,7 +160,7 @@ func TestSnapClient(t *testing.T) { So(t1.Err.Error(), ShouldEqual, fmt.Sprintf("Task not found: ID(%s)", uuid)) }) Convey("invalid task (missing metric)", func() { - tt := c.CreateTask(sch, wf, "baron", "", true, rest.DefaultMaxFailures) + tt := c.CreateTask(sch, wf, "baron", "", true, 0) So(tt.Err, ShouldNotBeNil) So(tt.Err.Error(), ShouldContainSubstring, "Metric not found: /intel/mock/foo") }) @@ -193,7 +193,7 @@ func TestSnapClient(t *testing.T) { So(p.AvailablePlugins, ShouldBeEmpty) }) Convey("invalid task (missing publisher)", func() { - tf := c.CreateTask(sch, wf, "baron", "", false, rest.DefaultMaxFailures) + tf := c.CreateTask(sch, wf, "baron", "", false, 0) So(tf.Err, ShouldNotBeNil) So(tf.Err.Error(), ShouldContainSubstring, "Plugin not found: type(publisher) name(mock-file)") }) @@ -338,11 +338,11 @@ func TestSnapClient(t *testing.T) { Convey("Tasks", func() { Convey("Passing a bad task manifest", func() { wfb := getWMFromSample("bad.json") - ttb := c.CreateTask(sch, wfb, "bad", "", true, rest.DefaultMaxFailures) + ttb := c.CreateTask(sch, wfb, "bad", "", true, 0) So(ttb.Err, ShouldNotBeNil) }) - tf := c.CreateTask(sch, wf, "baron", "", false, rest.DefaultMaxFailures) + tf := c.CreateTask(sch, wf, "baron", "", false, 0) Convey("valid task not started on creation", func() { So(tf.Err, ShouldBeNil) So(tf.Name, ShouldEqual, "baron") @@ -385,7 +385,7 @@ func TestSnapClient(t *testing.T) { }) }) - tt := c.CreateTask(sch, wf, "baron", "", true, rest.DefaultMaxFailures) + tt := c.CreateTask(sch, wf, "baron", "", true, 0) Convey("valid task started on creation", func() { So(tt.Err, ShouldBeNil) So(tt.Name, ShouldEqual, "baron") @@ -473,7 +473,7 @@ func TestSnapClient(t *testing.T) { Convey("event stream", func() { rest.StreamingBufferWindow = 0.01 sch := &Schedule{Type: "simple", Interval: "100ms"} - tf := c.CreateTask(sch, wf, "baron", "", false, rest.DefaultMaxFailures) + tf := c.CreateTask(sch, wf, "baron", "", false, 0) type ea struct { events []string diff --git a/mgmt/rest/flags.go b/mgmt/rest/flags.go index 9b0b7f730..b6ef2dd0e 100644 --- a/mgmt/rest/flags.go +++ b/mgmt/rest/flags.go @@ -25,10 +25,6 @@ import ( "github.com/codegangsta/cli" ) -const ( - DefaultMaxFailures = 10 -) - var ( flAPIDisabled = cli.BoolFlag{ Name: "disable-api, d", diff --git a/scheduler/task.go b/scheduler/task.go index dbe8a33af..28b672395 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -41,8 +41,8 @@ import ( const ( // DefaultDeadlineDuration - The default timeout is 5 second DefaultDeadlineDuration = time.Second * 5 - // DefaultStopOnFailure - The default stopping a failure is after three tries - DefaultStopOnFailure = 3 + // DefaultStopOnFailure is used to set the number of failures before a task is disabled + DefaultStopOnFailure = 10 ) var (