From b3e990f880fcc1d962509cb08c6c659a4d9d16f1 Mon Sep 17 00:00:00 2001 From: Izabella Raulin Date: Tue, 7 Mar 2017 09:32:38 +0100 Subject: [PATCH] Run task `count` times (available for simple and window schedule) Removed the code of simple schedule --- cmd/snaptel/commands.go | 1 + cmd/snaptel/flags.go | 6 +- cmd/snaptel/task.go | 28 +- control/available_plugin.go | 2 +- core/schedule.go | 33 +- core/schedule_small_test.go | 64 +- docs/SNAPTEL.md | 9 +- docs/TASKS.md | 171 +++-- examples/tasks/psutil-file.yaml | 1 + mgmt/rest/client/client_func_test.go | 39 +- mgmt/rest/client/task.go | 5 +- mgmt/rest/rest_test.go | 2 +- mgmt/rest/rest_v1_test.go | 70 ++- mgmt/rest/rest_v2_test.go | 15 +- mgmt/rest/v1/fixtures/mock_task_manager.go | 9 +- mgmt/rest/v1/rbody/task.go | 6 - mgmt/rest/v2/mock/mock_task_manager.go | 7 +- mgmt/rest/v2/task.go | 6 - mgmt/tribe/worker/worker.go | 45 +- pkg/schedule/schedule.go | 21 +- pkg/schedule/simple_schedule.go | 65 -- pkg/schedule/simple_schedule_test.go | 113 ---- pkg/schedule/streaming_schedule.go | 21 +- pkg/schedule/windowed_schedule.go | 126 +++- pkg/schedule/windowed_schedule_medium_test.go | 594 ++++++++++++++++++ pkg/schedule/windowed_schedule_small_test.go | 93 +++ pkg/schedule/windowed_schedule_test.go | 303 --------- pkg/stringutils/string.go | 19 + scheduler/distributed_task_test.go | 163 +++-- scheduler/scheduler.go | 82 +-- scheduler/scheduler_medium_test.go | 116 ++-- scheduler/scheduler_test.go | 27 +- scheduler/task.go | 63 +- scheduler/task_test.go | 21 +- scheduler/workflow_test.go | 10 +- 35 files changed, 1514 insertions(+), 842 deletions(-) delete mode 100644 pkg/schedule/simple_schedule.go delete mode 100644 pkg/schedule/simple_schedule_test.go create mode 100644 pkg/schedule/windowed_schedule_medium_test.go create mode 100644 pkg/schedule/windowed_schedule_small_test.go delete mode 100644 pkg/schedule/windowed_schedule_test.go diff --git a/cmd/snaptel/commands.go b/cmd/snaptel/commands.go index 4dc66d02b..87b5d8b1f 100644 --- a/cmd/snaptel/commands.go +++ b/cmd/snaptel/commands.go @@ -41,6 +41,7 @@ var ( flTaskManifest, flWorkfowManifest, flTaskSchedInterval, + flTaskSchedCount, flTaskSchedStartDate, flTaskSchedStartTime, flTaskSchedStopDate, diff --git a/cmd/snaptel/flags.go b/cmd/snaptel/flags.go index 5398aa613..e26ccbc77 100644 --- a/cmd/snaptel/flags.go +++ b/cmd/snaptel/flags.go @@ -104,7 +104,6 @@ var ( Name: "interval, i", Usage: "Interval for the task schedule [ex (simple schedule): 250ms, 1s, 30m (cron schedule): \"0 * * * * *\"]", } - flTaskSchedStartTime = cli.StringFlag{ Name: "start-time", Usage: "Start time for the task schedule [defaults to now]", @@ -113,7 +112,6 @@ var ( Name: "stop-time", Usage: "Start time for the task schedule [defaults to now]", } - flTaskSchedStartDate = cli.StringFlag{ Name: "start-date", Usage: "Start date for the task schedule [defaults to today]", @@ -122,6 +120,10 @@ var ( Name: "stop-date", Usage: "Stop date for the task schedule [defaults to today]", } + flTaskSchedCount = cli.StringFlag{ + Name: "count", + Usage: "The count of runs for the task schedule [defaults to 0 what means no limit, e.g. set to 1 determines a single run task]", + } flTaskSchedDuration = cli.StringFlag{ Name: "duration, d", Usage: "The amount of time to run the task [appends to start or creates a start time before a stop]", diff --git a/cmd/snaptel/task.go b/cmd/snaptel/task.go index 545a4f6ea..73a8c01b2 100644 --- a/cmd/snaptel/task.go +++ b/cmd/snaptel/task.go @@ -101,9 +101,9 @@ func createTask(ctx *cli.Context) error { return err } +// stringValToInt parses the input (string) as an integer value (and returns that integer value +// to the caller or an error if the input value cannot be parsed as an integer) func stringValToInt(val string) (int, error) { - // parse the input (string) as an integer value (and return that integer value - // to the caller or an error if the input value cannot be parsed as an integer) parsedField, err := strconv.Atoi(val) if err != nil { splitErr := strings.Split(err.Error(), ": ") @@ -115,6 +115,20 @@ func stringValToInt(val string) (int, error) { return parsedField, nil } +// stringValToUint parses the input (string) as an unsigned integer value (and returns that uint value +// to the caller or an error if the input value cannot be parsed as an unsigned integer) +func stringValToUint(val string) (uint, error) { + parsedField, err := strconv.ParseUint(val, 10, 64) + if err != nil { + splitErr := strings.Split(err.Error(), ": ") + errStr := splitErr[len(splitErr)-1] + // return a value of zero and the error encountered during string parsing + return 0, fmt.Errorf("Value '%v' cannot be parsed as an unsigned integer (%v)", val, errStr) + } + // return the unsigned integer equivalent of the input string and a nil error (indicating success) + return uint(parsedField), nil +} + // Parses the command-line parameters (if any) and uses them to override the underlying // schedule for this task or to set a schedule for that task (if one is not already defined, // as is the case when we're building a new task from a workflow manifest). @@ -234,6 +248,16 @@ func (t *task) setScheduleFromCliOptions(ctx *cli.Context) error { if !ctx.IsSet("interval") && interval == "" && t.Schedule.Interval == "" { return fmt.Errorf("Usage error (missing interval value); when constructing a new task schedule an interval must be provided") } + + countValStr := ctx.String("count") + if ctx.IsSet("count") || countValStr != "" { + count, err := stringValToUint(countValStr) + if err != nil { + return fmt.Errorf("Usage error (bad count format); %v", err) + } + t.Schedule.Count = count + + } // if a start, stop, or duration value was provided, or if the existing schedule for this task // is 'windowed', then it's a 'windowed' schedule isWindowed := (start != nil || stop != nil || duration != nil || t.Schedule.Type == "windowed") diff --git a/control/available_plugin.go b/control/available_plugin.go index 310d8d059..af455d655 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -252,7 +252,7 @@ func (a *availablePlugin) Kill(r string) error { }).Debug("deleting available plugin package") os.RemoveAll(filepath.Dir(a.execPath)) } - // If it's a stremaing plugin, we need to signal the scheduler that + // If it's a streaming plugin, we need to signal the scheduler that // this plugin is being killed. if c, ok := a.client.(client.PluginStreamCollectorClient); ok { c.Killed() diff --git a/core/schedule.go b/core/schedule.go index dd3c3fac7..3ce276d26 100644 --- a/core/schedule.go +++ b/core/schedule.go @@ -32,32 +32,18 @@ type Schedule struct { Interval string `json:"interval,omitempty"` StartTimestamp *time.Time `json:"start_timestamp,omitempty"` StopTimestamp *time.Time `json:"stop_timestamp,omitempty"` + Count uint `json:"count,omitempty"` } +var ( + ErrMissingScheduleInterval = errors.New("missing `interval` in configuration of schedule") +) + func makeSchedule(s Schedule) (schedule.Schedule, error) { switch s.Type { - case "simple": + case "simple", "windowed": if s.Interval == "" { - return nil, errors.New("missing `interval` in configuration of simple schedule") - } - - d, err := time.ParseDuration(s.Interval) - if err != nil { - return nil, err - } - sch := schedule.NewSimpleSchedule(d) - - err = sch.Validate() - if err != nil { - return nil, err - } - return sch, nil - case "windowed": - if s.StartTimestamp == nil || s.StopTimestamp == nil || s.Interval == "" { - errmsg := fmt.Sprintf("missing parameter/parameters in configuration of windowed schedule,"+ - "start_timestamp: %s, stop_timestamp: %s, interval: %s", - s.StartTimestamp, s.StopTimestamp, s.Interval) - return nil, errors.New(errmsg) + return nil, ErrMissingScheduleInterval } d, err := time.ParseDuration(s.Interval) @@ -69,6 +55,7 @@ func makeSchedule(s Schedule) (schedule.Schedule, error) { d, s.StartTimestamp, s.StopTimestamp, + s.Count, ) err = sch.Validate() @@ -78,7 +65,7 @@ func makeSchedule(s Schedule) (schedule.Schedule, error) { return sch, nil case "cron": if s.Interval == "" { - return nil, errors.New("missing `interval` in configuration of cron schedule") + return nil, ErrMissingScheduleInterval } sch := schedule.NewCronSchedule(s.Interval) @@ -90,6 +77,6 @@ func makeSchedule(s Schedule) (schedule.Schedule, error) { case "streaming": return schedule.NewStreamingSchedule(), nil default: - return nil, errors.New("unknown schedule type " + s.Type) + return nil, fmt.Errorf("unknown schedule type `%s`", s.Type) } } diff --git a/core/schedule_small_test.go b/core/schedule_small_test.go index 5e9377b23..9a3516842 100644 --- a/core/schedule_small_test.go +++ b/core/schedule_small_test.go @@ -40,7 +40,7 @@ func TestMakeSchedule(t *testing.T) { rsched, err := makeSchedule(*sched1) So(rsched, ShouldBeNil) So(err, ShouldNotBeNil) - So(err.Error(), ShouldEqual, fmt.Sprintf("unknown schedule type %s", DUMMY_TYPE)) + So(err.Error(), ShouldEqual, fmt.Sprintf("unknown schedule type `%s`", DUMMY_TYPE)) }) Convey("Simple schedule with missing interval in configuration", t, func() { @@ -48,7 +48,7 @@ func TestMakeSchedule(t *testing.T) { rsched, err := makeSchedule(*sched1) So(rsched, ShouldBeNil) So(err, ShouldNotBeNil) - So(err.Error(), ShouldEqual, "missing `interval` in configuration of simple schedule") + So(err, ShouldEqual, ErrMissingScheduleInterval) }) Convey("Simple schedule with bad duration", t, func() { @@ -75,12 +75,20 @@ func TestMakeSchedule(t *testing.T) { So(rsched.GetState(), ShouldEqual, 0) }) + Convey("Simple schedule with determined count", t, func() { + sched1 := &Schedule{Type: "simple", Interval: "1s", Count: 1} + rsched, err := makeSchedule(*sched1) + So(err, ShouldBeNil) + So(rsched, ShouldNotBeNil) + So(rsched.GetState(), ShouldEqual, 0) + }) + Convey("Windowed schedule with missing interval", t, func() { sched1 := &Schedule{Type: "windowed"} rsched, err := makeSchedule(*sched1) So(rsched, ShouldBeNil) So(err, ShouldNotBeNil) - So(err.Error(), ShouldStartWith, "missing parameter/parameters in configuration of windowed schedule") + So(err, ShouldEqual, ErrMissingScheduleInterval) }) Convey("Windowed schedule with bad duration", t, func() { @@ -102,22 +110,46 @@ func TestMakeSchedule(t *testing.T) { So(err.Error(), ShouldEqual, "Interval must be greater than 0") }) - Convey("Windowed schedule with missing start_timestamp", t, func() { - now := time.Now() - sched1 := &Schedule{Type: "windowed", Interval: "1s", StopTimestamp: &now} + Convey("Windowed schedule with determined start_timestamp and count", t, func() { + startTime := time.Now().Add(time.Minute) + sched1 := &Schedule{Type: "simple", Interval: "1s", StartTimestamp: &startTime, Count: 1} rsched, err := makeSchedule(*sched1) - So(rsched, ShouldBeNil) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldStartWith, "missing parameter/parameters in configuration of windowed schedule") + So(err, ShouldBeNil) + So(rsched, ShouldNotBeNil) + So(rsched.GetState(), ShouldEqual, 0) }) - Convey("Windowed schedule with missing stop_timestamp", t, func() { - now := time.Now() - sched1 := &Schedule{Type: "windowed", Interval: "1s", StartTimestamp: &now} + Convey("Windowed schedule without determined start_timestamp", t, func() { + stopTime := time.Now().Add(time.Second) + sched1 := &Schedule{Type: "windowed", Interval: "1s", StopTimestamp: &stopTime} rsched, err := makeSchedule(*sched1) - So(rsched, ShouldBeNil) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldStartWith, "missing parameter/parameters in configuration of windowed schedule") + So(err, ShouldBeNil) + So(rsched, ShouldNotBeNil) + }) + + Convey("Windowed schedule without determined stop_timestamp", t, func() { + startTime := time.Now().Add(time.Second) + sched1 := &Schedule{Type: "windowed", Interval: "1s", StartTimestamp: &startTime} + rsched, err := makeSchedule(*sched1) + So(err, ShouldBeNil) + So(rsched, ShouldNotBeNil) + }) + + Convey("Windowed schedule without determined start and stop", t, func() { + sched1 := &Schedule{Type: "windowed", Interval: "1s"} + rsched, err := makeSchedule(*sched1) + So(err, ShouldBeNil) + So(rsched, ShouldNotBeNil) + }) + + Convey("Windowed schedule with start in the past", t, func() { + startTime := time.Now().Add(-2 * time.Minute) + stopTime := time.Now().Add(1 * time.Minute) + sched1 := &Schedule{Type: "windowed", Interval: "1s", + StartTimestamp: &startTime, StopTimestamp: &stopTime} + rsched, err := makeSchedule(*sched1) + So(err, ShouldBeNil) + So(rsched, ShouldNotBeNil) }) Convey("Windowed schedule with stop in the past", t, func() { @@ -147,7 +179,7 @@ func TestMakeSchedule(t *testing.T) { rsched, err := makeSchedule(*sched1) So(rsched, ShouldBeNil) So(err, ShouldNotBeNil) - So(err.Error(), ShouldEqual, "missing `interval` in configuration of cron schedule") + So(err, ShouldEqual, ErrMissingScheduleInterval) }) Convey("Cron schedule with invalid duration", t, func() { diff --git a/docs/SNAPTEL.md b/docs/SNAPTEL.md index 610867a5f..b52dfe5d5 100644 --- a/docs/SNAPTEL.md +++ b/docs/SNAPTEL.md @@ -60,6 +60,7 @@ create There are two ways to create a task. --task-manifest value, -t value File path for task manifest to use for task creation. --workflow-manifest value, -w value File path for workflow manifest to use for task creation --interval value, -i value Interval for the task schedule [ex (simple schedule): 250ms, 1s, 30m (cron schedule): "0 * * * * *"] + --count value The count of runs for the task schedule [defaults to 0 what means no limit, e.g. set to 1 determines a single run task] --start-date value Start date for the task schedule [defaults to today] --start-time value Start time for the task schedule [defaults to now] --stop-date value Stop date for the task schedule [defaults to today] @@ -186,9 +187,10 @@ and then: 3. load a publishing plugin 4. list the plugins 5. start a task with a task manifest -6. start a task with a workflow manifest -7. list the tasks -8. unload the plugins +6. start a single run task with a task manifest +7. start a task with a workflow manifest +8. list the tasks +9. unload the plugins ``` $ snaptel plugin load /opt/snap/plugins/snap-plugin-collector-mock1 @@ -196,6 +198,7 @@ $ snaptel plugin load /opt/snap/plugins/snap-plugin-processor-passthru $ snaptel plugin load /opt/snap/plugins/snap-plugin-publisher-mock-file $ snaptel plugin list $ snaptel task create -t mock-file.json +$ snaptel task create -t mock-file.json --count 1 $ snaptel task create -w workflow.json -i 1s -d 10s $ snaptel task list $ snaptel plugin unload collector mock diff --git a/docs/TASKS.md b/docs/TASKS.md index 508ae580f..955d78db5 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -12,9 +12,9 @@ A task can be in the following states: - **running:** a running task - **stopped:** a task that is not running - **disabled:** a task in a state not allowed to start. This happens when the task produces consecutive errors. A disabled task must be re-enabled before it can be started again. -- **ended:** a task for which the schedule is ended. At present this happens only for windowed schedule with defined _stop_timestamp_. An ended task is resumable if the schedule is still valid. +- **ended:** a task for which the schedule is ended. It happens for schedule with defined _stop_timestamp_ or with specified the _count_ of runs. An ended task is resumable if the schedule is still valid. -![statediagram](https://cloud.githubusercontent.com/assets/11335874/23362447/0f0b9f74-fcf6-11e6-93d7-889a7ccdc45f.png) +![statediagram](https://cloud.githubusercontent.com/assets/11335874/23774722/62526aaa-0525-11e7-9ce8-894a8e2cbdf1.png) How To | Command ----------------------------------------|---------------------------------------- @@ -48,46 +48,141 @@ The header contains a version, used to differentiate between versions of the tas #### Schedule -The schedule describes the schedule type and interval for running the task. The type of a schedule could be a simple "run forever" schedule, which is what we see above as `"simple"` or something more complex. Snap is designed in a way where custom schedulers can easily be dropped in. If a custom schedule is used, it may require more key/value pairs in the schedule section of the manifest. At the time of this writing, Snap has three schedules: -- **simple schedule** which is described above, -- **window schedule** which adds a start and stop time for the task. The time must be given as a quoted string in [RFC 3339](https://www.ietf.org/rfc/rfc3339.txt) format, for example with specific time zone offset: -```json - "version": 1, - "schedule": { - "type": "windowed", - "interval": "1s", - "start_timestamp": "2016-10-27T16:39:57+01:00", - "stop_timestamp": "2016-10-28T16:39:57+01:00" - }, - "max-failures": 10, -``` -or without time zone offset (in that cases uppercase'Z' must be present): -```json - "version": 1, - "schedule": { - "type": "windowed", - "interval": "1s", - "start_timestamp": "2016-10-27T16:39:57Z", - "stop_timestamp": "2016-10-28T16:39:57Z" - }, - "max-failures": 10, -``` -- **cron schedule** which supports cron-like entries in ```interval``` field, like in this example (workflow will fire every hour on the half hour): -```json - "version": 1, - "schedule": { - "type": "cron", - "interval" : "0 30 * * * *" - }, - "max-failures": 10, -``` -More on cron expressions can be found here: https://godoc.org/github.com/robfig/cron - +The schedule describes the schedule type and interval for running the task. At the time of this writing, Snap has three schedules: + - [simple](#simple-schedule) + - [windowed](#windowed-schedule) + - [cron](#cron-schedule) + +Snap is designed in a way where custom schedulers can easily be dropped in. If a custom schedule is used, it may require more key/value pairs in the schedule section of the manifest. + + +##### Simple Schedule + + Key | Type | Description +----------------------------|---------------|----------------- + interval(*) | string | An interval specifies the time duration between each scheduled execution; It must be greater than 0. + count | uint | A count determines the number of expected scheduled executions at interval seconds apart. Defaults to 0 what means no limit. Set the count to 1 if you expect a single run task. + +(*) is required + + - simple "run forever" schedule: + ```json + "version": 1, + "schedule": { + "type": "simple", + "interval": "1s" + }, + "max-failures": 10, + ``` + + - simple "run X times" schedule: + ```json + "version": 1, + "schedule": { + "type": "simple", + "interval": "1s", + "count": 1 + }, + "max-failures": 1, + ``` + + +##### Windowed Schedule + + The windowed schedule adds a start and/or stop time for the task. + + Key | Type | Description +--------------------------------|---------------|----------------- + interval(*) | string | An interval specifies the time duration between each scheduled execution; It must be greater than 0. + start_timestamp(1) | string | A start time for the task schedule. If not determined, the schedule will start immediately. + stop_timestamp(1) | string | A stop time for the task schedule. If not determined, the schedule will be running all the time until the stop command is not called. + count | uint | A count determines the number of expected scheduled executions at interval seconds apart. Defaults to 0 what means no limit. Set the count to 1 if you expect a single run task. + + + (*) is required + + (1) the time must be given as a quoted string in [RFC 3339](https://www.ietf.org/rfc/rfc3339.txt) format with specific time zone offset + + Notice: Specifying both the _stop_timestamp_ and the _count_ is not allowed. In such case, you receive a warning that the value of the _count_ field will be ignored. + + - a regular window with determined both start and stop time: + ```json + "version": 1, + "schedule": { + "type": "windowed", + "interval": "1s", + "start_timestamp": "2016-10-27T16:00:00+01:00", + "stop_timestamp": "2016-10-28T16:30:00+01:00" + }, + "max-failures": 10, + ``` + + - start schedule on _start_timestamp_ and "run forever": + (a window with determined only stop time) + ```json + "version": 1, + "schedule": { + "type": "windowed", + "interval": "1s", + "start_timestamp": "2016-10-27T16:00:00+01:00" + }, + "max-failures": 10, + ``` + + - start schedule immediately and finish on _stop time_: + (a window with determined only start time) + ```json + "version": 1, + "schedule": { + "type": "windowed", + "interval": "1s", + "stop_timestamp": "2016-10-28T16:30:00+01:00" + }, + "max-failures": 10, + ``` + + - start schedule on _start time_ and run "X times": + (a window with determined start time and count) + ```json + "version": 1, + "schedule": { + "type": "windowed", + "interval": "1s", + "start_timestamp": "2016-10-27T16:00:00+01:00", + "count": 1 + }, + "max-failures": 1, + ``` + + +##### Cron Schedule + + The cron schedule supports cron-like entries in `interval` field. More on cron expressions can be found here: https://godoc.org/github.com/robfig/cron + + Key | Type | Description +--------------------------------|---------------|----------------- + interval(*) | string | An interval specifies the time duration between each scheduled execution in cron-like entries. More on cron expressions can be found here: https://godoc.org/github.com/robfig/cron. + +(*) is required + + - schedule task every hour on the half hour: + + ```json + "version": 1, + "schedule": { + "type": "cron", + "interval" : "0 30 * * * *" + }, + "max-failures": 10, + ``` + + + #### Max-Failures By default, Snap will disable a task if there are 10 consecutive errors from any plugins within the workflow. The configuration can be changed by specifying the number of failures value in the task header. If the `max-failures` value is -1, Snap will -not disable a task with consecutive failure. Instead, Snap will sleep for 1 second for every 10 consecutive failures +not disable a task with consecutive failure. Instead, Snap will sleep for 1 second for every 10 consecutive failures and retry again. If you intend to run tasks with `max-failures: -1`, please also configure `max_plugin_restarts: -1` in [snap daemon control configuration section](SNAPTELD_CONFIGURATION.md). diff --git a/examples/tasks/psutil-file.yaml b/examples/tasks/psutil-file.yaml index 018d0710e..bea27b045 100644 --- a/examples/tasks/psutil-file.yaml +++ b/examples/tasks/psutil-file.yaml @@ -3,6 +3,7 @@ schedule: type: "simple" interval: "1s" + count: 5 max-failures: 10 workflow: collect: diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index 342d52d5a..38a03a84a 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -376,9 +376,8 @@ func TestSnapClient(t *testing.T) { So(tt.Err, ShouldNotBeNil) }) - Convey("Creating a task with missing parameters (start_timestamp and stop_timestamp) "+ - "for windowed schedule", func() { - incorrectSchedule := &Schedule{Type: "windowed", Interval: "1s"} + Convey("Creating a task with missing parameter (interval) for windowed schedule", func() { + incorrectSchedule := &Schedule{Type: "windowed"} tt := c.CreateTask(incorrectSchedule, wf, "baron", "", true, 0) So(tt.Err, ShouldNotBeNil) }) @@ -397,14 +396,32 @@ func TestSnapClient(t *testing.T) { }) Convey("Creating a task with correct configuration for windowed schedule", func() { - startTime := time.Now().Add(time.Minute) - stopTime := time.Now().Add(2 * time.Minute) - correctSchedule := &Schedule{Type: "windowed", Interval: "1s", - StartTimestamp: &startTime, - StopTimestamp: &stopTime} - tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0) - So(tt.Err, ShouldBeNil) - So(tt.State, ShouldEqual, "Running") + Convey("regular window", func() { + startTime := time.Now().Add(time.Minute) + stopTime := time.Now().Add(2 * time.Minute) + correctSchedule := &Schedule{Type: "windowed", Interval: "1s", + StartTimestamp: &startTime, + StopTimestamp: &stopTime} + tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0) + So(tt.Err, ShouldBeNil) + So(tt.State, ShouldEqual, "Running") + }) + Convey("stop time is not set", func() { + startTime := time.Now().Add(time.Minute) + correctSchedule := &Schedule{Type: "windowed", Interval: "1s", + StartTimestamp: &startTime} + tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0) + So(tt.Err, ShouldBeNil) + So(tt.State, ShouldEqual, "Running") + }) + Convey("start time is not set", func() { + stopTime := time.Now().Add(2 * time.Minute) + correctSchedule := &Schedule{Type: "windowed", Interval: "1s", + StopTimestamp: &stopTime} + tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0) + So(tt.Err, ShouldBeNil) + So(tt.State, ShouldEqual, "Running") + }) }) Convey("Creating a task with correct configuration for cron schedule", func() { diff --git a/mgmt/rest/client/task.go b/mgmt/rest/client/task.go index 6e170a4ea..4eeecda15 100644 --- a/mgmt/rest/client/task.go +++ b/mgmt/rest/client/task.go @@ -42,6 +42,9 @@ type Schedule struct { StartTimestamp *time.Time `json:"start_timestamp,omitempty"` // StopTimestamp specifies the end time. StopTimestamp *time.Time `json:"stop_timestamp,omitempty"` + // Count specifies the number of expected runs (defaults to 0 what means no limit, set to 1 means single run task). + // Count is supported by "simple" and "windowed" schedules + Count uint `json:"count,omitempty"` } // CreateTask creates a task given the schedule, workflow, task name, and task state. @@ -55,12 +58,12 @@ func (c *Client) CreateTask(s *Schedule, wf *wmap.WorkflowMap, name string, dead Interval: s.Interval, StartTimestamp: s.StartTimestamp, StopTimestamp: s.StopTimestamp, + Count: s.Count, }, Workflow: wf, Start: startTask, MaxFailures: maxFailures, } - if name != "" { t.Name = name } diff --git a/mgmt/rest/rest_test.go b/mgmt/rest/rest_test.go index fef07d928..744bb61e8 100644 --- a/mgmt/rest/rest_test.go +++ b/mgmt/rest/rest_test.go @@ -32,7 +32,7 @@ import ( "github.com/intelsdi-x/snap/scheduler" ) -// common ressources used for medium tests +// common resources used for medium tests var ( // Switching this turns on logging for all the REST API calls diff --git a/mgmt/rest/rest_v1_test.go b/mgmt/rest/rest_v1_test.go index d907fc143..aec38e2ba 100644 --- a/mgmt/rest/rest_v1_test.go +++ b/mgmt/rest/rest_v1_test.go @@ -858,9 +858,10 @@ func TestV1Task(t *testing.T) { // GetTasks returns an unordered map, // thus there is more than one possible response So( + string(body), + ShouldBeIn, responses, - ShouldContain, - string(body)) + ) }) Convey("Get task - v1/tasks/:id", func() { @@ -872,9 +873,10 @@ func TestV1Task(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.GET_TASK_RESPONSE, r.port), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.GET_TASK_RESPONSE, r.port), + ) }) Convey("Watch tasks - v1/tasks/:id/watch", func() { @@ -897,9 +899,10 @@ func TestV1Task(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.ADD_TASK_RESPONSE, r.port), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.ADD_TASK_RESPONSE, r.port), + ) }) Convey("Start tasks - v1/tasks/:id/start", func() { @@ -945,9 +948,10 @@ func TestV1Task(t *testing.T) { body, err = ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.STOP_TASK_RESPONSE_ID_STOP), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.STOP_TASK_RESPONSE_ID_STOP), + ) }) Convey("Enable tasks - v1/tasks/:id/enable", func() { @@ -969,9 +973,10 @@ func TestV1Task(t *testing.T) { body, err = ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.ENABLE_TASK_RESPONSE_ID_ENABLE), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.ENABLE_TASK_RESPONSE_ID_ENABLE), + ) }) Convey("Remove tasks - V1/tasks/:id", func() { @@ -993,9 +998,10 @@ func TestV1Task(t *testing.T) { body, err = ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.REMOVE_TASK_RESPONSE_ID), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.REMOVE_TASK_RESPONSE_ID), + ) }) }) } @@ -1011,9 +1017,10 @@ func TestV1Tribe(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.GET_TRIBE_AGREEMENTS_RESPONSE), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.GET_TRIBE_AGREEMENTS_RESPONSE), + ) }) Convey("Add tribe agreements - /v1/tribe/agreements", func() { @@ -1027,9 +1034,10 @@ func TestV1Tribe(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.ADD_TRIBE_AGREEMENT_RESPONSE), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.ADD_TRIBE_AGREEMENT_RESPONSE), + ) }) Convey("Get tribe agreements - v1/tribe/agreements/:name", func() { @@ -1041,9 +1049,10 @@ func TestV1Tribe(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.GET_TRIBE_AGREEMENTS_RESPONSE_NAME), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.GET_TRIBE_AGREEMENTS_RESPONSE_NAME), + ) }) Convey("Get tribe members - v1/tribe/members", func() { @@ -1054,9 +1063,10 @@ func TestV1Tribe(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.GET_TRIBE_MEMBERS_RESPONSE), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.GET_TRIBE_MEMBERS_RESPONSE), + ) }) Convey("Get tribe member - v1/tribe/member/:name", func() { @@ -1068,9 +1078,10 @@ func TestV1Tribe(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.GET_TRIBE_MEMBER_NAME), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.GET_TRIBE_MEMBER_NAME), + ) }) Convey("Delete tribe agreement - v1/tribe/agreements/:name", func() { @@ -1092,9 +1103,10 @@ func TestV1Tribe(t *testing.T) { body, err = ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.DELETE_TRIBE_AGREEMENT_RESPONSE_NAME), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.DELETE_TRIBE_AGREEMENT_RESPONSE_NAME), + ) }) Convey("Leave tribe agreement - v1/tribe/agreements/:name/leave", func() { @@ -1116,9 +1128,10 @@ func TestV1Tribe(t *testing.T) { body, err = ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.LEAVE_TRIBE_AGREEMENT_RESPONSE_NAME_LEAVE), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.LEAVE_TRIBE_AGREEMENT_RESPONSE_NAME_LEAVE), + ) }) Convey("Join tribe agreement - v1/tribe/agreements/:name/join", func() { @@ -1140,9 +1153,10 @@ func TestV1Tribe(t *testing.T) { body, err = ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(fixtures.JOIN_TRIBE_AGREEMENT_RESPONSE_NAME_JOIN), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(fixtures.JOIN_TRIBE_AGREEMENT_RESPONSE_NAME_JOIN), + ) }) }) diff --git a/mgmt/rest/rest_v2_test.go b/mgmt/rest/rest_v2_test.go index b3b3d935e..5b2d8fb3e 100644 --- a/mgmt/rest/rest_v2_test.go +++ b/mgmt/rest/rest_v2_test.go @@ -288,9 +288,10 @@ func TestV2Task(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(mock.ADD_TASK_RESPONSE, r.port), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(mock.ADD_TASK_RESPONSE, r.port), + ) }) Convey("Get tasks - v2/tasks", func() { @@ -307,9 +308,10 @@ func TestV2Task(t *testing.T) { // GetTasks returns an unordered map, // thus there is more than one possible response So( + string(body), + ShouldBeIn, responses, - ShouldContain, - string(body)) + ) }) Convey("Get task - v2/tasks/:id", func() { @@ -321,9 +323,10 @@ func TestV2Task(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) So(err, ShouldBeNil) So( - fmt.Sprintf(mock.GET_TASK_RESPONSE, r.port), + string(body), ShouldResemble, - string(body)) + fmt.Sprintf(mock.GET_TASK_RESPONSE, r.port), + ) }) Convey("Watch tasks - v2/tasks/:id/watch", func() { diff --git a/mgmt/rest/v1/fixtures/mock_task_manager.go b/mgmt/rest/v1/fixtures/mock_task_manager.go index e4fad9397..b2988c43c 100644 --- a/mgmt/rest/v1/fixtures/mock_task_manager.go +++ b/mgmt/rest/v1/fixtures/mock_task_manager.go @@ -94,7 +94,8 @@ func (t *mockTask) WMap() *wmap.WorkflowMap { return wmap.NewWorkflowMap() } func (t *mockTask) Schedule() schedule.Schedule { - return schedule.NewSimpleSchedule(time.Second * 1) + // return a simple schedule (equals to windowed schedule without determined start and stop timestamp) + return schedule.NewWindowedSchedule(time.Second*1, nil, nil, 0) } func (t *mockTask) MaxFailures() int { return 10 } @@ -249,7 +250,7 @@ const ( } }, "schedule": { - "type": "simple", + "type": "windowed", "interval": "1s" }, "creation_timestamp": -62135596800, @@ -276,7 +277,7 @@ const ( } }, "schedule": { - "type": "simple", + "type": "windowed", "interval": "1s" }, "creation_timestamp": -62135596800, @@ -327,7 +328,7 @@ const ( } }, "schedule": { - "type": "simple", + "type": "windowed", "interval": "1s" }, "creation_timestamp": -62135596800, diff --git a/mgmt/rest/v1/rbody/task.go b/mgmt/rest/v1/rbody/task.go index ea1eb1ac2..4fca02f91 100644 --- a/mgmt/rest/v1/rbody/task.go +++ b/mgmt/rest/v1/rbody/task.go @@ -217,12 +217,6 @@ func (s *ScheduledTaskEnabled) ResponseBodyType() string { func assertSchedule(s schedule.Schedule, t *AddScheduledTask) { switch v := s.(type) { - case *schedule.SimpleSchedule: - t.Schedule = &core.Schedule{ - Type: "simple", - Interval: v.Interval.String(), - } - return case *schedule.WindowedSchedule: t.Schedule = &core.Schedule{ Type: "windowed", diff --git a/mgmt/rest/v2/mock/mock_task_manager.go b/mgmt/rest/v2/mock/mock_task_manager.go index f05d26476..305f426a6 100644 --- a/mgmt/rest/v2/mock/mock_task_manager.go +++ b/mgmt/rest/v2/mock/mock_task_manager.go @@ -94,7 +94,8 @@ func (t *mockTask) WMap() *wmap.WorkflowMap { return wmap.NewWorkflowMap() } func (t *mockTask) Schedule() schedule.Schedule { - return schedule.NewSimpleSchedule(time.Second * 1) + // return a simple schedule (equals to windowed schedule without determined start and stop timestamp) + return schedule.NewWindowedSchedule(time.Second*1, nil, nil, 0) } func (t *mockTask) MaxFailures() int { return 10 } @@ -228,7 +229,7 @@ const ( } }, "schedule": { - "type": "simple", + "type": "windowed", "interval": "1s" }, "creation_timestamp": -62135596800, @@ -248,7 +249,7 @@ const ( } }, "schedule": { - "type": "simple", + "type": "windowed", "interval": "1s" }, "creation_timestamp": -62135596800, diff --git a/mgmt/rest/v2/task.go b/mgmt/rest/v2/task.go index 77e13c0b2..e18fd7060 100644 --- a/mgmt/rest/v2/task.go +++ b/mgmt/rest/v2/task.go @@ -198,12 +198,6 @@ func SchedulerTaskFromTask(t core.Task) Task { func (t *Task) assertSchedule(s schedule.Schedule) { switch v := s.(type) { - case *schedule.SimpleSchedule: - t.Schedule = &core.Schedule{ - Type: "simple", - Interval: v.Interval.String(), - } - return case *schedule.WindowedSchedule: t.Schedule = &core.Schedule{ Type: "windowed", diff --git a/mgmt/tribe/worker/worker.go b/mgmt/tribe/worker/worker.go index e5ac569e6..6a6d9fdb5 100644 --- a/mgmt/tribe/worker/worker.go +++ b/mgmt/tribe/worker/worker.go @@ -545,14 +545,49 @@ func (w worker) isPluginLoaded(n, t string, v int) bool { } func getSchedule(s *core.Schedule) schedule.Schedule { + logger := log.WithFields(log.Fields{ + "_block": "get-schedule", + "schedule-type": s.Type, + }) switch s.Type { - case "simple": - d, e := time.ParseDuration(s.Interval) - if e != nil { - log.WithField("_block", "get-schedule").Error(e) + case "simple", "windowed": + if s.Interval == "" { + logger.Error(core.ErrMissingScheduleInterval) + return nil + } + d, err := time.ParseDuration(s.Interval) + if err != nil { + logger.Error(err) + return nil + } + sch := schedule.NewWindowedSchedule( + d, + s.StartTimestamp, + s.StopTimestamp, + s.Count, + ) + if err = sch.Validate(); err != nil { + logger.Error(err) + return nil + } + return sch + case "cron": + if s.Interval == "" { + logger.Error(core.ErrMissingScheduleInterval) + return nil + } + sch := schedule.NewCronSchedule(s.Interval) + if err := sch.Validate(); err != nil { + logger.Error(err) return nil } - return schedule.NewSimpleSchedule(d) + return sch + case "streaming": + logger.Error("streaming is not yet available for tribe") + //todo + //return schedule.NewStreamingSchedule() + default: + logger.Error("unknown schedule type") } return nil } diff --git a/pkg/schedule/schedule.go b/pkg/schedule/schedule.go index 0e29e0b1c..25b19b70f 100644 --- a/pkg/schedule/schedule.go +++ b/pkg/schedule/schedule.go @@ -1,3 +1,22 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package schedule import ( @@ -6,7 +25,7 @@ import ( ) var ( - // ErrInvalidInterval - Error message for the valid schedule interval must ne greater than 0 + // ErrInvalidInterval - Error message for the valid schedule interval must be greater than 0 ErrInvalidInterval = errors.New("Interval must be greater than 0") // ErrInvalidStopTime - Error message for the stop tome is in the past ErrInvalidStopTime = errors.New("Stop time is in the past") diff --git a/pkg/schedule/simple_schedule.go b/pkg/schedule/simple_schedule.go deleted file mode 100644 index f70b7cfb0..000000000 --- a/pkg/schedule/simple_schedule.go +++ /dev/null @@ -1,65 +0,0 @@ -package schedule - -import ( - "time" -) - -// SimpleSchedule is a schedule that only implements an endless repeating interval -type SimpleSchedule struct { - Interval time.Duration - state ScheduleState -} - -// NewSimpleSchedule returns the SimpleSchedule given the time interval -func NewSimpleSchedule(i time.Duration) *SimpleSchedule { - return &SimpleSchedule{ - Interval: i, - } -} - -// GetState returns the schedule state -func (s *SimpleSchedule) GetState() ScheduleState { - return s.state -} - -// Validate returns an error if the interval of schedule is less -// or equals zero -func (s *SimpleSchedule) Validate() error { - if s.Interval <= 0 { - return ErrInvalidInterval - } - return nil -} - -// Wait returns the SimpleSchedule state, misses and the last schedule ran -func (s *SimpleSchedule) Wait(last time.Time) Response { - m, t := waitOnInterval(last, s.Interval) - return &SimpleScheduleResponse{state: s.GetState(), missed: m, lastTime: t} -} - -// SimpleScheduleResponse a response from SimpleSchedule conforming to ScheduleResponse interface -type SimpleScheduleResponse struct { - state ScheduleState - missed uint - lastTime time.Time -} - -// State returns the state of the Schedule -func (s *SimpleScheduleResponse) State() ScheduleState { - return s.state -} - -// Error returns last error -func (s *SimpleScheduleResponse) Error() error { - return nil -} - -// Missed returns any missed intervals -func (s *SimpleScheduleResponse) Missed() uint { - return s.missed -} - -// LastTime returns the last response time -func (s *SimpleScheduleResponse) LastTime() time.Time { - return s.lastTime -} diff --git a/pkg/schedule/simple_schedule_test.go b/pkg/schedule/simple_schedule_test.go deleted file mode 100644 index 8abaa3020..000000000 --- a/pkg/schedule/simple_schedule_test.go +++ /dev/null @@ -1,113 +0,0 @@ -// +build legacy - -package schedule - -import ( - "testing" - "time" - - . "github.com/smartystreets/goconvey/convey" -) - -func TestSimpleSchedule(t *testing.T) { - Convey("Simple Schedule", t, func() { - Convey("test Wait()", func() { - interval := 100 - overage := 467 - shouldWait := float64(500 - overage) - last := time.Now() - - time.Sleep(time.Millisecond * time.Duration(overage)) - s := NewSimpleSchedule(time.Millisecond * time.Duration(interval)) - err := s.Validate() - So(err, ShouldBeNil) - - before := time.Now() - r := s.Wait(last) - after := time.Since(before) - - So(r.Error(), ShouldEqual, nil) - So(r.State(), ShouldEqual, Active) - So(r.Missed(), ShouldEqual, 4) - // We are ok at this precision with being within 10% over or under (10ms) - afterMS := after.Nanoseconds() / 1000 / 1000 - So(afterMS, ShouldBeGreaterThan, shouldWait-10) - So(afterMS, ShouldBeLessThan, shouldWait+10) - }) - Convey("invalid schedule", func() { - s := NewSimpleSchedule(0) - err := s.Validate() - So(err, ShouldResemble, ErrInvalidInterval) - }) - }) - Convey("Simple schedule with no misses", t, func() { - interval := time.Millisecond * 10 - s := NewSimpleSchedule(interval) - - err := s.Validate() - So(err, ShouldBeNil) - - var r []Response - last := *new(time.Time) - - before := time.Now() - for len(r) <= 10 { - r1 := s.Wait(last) - last = time.Now() - r = append(r, r1) - } - - var missed uint - for _, x := range r { - missed += x.Missed() - } - So(missed, ShouldEqual, 0) - - // the task should start immediately - So( - r[0].LastTime().Sub(before).Seconds(), - ShouldBeBetweenOrEqual, - 0, - (interval).Seconds(), - ) - }) - Convey("Simple schedule with a few misses", t, func() { - interval := time.Millisecond * 10 - s := NewSimpleSchedule(interval) - - err := s.Validate() - So(err, ShouldBeNil) - - var r []Response - last := *new(time.Time) - - before := time.Now() - for len(r) <= 10 { - r1 := s.Wait(last) - last = time.Now() - r = append(r, r1) - // make it miss some - if len(r) == 3 || len(r) == 7 { - time.Sleep(s.Interval) - } - if len(r) == 9 { - // Miss two - time.Sleep(s.Interval * 2) - } - } - - var missed uint - for _, x := range r { - missed += x.Missed() - } - So(missed, ShouldEqual, 4) - - // the task should fire immediately - So( - r[0].LastTime().Sub(before).Seconds(), - ShouldBeBetweenOrEqual, - 0, - (interval).Seconds(), - ) - }) -} diff --git a/pkg/schedule/streaming_schedule.go b/pkg/schedule/streaming_schedule.go index bd7232dfb..e6de4909c 100644 --- a/pkg/schedule/streaming_schedule.go +++ b/pkg/schedule/streaming_schedule.go @@ -1,3 +1,22 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package schedule import "time" @@ -46,7 +65,7 @@ func (s *StreamingScheduleResponse) Missed() uint { return 0 } -// LastTime retruns the last response time +// LastTime returns the last response time func (s *StreamingScheduleResponse) LastTime() time.Time { return time.Time{} } diff --git a/pkg/schedule/windowed_schedule.go b/pkg/schedule/windowed_schedule.go index 582a84350..803eb926b 100644 --- a/pkg/schedule/windowed_schedule.go +++ b/pkg/schedule/windowed_schedule.go @@ -1,3 +1,22 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package schedule import ( @@ -12,35 +31,72 @@ var ( // WindowedSchedule is a schedule that waits on an interval within a specific time window type WindowedSchedule struct { - Interval time.Duration - StartTime *time.Time - StopTime *time.Time - state ScheduleState + Interval time.Duration + StartTime *time.Time + StopTime *time.Time + Count uint + state ScheduleState + stopOnTime *time.Time } -// NewWindowedSchedule returns an instance of WindowedSchedule given duration, -// start and stop time -func NewWindowedSchedule(i time.Duration, start *time.Time, stop *time.Time) *WindowedSchedule { +// NewWindowedSchedule returns an instance of WindowedSchedule with given interval, start and stop timestamp +// and count of expected runs. The value of `count` determines stop time, so specifying it together with `stop` +// is not allowed and the count will be set to defaults 0 in such cases. +func NewWindowedSchedule(i time.Duration, start *time.Time, stop *time.Time, count uint) *WindowedSchedule { + // if stop and count were both defined, ignore the `count` + if count != 0 && stop != nil { + count = 0 + // log about ignoring the `count` + logger.WithFields(log.Fields{ + "_block": "NewWindowedSchedule", + }).Warning("The window stop timestamp and the count cannot be specified simultaneously. The parameter `count` has been ignored.") + } + return &WindowedSchedule{ Interval: i, StartTime: start, StopTime: stop, + Count: count, } } +// setStopOnTime calculates and set the value of the windowed `stopOnTime` which is the right window boundary. +// `stopOnTime` is determined by `StopTime` or, if it is not provided, calculated based on count and interval. +func (w *WindowedSchedule) setStopOnTime() { + if w.StopTime == nil && w.Count != 0 { + // determine the window stop based on the `count` and `interval` + var newStop time.Time + + // if start is not set or points in the past, + // use the current time to calculate stopOnTime + if w.StartTime != nil && time.Now().Before(*w.StartTime) { + newStop = w.StartTime.Add(time.Duration(w.Count) * w.Interval) + } else { + // set a new stop timestamp from this point in time + newStop = time.Now().Add(time.Duration(w.Count) * w.Interval) + } + // set calculated new stop + w.stopOnTime = &newStop + return + } + + // stopOnTime is determined by StopTime + w.stopOnTime = w.StopTime +} + // GetState returns ScheduleState of WindowedSchedule func (w *WindowedSchedule) GetState() ScheduleState { return w.state } -// Validate validates the start, stop and duration interval of -// WindowedSchedule +// Validate validates the start, stop and duration interval of WindowedSchedule func (w *WindowedSchedule) Validate() error { // if the stop time was set but it is in the past, return an error if w.StopTime != nil && time.Now().After(*w.StopTime) { return ErrInvalidStopTime } - // if the start and stop time were both set and the the stop time is before + + // if the start and stop time were both set and the stop time is before // the start time, return an error if w.StopTime != nil && w.StartTime != nil && w.StopTime.Before(*w.StartTime) { return ErrStopBeforeStart @@ -49,12 +105,25 @@ func (w *WindowedSchedule) Validate() error { if w.Interval <= 0 { return ErrInvalidInterval } + + // the schedule passed validation, set as active + w.state = Active return nil } // Wait waits the window interval and return. // Otherwise, it exits with a completed state func (w *WindowedSchedule) Wait(last time.Time) Response { + // If within the window we wait our interval and return + // otherwise we exit with a completed state. + var m uint + + if (last == time.Time{}) { + // the first waiting in cycles, so + // set the `stopOnTime` determining the right-window boundary + w.setStopOnTime() + } + // Do we even have a specific start time? if w.StartTime != nil { // Wait till it is time to start if before the window start @@ -66,40 +135,33 @@ func (w *WindowedSchedule) Wait(last time.Time) Response { }).Debug("Waiting for window to start") time.Sleep(wait) } - } else { - // This has no start like a simple schedule, so execution starts immediately - logger.WithFields(log.Fields{ - "_block": "windowed-wait", - "sleep-duration": 0, - }).Debug("Window start time not defined, start execution immediately") } - // If within the window we wait our interval and return - // otherwise we exit with a completed state. - var m uint // Do we even have a stop time? - if w.StopTime != nil { - if time.Now().Before(*w.StopTime) { + if w.stopOnTime != nil { + if time.Now().Before(*w.stopOnTime) { logger.WithFields(log.Fields{ "_block": "windowed-wait", - "time-before-stop": w.StopTime.Sub(time.Now()), + "time-before-stop": w.stopOnTime.Sub(time.Now()), }).Debug("Within window, calling interval") - logger.WithFields(log.Fields{ - "_block": "windowed-wait", - "last": last, - "interval": w.Interval, - }).Debug("waiting for interval") + m, _ = waitOnInterval(last, w.Interval) + + // check if the schedule should be ended after waiting on interval + if time.Now().After(*w.stopOnTime) { + logger.WithFields(log.Fields{ + "_block": "windowed-wait", + }).Debug("schedule has ended") + w.state = Ended + } } else { + logger.WithFields(log.Fields{ + "_block": "windowed-wait", + }).Debug("schedule has ended") w.state = Ended m = 0 } } else { - logger.WithFields(log.Fields{ - "_block": "windowed-wait", - "last": last, - "interval": w.Interval, - }).Debug("waiting for interval") // This has no end like a simple schedule m, _ = waitOnInterval(last, w.Interval) diff --git a/pkg/schedule/windowed_schedule_medium_test.go b/pkg/schedule/windowed_schedule_medium_test.go new file mode 100644 index 000000000..d0d9faddc --- /dev/null +++ b/pkg/schedule/windowed_schedule_medium_test.go @@ -0,0 +1,594 @@ +// +build medium + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedule + +import ( + "testing" + "time" + + log "github.com/Sirupsen/logrus" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestWindowedSchedule(t *testing.T) { + log.SetLevel(log.DebugLevel) + + Convey("Windowed Schedule expected to run forever", t, func() { + interval := time.Millisecond * 10 + // set start and stop are nil, and the count is zero what means no limits + w := NewWindowedSchedule(interval, nil, nil, 0) + + err := w.Validate() + So(err, ShouldBeNil) + + Convey("with no misses ", func() { + var r []Response + last := *new(time.Time) + before := time.Now() + + for len(r) <= 10 { + r1 := w.Wait(last) + last = time.Now() + r = append(r, r1) + } + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 0) + + // the task is expected to fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeLessThan, + interval.Seconds(), + ) + }) + Convey("with a few misses ", func() { + var r []Response + last := *new(time.Time) + + before := time.Now() + + for len(r) <= 10 { + r1 := w.Wait(last) + last = time.Now() + r = append(r, r1) + // make it miss some + if len(r) == 3 || len(r) == 7 { + time.Sleep(w.Interval) + } + if len(r) == 9 { + // miss two + time.Sleep(2 * w.Interval) + } + } + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 4) + + // the task is expected to fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeLessThan, + interval.Seconds(), + ) + }) + }) // the end of `Simple Windowed Schedule expected to run forever` + + Convey("Nominal windowed Schedule", t, func() { + Convey("without misses", func() { + startWait := time.Millisecond * 50 + windowSize := time.Millisecond * 200 + interval := time.Millisecond * 10 + + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + w := NewWindowedSchedule( + interval, + &start, + &stop, + 0, + ) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + state := Active + before := time.Now() + for state == Active { + r1 := w.Wait(last) + state = r1.State() + last = time.Now() + r = append(r, r1) + } + + // there are 0 missed responses, so for this schedule + // we expect to get between 19 - 22 responses + So(len(r), ShouldBeBetweenOrEqual, 19, 22) + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 0) + + // the task is expected to fire immediately on determined start-time + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + (startWait).Seconds(), + (startWait + interval).Seconds(), + ) + }) + Convey("with a few misses", func() { + startWait := time.Millisecond * 50 + windowSize := time.Millisecond * 200 + interval := time.Millisecond * 10 + + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + w := NewWindowedSchedule( + interval, + &start, + &stop, + 0, + ) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + state := Active + before := time.Now() + for state == Active { + r1 := w.Wait(last) + state = r1.State() + last = time.Now() + r = append(r, r1) + // make it miss some + if len(r) == 3 || len(r) == 7 { + time.Sleep(w.Interval) + } + if len(r) == 9 { + // Miss two + time.Sleep(w.Interval * 2) + } + } + + // there are 4 missed responses, so for this schedule + // we expect to get between 15 - 18 responses + So(len(r), ShouldBeBetweenOrEqual, 15, 18) + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 4) + + // the task is expected to fire on start time + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + (startWait).Seconds(), + (startWait + interval).Seconds(), + ) + }) + Convey("started in the past", func() { + startWait := time.Millisecond * -200 + windowSize := time.Millisecond * 400 + interval := time.Millisecond * 10 + + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + w := NewWindowedSchedule( + interval, + &start, + &stop, + 0, + ) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + before := time.Now() + state := Active + for state == Active { + r1 := w.Wait(last) + state = r1.State() + last = time.Now() + r = append(r, r1) + // make it miss some + if len(r) == 3 || len(r) == 7 { + time.Sleep(w.Interval) + } + if len(r) == 9 { + // Miss two + time.Sleep(w.Interval * 2) + } + } + // there are 4 missed responses, so for this schedule + // we expect to get between 15 - 18 responses + So(len(r), ShouldBeBetweenOrEqual, 15, 18) + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 4) + + // start_time points to the past, + // so the task is expected to fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeLessThan, + (interval).Seconds(), + ) + }) + Convey("start without stop", func() { + startWait := time.Millisecond * 50 + interval := time.Millisecond * 10 + + start := time.Now().Add(startWait) + w := NewWindowedSchedule( + interval, + &start, + nil, + 0, + ) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + before := time.Now() + for len(r) <= 10 { + r1 := w.Wait(last) + last = time.Now() + r = append(r, r1) + } + + // the task is expected to fire immediately on start time + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + (startWait).Seconds(), + (startWait + interval).Seconds(), + ) + }) + Convey("stop without start", func() { + windowSize := time.Millisecond * 200 + interval := time.Millisecond * 10 + + stop := time.Now().Add(windowSize) + w := NewWindowedSchedule( + interval, + nil, + &stop, + 0, + ) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + before := time.Now() + for len(r) <= 10 { + r1 := w.Wait(last) + last = time.Now() + r = append(r, r1) + } + + // the task should start immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeLessThan, + (interval).Seconds(), + ) + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 0) + }) + }) // the end of `Nominal windowed Schedule` + + Convey("Windowed Schedule with determined the count of runs", t, func() { + interval := time.Millisecond * 10 + + Convey("expected to start immediately", func() { + Convey("single run", func() { + count := uint(1) + w := NewWindowedSchedule(interval, nil, nil, count) + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + var r1 Response + last := *new(time.Time) + + state := Active + before := time.Now() + for state == Active { + r1 = w.Wait(last) + last = time.Now() + state = r1.State() + // skip the response about ending the task + if state != Ended { + r = append(r, r1) + } + } + // for this schedule we expect to get 1 response + // and 0 missed responses + So(len(r), ShouldEqual, 1) + So(r[0].Missed(), ShouldEqual, 0) + + // the task is expected to fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeLessThan, + interval.Seconds(), + ) + }) + Convey("multiply runs", func() { + count := uint(10) + w := NewWindowedSchedule(interval, nil, nil, count) + + err := w.Validate() + So(err, ShouldBeNil) + + Convey("with no misses", func() { + var r []Response + var r1 Response + last := *new(time.Time) + + state := Active + before := time.Now() + for state == Active { + r1 = w.Wait(last) + last = time.Now() + state = r1.State() + // skip the response about ending the task + if state != Ended { + r = append(r, r1) + } + } + // for this schedule we expect to get count=10 responses + // and 0 missed responses + So(len(r), ShouldEqual, count) + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 0) + + // the task is expected to fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeLessThan, + interval.Seconds(), + ) + }) + Convey("with a few misses", func() { + var r []Response + var r1 Response + last := *new(time.Time) + + state := Active + before := time.Now() + for state == Active { + r1 = w.Wait(last) + last = time.Now() + state = r1.State() + // skip the response about ending the task + if state != Ended { + r = append(r, r1) + } + if len(r) == 3 || len(r) == 7 { + time.Sleep(w.Interval) + } + } + // for this schedule we expect to get 10 responses minus 2 missed responses + So(len(r), ShouldEqual, count-2) + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 2) + + // the task is expected to fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeLessThan, + interval.Seconds(), + ) + }) + }) + }) + Convey("expected to start on start time", func() { + startWait := time.Millisecond * 100 + + Convey("single run", func() { + count := uint(1) + start := time.Now().Add(startWait) + w := NewWindowedSchedule(interval, &start, nil, count) + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + var r1 Response + last := *new(time.Time) + + state := Active + before := time.Now() + for state == Active { + r1 = w.Wait(last) + last = time.Now() + state = r1.State() + // skip the response about ending the task + if state != Ended { + r = append(r, r1) + } + } + // for this schedule we expect to get count=1 response + // and 0 missed responses + So(len(r), ShouldEqual, count) + So(r[0].Missed(), ShouldEqual, 0) + + // the task is expected to fire on start timestamp + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + (startWait).Seconds(), + (startWait + interval).Seconds(), + ) + }) + Convey("multiply runs", func() { + count := uint(10) + start := time.Now().Add(startWait) + w := NewWindowedSchedule(interval, &start, nil, count) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + var r1 Response + last := *new(time.Time) + + state := Active + before := time.Now() + for state == Active { + r1 = w.Wait(last) + last = time.Now() + state = r1.State() + // skip the response about ending the task + if state != Ended { + r = append(r, r1) + } + } + // for this schedule we expect to get count=10 responses + // and 0 missed responses + So(len(r), ShouldEqual, count) + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 0) + + // the task is expected to fire on start time + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + (startWait).Seconds(), + (startWait + interval).Seconds(), + ) + }) + }) + Convey("started in the past", func() { + startWait := time.Millisecond * -200 + count := uint(1) + interval := time.Millisecond * 10 + + start := time.Now().Add(startWait) + w := NewWindowedSchedule( + interval, + &start, + nil, + count, + ) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + before := time.Now() + state := Active + for state == Active { + r1 := w.Wait(last) + state = r1.State() + last = time.Now() + if state != Ended { + r = append(r, r1) + } + } + + So(len(r), ShouldEqual, 1) + So(r[0].Missed(), ShouldEqual, 0) + + // start_time points to the past, + // so the task is expected to fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeLessThan, + (interval).Seconds(), + ) + }) + Convey("with determined stop", func() { + startWait := time.Millisecond * 50 + windowSize := time.Millisecond * 200 + interval := time.Millisecond * 10 + count := uint(1) + + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + w := NewWindowedSchedule( + interval, + &start, + &stop, + count, + ) + + Convey("expected ignoring the count and set as default to 0", func() { + So(w.Count, ShouldEqual, 0) + So(w.StartTime.Equal(start), ShouldBeTrue) + + Convey("another params should have a value as provided", func() { + So(w.StartTime.Equal(start), ShouldBeTrue) + So(w.StopTime.Equal(stop), ShouldBeTrue) + So(w.Interval, ShouldEqual, interval) + }) + }) + + err := w.Validate() + So(err, ShouldBeNil) + }) + }) // the end of `Window schedule with determined the count of runs` +} diff --git a/pkg/schedule/windowed_schedule_small_test.go b/pkg/schedule/windowed_schedule_small_test.go new file mode 100644 index 000000000..d32b6b1a5 --- /dev/null +++ b/pkg/schedule/windowed_schedule_small_test.go @@ -0,0 +1,93 @@ +// +build small + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedule + +import ( + "testing" + "time" + + log "github.com/Sirupsen/logrus" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestWindowedScheduleValidation(t *testing.T) { + log.SetLevel(log.DebugLevel) + Convey("invalid an interval", t, func() { + Convey("zero value", func() { + interval := time.Millisecond * 0 + w := NewWindowedSchedule(interval, nil, nil, 0) + err := w.Validate() + So(err, ShouldEqual, ErrInvalidInterval) + }) + Convey("negative value", func() { + interval := time.Millisecond * -1 + w := NewWindowedSchedule(interval, nil, nil, 0) + err := w.Validate() + So(err, ShouldEqual, ErrInvalidInterval) + }) + }) + Convey("start time in past is ok (as long as window ends in the future)", t, func() { + start := time.Now().Add(time.Second * -10) + stop := time.Now().Add(time.Second * 10) + w := NewWindowedSchedule(time.Millisecond*100, &start, &stop, 0) + err := w.Validate() + So(err, ShouldEqual, nil) + }) + Convey("window in past", t, func() { + start := time.Now().Add(time.Second * -20) + stop := time.Now().Add(time.Second * -10) + w := NewWindowedSchedule(time.Millisecond*100, &start, &stop, 0) + err := w.Validate() + So(err, ShouldEqual, ErrInvalidStopTime) + }) + Convey("cart before the horse", t, func() { + start := time.Now().Add(time.Second * 100) + stop := time.Now().Add(time.Second * 10) + w := NewWindowedSchedule(time.Millisecond*100, &start, &stop, 0) + err := w.Validate() + So(err, ShouldEqual, ErrStopBeforeStart) + }) + Convey("test Wait()", t, func() { + interval := 100 + overage := 467 + shouldWait := float64(500 - overage) + last := time.Now() + + time.Sleep(time.Millisecond * time.Duration(overage)) + s := NewWindowedSchedule(time.Millisecond*time.Duration(interval), nil, nil, 0) + err := s.Validate() + So(err, ShouldBeNil) + + before := time.Now() + r := s.Wait(last) + after := time.Since(before) + + So(r.Error(), ShouldEqual, nil) + So(r.State(), ShouldEqual, Active) + So(r.Missed(), ShouldEqual, 4) + // We are ok at this precision with being within 10% over or under (10ms) + afterMS := after.Nanoseconds() / 1000 / 1000 + So(afterMS, ShouldBeGreaterThan, shouldWait-10) + So(afterMS, ShouldBeLessThan, shouldWait+10) + }) +} diff --git a/pkg/schedule/windowed_schedule_test.go b/pkg/schedule/windowed_schedule_test.go deleted file mode 100644 index a26f673ba..000000000 --- a/pkg/schedule/windowed_schedule_test.go +++ /dev/null @@ -1,303 +0,0 @@ -// +build legacy - -package schedule - -import ( - "testing" - "time" - - log "github.com/Sirupsen/logrus" - - . "github.com/smartystreets/goconvey/convey" -) - -func TestWindowedSchedule(t *testing.T) { - log.SetLevel(log.DebugLevel) - Convey("Windowed Schedule", t, func() { - Convey("nominal window without misses", func() { - startWait := time.Millisecond * 50 - windowSize := time.Millisecond * 200 - interval := time.Millisecond * 10 - - start := time.Now().Add(startWait) - stop := time.Now().Add(startWait + windowSize) - w := NewWindowedSchedule( - interval, - &start, - &stop, - ) - - err := w.Validate() - So(err, ShouldBeNil) - - var r []Response - last := *new(time.Time) - - state := Active - before := time.Now() - for state == Active { - r1 := w.Wait(last) - state = r1.State() - last = time.Now() - r = append(r, r1) - } - - // there are 0 missed responses, so for this schedule - // we expect to get between 19 - 22 responses - So(len(r), ShouldBeBetweenOrEqual, 19, 22) - - var missed uint - for _, x := range r { - missed += x.Missed() - } - So(missed, ShouldEqual, 0) - - // the task is expected to fire immediately on determined start-time - So( - r[0].LastTime().Sub(before).Seconds(), - ShouldBeBetweenOrEqual, - (startWait).Seconds(), - (startWait + interval).Seconds(), - ) - }) - - Convey("nominal window with a few misses", func() { - startWait := time.Millisecond * 50 - windowSize := time.Millisecond * 200 - interval := time.Millisecond * 10 - - start := time.Now().Add(startWait) - stop := time.Now().Add(startWait + windowSize) - w := NewWindowedSchedule( - interval, - &start, - &stop, - ) - - err := w.Validate() - So(err, ShouldBeNil) - - var r []Response - last := *new(time.Time) - - state := Active - before := time.Now() - for state == Active { - r1 := w.Wait(last) - state = r1.State() - last = time.Now() - r = append(r, r1) - // make it miss some - if len(r) == 3 || len(r) == 7 { - time.Sleep(w.Interval) - } - if len(r) == 9 { - // Miss two - time.Sleep(w.Interval * 2) - } - } - - // there are 4 missed responses, so for this schedule - // we expect to get between 15 - 18 responses - So(len(r), ShouldBeBetweenOrEqual, 15, 18) - - var missed uint - for _, x := range r { - missed += x.Missed() - } - So(missed, ShouldEqual, 4) - - // the task is expected to fire immediately on determined start-time - So( - r[0].LastTime().Sub(before).Seconds(), - ShouldBeBetweenOrEqual, - (startWait).Seconds(), - (startWait + interval).Seconds(), - ) - }) - - Convey("started in the past", func() { - startWait := time.Millisecond * -200 - windowSize := time.Millisecond * 400 - interval := time.Millisecond * 10 - - start := time.Now().Add(startWait) - stop := time.Now().Add(startWait + windowSize) - w := NewWindowedSchedule( - interval, - &start, - &stop, - ) - - err := w.Validate() - So(err, ShouldBeNil) - - var r []Response - last := *new(time.Time) - - before := time.Now() - state := Active - for state == Active { - r1 := w.Wait(last) - state = r1.State() - last = time.Now() - r = append(r, r1) - // make it miss some - if len(r) == 3 || len(r) == 7 { - time.Sleep(w.Interval) - } - if len(r) == 9 { - // Miss two - time.Sleep(w.Interval * 2) - } - } - // there are 4 missed responses, so for this schedule - // we expect to get between 15 - 18 responses - So(len(r), ShouldBeBetweenOrEqual, 15, 18) - - var missed uint - for _, x := range r { - missed += x.Missed() - } - So(missed, ShouldEqual, 4) - - // start_time points to the past, - // so the task is expected to fire immediately - So( - r[0].LastTime().Sub(before).Seconds(), - ShouldBeBetweenOrEqual, - 0, - (interval).Seconds(), - ) - }) - - Convey("start without stop", func() { - startWait := time.Millisecond * 50 - interval := time.Millisecond * 10 - - start := time.Now().Add(startWait) - w := NewWindowedSchedule( - interval, - &start, - nil, - ) - - err := w.Validate() - So(err, ShouldBeNil) - - var r []Response - last := *new(time.Time) - - before := time.Now() - for len(r) <= 10 { - r1 := w.Wait(last) - last = time.Now() - r = append(r, r1) - } - - // the task is expected to fire immediately on start_time - So( - r[0].LastTime().Sub(before).Seconds(), - ShouldBeBetweenOrEqual, - (startWait).Seconds(), - (startWait + interval).Seconds(), - ) - }) - - Convey("stop without start", func() { - windowSize := time.Millisecond * 200 - interval := time.Millisecond * 10 - - stop := time.Now().Add(windowSize) - w := NewWindowedSchedule( - interval, - nil, - &stop, - ) - - err := w.Validate() - So(err, ShouldBeNil) - - var r []Response - last := *new(time.Time) - - before := time.Now() - for len(r) <= 10 { - r1 := w.Wait(last) - last = time.Now() - r = append(r, r1) - } - - // the task should start immediately - So( - r[0].LastTime().Sub(before).Seconds(), - ShouldBeBetweenOrEqual, - 0, - (interval).Seconds(), - ) - - var missed uint - for _, x := range r { - missed += x.Missed() - } - So(missed, ShouldEqual, 0) - }) - - Convey("start immediatelly without stop (no window determined)", func() { - interval := time.Millisecond * 10 - // schedule equivalent to simple schedule - w := NewWindowedSchedule( - interval, - nil, - nil, - ) - - err := w.Validate() - So(err, ShouldBeNil) - - var r []Response - last := *new(time.Time) - - before := time.Now() - for len(r) <= 10 { - r1 := w.Wait(last) - last = time.Now() - r = append(r, r1) - } - - // the task should start immediately - So( - r[0].LastTime().Sub(before).Seconds(), - ShouldBeBetweenOrEqual, - 0, - (interval).Seconds(), - ) - - }) - - Convey("start time in past is ok (as long as window ends in the future)", func() { - start := time.Now().Add(time.Second * -10) - stop := time.Now().Add(time.Second * 10) - w := NewWindowedSchedule(time.Millisecond*100, &start, &stop) - err := w.Validate() - So(err, ShouldEqual, nil) - }) - - Convey("window in past", func() { - start := time.Now().Add(time.Second * -20) - stop := time.Now().Add(time.Second * -10) - w := NewWindowedSchedule(time.Millisecond*100, &start, &stop) - err := w.Validate() - So(err, ShouldEqual, ErrInvalidStopTime) - }) - - Convey("cart before the horse", func() { - start := time.Now().Add(time.Second * 100) - stop := time.Now().Add(time.Second * 10) - w := NewWindowedSchedule(time.Millisecond*100, &start, &stop) - err := w.Validate() - So(err, ShouldEqual, ErrStopBeforeStart) - }) - - }) -} diff --git a/pkg/stringutils/string.go b/pkg/stringutils/string.go index 14a9e72f6..ef7242981 100644 --- a/pkg/stringutils/string.go +++ b/pkg/stringutils/string.go @@ -1,3 +1,22 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package stringutils import "fmt" diff --git a/scheduler/distributed_task_test.go b/scheduler/distributed_task_test.go index 1c68ca652..3638095d2 100644 --- a/scheduler/distributed_task_test.go +++ b/scheduler/distributed_task_test.go @@ -68,9 +68,7 @@ func TestDistributedWorkflow(t *testing.T) { mock2Path := helper.PluginFilePath("snap-plugin-collector-mock2") passthruPath := helper.PluginFilePath("snap-plugin-processor-passthru") filePath := helper.PluginFilePath("snap-plugin-publisher-mock-file") - // mock2 and file onto c1 - rp, err := core.NewRequestedPlugin(mock2Path, c1.GetTempDir(), nil) So(err, ShouldBeNil) _, err = c1.Load(rp) @@ -89,7 +87,9 @@ func TestDistributedWorkflow(t *testing.T) { //Create a task //Create a workflowmap wf := dsWFMap(port1) - t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true) + // create a simple schedule (equals to windowed schedule without determined start and stop timestamp) + s := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) + t, errs := sch.CreateTask(s, wf, true) So(len(errs.Errors()), ShouldEqual, 0) So(t, ShouldNotBeNil) // stop the scheduler and control (since in nested Convey statements, the @@ -103,7 +103,9 @@ func TestDistributedWorkflow(t *testing.T) { Convey("Test task with invalid remote port", func() { wf := dsWFMap(0) controlproxy.MAX_CONNECTION_TIMEOUT = 1 * time.Second - t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true) + // create a simple schedule (equals to windowed schedule without determined start and stop timestamp) + s := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) + t, errs := sch.CreateTask(s, wf, true) So(len(errs.Errors()), ShouldEqual, 1) So(t, ShouldBeNil) // stop the scheduler and control (since in nested Convey statements, the @@ -118,7 +120,9 @@ func TestDistributedWorkflow(t *testing.T) { _, err := c2.Unload(passthru) So(err, ShouldBeNil) wf := dsWFMap(port1) - t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true) + // create a simple schedule (equals to windowed schedule without determined start and stop timestamp) + s := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) + t, errs := sch.CreateTask(s, wf, true) So(len(errs.Errors()), ShouldEqual, 1) So(t, ShouldBeNil) // stop the scheduler and control (since in nested Convey statements, the @@ -136,7 +140,8 @@ func TestDistributedWorkflow(t *testing.T) { // define an interval that the simple scheduler will run on every 100ms interval := time.Millisecond * 100 // create our task; should be disabled after 3 failures - t, errs := sch.CreateTask(schedule.NewSimpleSchedule(interval), wf, true) + s := schedule.NewWindowedSchedule(interval, nil, nil, 0) + t, errs := sch.CreateTask(s, wf, true) // ensure task was created successfully So(len(errs.Errors()), ShouldEqual, 0) So(t, ShouldNotBeNil) @@ -198,10 +203,10 @@ func TestDistributedSubscriptions(t *testing.T) { port1 := cfg.ListenPort c2 := control.New(cfg) schcfg := GetDefaultConfig() - sch := New(schcfg) + s := New(schcfg) c2.Start() - sch.SetMetricManager(c1) - err := sch.Start() + s.SetMetricManager(c1) + err := s.Start() So(err, ShouldBeNil) // Load appropriate plugins into each control. mock2Path := helper.PluginFilePath("snap-plugin-collector-mock2") @@ -223,13 +228,15 @@ func TestDistributedSubscriptions(t *testing.T) { _, err = c2.Load(rp) So(err, ShouldBeNil) + // Create a workflowmap + wf := dsWFMap(port1) + Convey("Starting task should not succeed if remote dep fails to subscribe", func() { - //Create a task - //Create a workflowmap - wf := dsWFMap(port1) + // Create a simple schedule which equals to windowed schedule without start and stop time + sch := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) // Create a task that is not started immediately so we can // validate deps correctly. - t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, false) + t, errs := s.CreateTask(sch, wf, false) So(len(errs.Errors()), ShouldEqual, 0) So(t, ShouldNotBeNil) schTask := t.(*task) @@ -238,24 +245,22 @@ func TestDistributedSubscriptions(t *testing.T) { localMockManager := &subscriptionManager{Fail: false} schTask.RemoteManagers.Add("", localMockManager) // Start task. We expect it to fail while subscribing deps - terrs := sch.StartTask(t.ID()) + terrs := s.StartTask(t.ID()) So(terrs, ShouldNotBeNil) Convey("So dependencies should have been unsubscribed", func() { // Ensure that unsubscribe call count is equal to subscribe call count // i.e that every subscribe call was followed by an unsubscribe since // we errored So(remoteMockManager.UnsubscribeCallCount, ShouldEqual, remoteMockManager.SubscribeCallCount) - So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.UnsubscribeCallCount) + So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.SubscribeCallCount) }) }) - Convey("Starting task should not succeed if missing local dep fails to subscribe", func() { - //Create a task - //Create a workflowmap - wf := dsWFMap(port1) + // create a simple schedule which equals to windowed schedule without start and stop time + sch := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) // Create a task that is not started immediately so we can // validate deps correctly. - t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, false) + t, errs := s.CreateTask(sch, wf, false) So(len(errs.Errors()), ShouldEqual, 0) So(t, ShouldNotBeNil) schTask := t.(*task) @@ -265,39 +270,109 @@ func TestDistributedSubscriptions(t *testing.T) { schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager) // Start task. We expect it to fail while subscribing deps - terrs := sch.StartTask(t.ID()) + terrs := s.StartTask(t.ID()) So(terrs, ShouldNotBeNil) Convey("So dependencies should have been unsubscribed", func() { // Ensure that unsubscribe call count is equal to subscribe call count // i.e that every subscribe call was followed by an unsubscribe since // we errored So(remoteMockManager.UnsubscribeCallCount, ShouldEqual, remoteMockManager.SubscribeCallCount) - So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.UnsubscribeCallCount) + So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.SubscribeCallCount) }) }) - Convey("Starting task should succeed if all deps are available", func() { - //Create a task - //Create a workflowmap - wf := dsWFMap(port1) - // Create a task that is not started immediately so we can - // validate deps correctly. - t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, false) - So(len(errs.Errors()), ShouldEqual, 0) - So(t, ShouldNotBeNil) - schTask := t.(*task) - localMockManager := &subscriptionManager{Fail: false} - schTask.RemoteManagers.Add("", localMockManager) - remoteMockManager := &subscriptionManager{Fail: false} - schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager) - terrs := sch.StartTask(t.ID()) - So(terrs, ShouldBeNil) - Convey("So all depndencies should have been subscribed to", func() { - // Ensure that unsubscribe call count is equal to subscribe call count - // i.e that every subscribe call was followed by an unsubscribe since - // we errored - So(localMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) - So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + Convey("Task is expected to run until being stopped", func() { + sch := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) + // Create a task that is not started immediately so we can + // validate deps correctly. + t, errs := s.CreateTask(sch, wf, false) + So(len(errs.Errors()), ShouldEqual, 0) + So(t, ShouldNotBeNil) + schTask := t.(*task) + localMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add("", localMockManager) + remoteMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager) + terrs := s.StartTask(t.ID()) + So(terrs, ShouldBeNil) + + Convey("So all dependencies should have been subscribed to", func() { + So(localMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + }) + }) + Convey("Single run task", func() { + count := uint(1) + interval := time.Millisecond * 100 + sch := schedule.NewWindowedSchedule(interval, nil, nil, count) + // Create a task that is not started immediately so we can + // validate deps correctly. + t, errs := s.CreateTask(sch, wf, false) + So(len(errs.Errors()), ShouldEqual, 0) + So(t, ShouldNotBeNil) + schTask := t.(*task) + localMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add("", localMockManager) + remoteMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager) + terrs := s.StartTask(t.ID()) + So(terrs, ShouldBeNil) + + Convey("So all dependencies should have been subscribed to", func() { + So(localMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + }) + Convey("Task should be ended after an interval", func() { + // wait for the end of the task + // we are ok at this precision with being within 10% over the interval (100ms) + time.Sleep(interval * 110 / 100) + So(t.State(), ShouldEqual, core.TaskEnded) + + Convey("So all dependencies should have been usubscribed", func() { + So(remoteMockManager.UnsubscribeCallCount, ShouldEqual, remoteMockManager.SubscribeCallCount) + So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.SubscribeCallCount) + }) + }) + }) + Convey("Task is expected to run until reaching determined stop time", func() { + startWait := time.Millisecond * 50 + windowSize := time.Millisecond * 500 + interval := time.Millisecond * 100 + + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0) + + // Create a task that is not started immediately so we can + // validate deps correctly. + t, errs := s.CreateTask(sch, wf, false) + So(len(errs.Errors()), ShouldEqual, 0) + So(t, ShouldNotBeNil) + schTask := t.(*task) + localMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add("", localMockManager) + remoteMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager) + terrs := s.StartTask(t.ID()) + So(terrs, ShouldBeNil) + + Convey("So all dependencies should have been subscribed to", func() { + So(localMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + }) + Convey("Task should have been ended after reaching the end of window", func() { + // wait for the end of determined window + time.Sleep(startWait + windowSize) + // wait an interval to be sure that the task state has been updated + time.Sleep(interval) + // check if the task has ended + So(t.State(), ShouldEqual, core.TaskEnded) + + Convey("So all dependencies should have been usubscribed", func() { + So(remoteMockManager.UnsubscribeCallCount, ShouldEqual, remoteMockManager.SubscribeCallCount) + So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.SubscribeCallCount) + }) + }) }) }) }) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index fc1325bbd..fad01cb12 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -500,35 +500,9 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError { return errs } - // Group dependencies by the node they live on - // and subscribe to them. - depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics) - var subbedDeps []string - for k := range depGroups { - var errs []serror.SnapError - mgr, err := t.RemoteManagers.Get(k) - if err != nil { - errs = append(errs, serror.New(err)) - } else { - errs = mgr.SubscribeDeps(t.ID(), depGroups[k].requestedMetrics, depGroups[k].subscribedPlugins, t.workflow.configTree) - } - // If there are errors with subscribing any deps, go through and unsubscribe all other - // deps that may have already been subscribed then return the errors. - if len(errs) > 0 { - for _, key := range subbedDeps { - mgr, err := t.RemoteManagers.Get(key) - if err != nil { - errs = append(errs, serror.New(err)) - } else { - // sending empty mts to unsubscribe to indicate task should not start - uerrs := mgr.UnsubscribeDeps(t.ID()) - errs = append(errs, uerrs...) - } - } - return errs - } - // If subscribed successfully add to subbedDeps - subbedDeps = append(subbedDeps, k) + // subscribe plugins to task + if _, err := t.SubscribePlugins(); len(err) != 0 { + return err } event := &scheduler_event.TaskStartedEvent{ @@ -595,36 +569,21 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError { serror.New(ErrTaskDisabledNotStoppable), } default: - // Group dependencies by the host they live on and - // unsubscribe them since task is stopping. - depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics) - var errs []serror.SnapError - for k := range depGroups { - mgr, err := t.RemoteManagers.Get(k) - if err != nil { - errs = append(errs, serror.New(err)) - } else { - uerrs := mgr.UnsubscribeDeps(t.ID()) - if len(uerrs) > 0 { - errs = append(errs, uerrs...) - } - } - if len(errs) > 0 { - return errs - } - event := &scheduler_event.TaskStoppedEvent{ - TaskID: t.ID(), - Source: source, - } - defer s.eventManager.Emit(event) - t.Stop() - logger.WithFields(log.Fields{ - "task-id": t.ID(), - "task-state": t.State(), - }).Info("task stopped") + if errs := t.UnsubscribePlugins(); len(errs) != 0 { + return errs + } + event := &scheduler_event.TaskStoppedEvent{ + TaskID: t.ID(), + Source: source, } + defer s.eventManager.Emit(event) + t.Stop() + logger.WithFields(log.Fields{ + "task-id": t.ID(), + "task-state": t.State(), + }).Info("task stopped") } return nil @@ -802,6 +761,9 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { "event-namespace": e.Namespace(), "task-id": v.TaskID, }).Debug("event received") + // We need to unsubscribe from deps when a task has ended + task, _ := s.getTask(v.TaskID) + task.UnsubscribePlugins() s.taskWatcherColl.handleTaskEnded(v.TaskID) case *scheduler_event.TaskDisabledEvent: log.WithFields(log.Fields{ @@ -813,13 +775,7 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { }).Debug("event received") // We need to unsubscribe from deps when a task goes disabled task, _ := s.getTask(v.TaskID) - depGroups := getWorkflowPlugins(task.workflow.processNodes, task.workflow.publishNodes, task.workflow.metrics) - for k := range depGroups { - mgr, err := task.RemoteManagers.Get(k) - if err == nil { - mgr.UnsubscribeDeps(task.ID()) - } - } + task.UnsubscribePlugins() s.taskWatcherColl.handleTaskDisabled(v.TaskID, v.Why) default: log.WithFields(log.Fields{ diff --git a/scheduler/scheduler_medium_test.go b/scheduler/scheduler_medium_test.go index 0fb08a7b0..af6d43727 100644 --- a/scheduler/scheduler_medium_test.go +++ b/scheduler/scheduler_medium_test.go @@ -199,13 +199,19 @@ func TestCreateTask(t *testing.T) { Convey("returns an error when the schedule does not validate", func() { Convey("the interval is invalid", func() { Convey("the interval equals zero", func() { - tsk, errs := s.CreateTask(schedule.NewSimpleSchedule(0), w, false) + invalidInterval := 0 * time.Millisecond + // create a simple schedule which equals to windowed schedule + // without start and stop time + sch := schedule.NewWindowedSchedule(invalidInterval, nil, nil, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs, ShouldNotBeEmpty) So(tsk, ShouldBeNil) So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidInterval.Error()) }) Convey("the interval is less than zero", func() { - tsk, errs := s.CreateTask(schedule.NewSimpleSchedule((-1)*time.Millisecond), w, false) + invalidInterval := (-1) * time.Millisecond + sch := schedule.NewWindowedSchedule(invalidInterval, nil, nil, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs, ShouldNotBeEmpty) So(tsk, ShouldBeNil) So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidInterval.Error()) @@ -213,7 +219,10 @@ func TestCreateTask(t *testing.T) { }) }) Convey("should not error when the schedule is valid", func() { - tsk, errs := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + // create a simple schedule which equals to windowed schedule + // without start and stop time + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs.Errors(), ShouldBeEmpty) So(tsk, ShouldNotBeNil) }) @@ -221,27 +230,11 @@ func TestCreateTask(t *testing.T) { Convey("Calling CreateTask for a windowed schedule", t, func() { Convey("returns an error when the schedule does not validate", func() { - Convey("the interval is invalid", func() { - start := time.Now().Add(startWait) - stop := time.Now().Add(startWait + windowSize) - - Convey("the interval equals zero", func() { - tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(0, &start, &stop), w, false) - So(errs, ShouldNotBeEmpty) - So(tsk, ShouldBeNil) - So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidInterval.Error()) - }) - Convey("the interval is less than zero", func() { - tsk, errs := s.CreateTask(schedule.NewWindowedSchedule((-1)*time.Millisecond, &start, &stop), w, false) - So(errs, ShouldNotBeEmpty) - So(tsk, ShouldBeNil) - So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidInterval.Error()) - }) - }) Convey("the stop time was set in the past", func() { start := time.Now().Add(startWait) stop := time.Now().Add(time.Second * -10) - tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs, ShouldNotBeEmpty) So(tsk, ShouldBeNil) So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidStopTime.Error()) @@ -249,7 +242,8 @@ func TestCreateTask(t *testing.T) { Convey("the stop time is before the start time", func() { start := time.Now().Add(startWait * 2) stop := time.Now().Add(startWait) - tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs, ShouldNotBeEmpty) So(tsk, ShouldBeNil) So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrStopBeforeStart.Error()) @@ -258,7 +252,8 @@ func TestCreateTask(t *testing.T) { Convey("should not error when the schedule is valid", func() { start := time.Now().Add(startWait) stop := time.Now().Add(startWait + windowSize) - tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs.Errors(), ShouldBeEmpty) So(tsk, ShouldNotBeNil) @@ -275,6 +270,46 @@ func TestCreateTask(t *testing.T) { }) }) //end of tests for a windowed scheduler + Convey("Calling CreateTask for a simple/windowed schedule with determined the count of runs", t, func() { + Convey("Single run task firing immediately", func() { + sch := schedule.NewWindowedSchedule(interval, nil, nil, 1) + tsk, errs := s.CreateTask(sch, w, false) + So(errs.Errors(), ShouldBeEmpty) + So(tsk, ShouldNotBeNil) + + task := s.tasks.Get(tsk.ID()) + task.Spin() + + Convey("the task should be ended after reaching the end of window", func() { + // wait an interval to be sure that the task state has been updated + // we are ok at this precision with being within 10% over the interval (10ms) + time.Sleep(interval * 110 / 100) + // check if the task is ended + So(tsk.State(), ShouldEqual, core.TaskEnded) + }) + }) + Convey("Single run task firing on defined start time", func() { + count := uint(1) + start := time.Now().Add(startWait) + sch := schedule.NewWindowedSchedule(interval, &start, nil, count) + tsk, errs := s.CreateTask(sch, w, false) + So(errs.Errors(), ShouldBeEmpty) + So(tsk, ShouldNotBeNil) + + task := s.tasks.Get(tsk.ID()) + task.Spin() + Convey("the task should be ended after reaching the end of window", func() { + // wait for the end of determined window + time.Sleep(startWait) + // wait an interval to be sure that the task state has been updated + // we are ok at this precision with being within 10% over the interval (10ms) + time.Sleep(interval * 110 / 100) + // check if the task is ended + So(tsk.State(), ShouldEqual, core.TaskEnded) + }) + }) + }) //end of tests for simple/windowed schedule with determined the count + Convey("Calling CreateTask for a cron schedule", t, func() { Convey("returns an error when the schedule does not validate", func() { Convey("the cron entry is empty", func() { @@ -310,7 +345,8 @@ func TestStopTask(t *testing.T) { w := newMockWorkflowMap() Convey("Calling StopTask on a running task", t, func() { - tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tsk, _ := s.CreateTask(sch, w, false) So(tsk, ShouldNotBeNil) task := s.tasks.Get(tsk.ID()) task.Spin() @@ -328,7 +364,8 @@ func TestStopTask(t *testing.T) { }) }) Convey("Calling StopTask on a stopped task", t, func() { - tskStopped, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tskStopped, _ := s.CreateTask(sch, w, false) So(tskStopped, ShouldNotBeNil) // check if the task is already stopped So(tskStopped.State(), ShouldEqual, core.TaskStopped) @@ -343,7 +380,8 @@ func TestStopTask(t *testing.T) { }) }) Convey("Calling StopTask on a disabled task", t, func() { - tskDisabled, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tskDisabled, _ := s.CreateTask(sch, w, false) So(tskDisabled, ShouldNotBeNil) taskDisabled := s.tasks.Get(tskDisabled.ID()) taskDisabled.state = core.TaskDisabled @@ -365,7 +403,8 @@ func TestStopTask(t *testing.T) { stop := time.Now().Add(startWait + windowSize) // create a task with windowed schedule - tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs.Errors(), ShouldBeEmpty) So(tsk, ShouldNotBeNil) @@ -403,7 +442,8 @@ func TestStartTask(t *testing.T) { w := newMockWorkflowMap() Convey("Calling StartTask a running task", t, func() { - tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tsk, _ := s.CreateTask(sch, w, false) So(tsk, ShouldNotBeNil) task := s.tasks.Get(tsk.ID()) @@ -426,7 +466,8 @@ func TestStartTask(t *testing.T) { task.Stop() }) Convey("Calling StartTask on a disabled task", t, func() { - tskDisabled, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tskDisabled, _ := s.CreateTask(sch, w, false) So(tskDisabled, ShouldNotBeNil) taskDisabled := s.tasks.Get(tskDisabled.ID()) taskDisabled.state = core.TaskDisabled @@ -448,7 +489,8 @@ func TestStartTask(t *testing.T) { stop := time.Now().Add(startWait + windowSize) //create a task with windowed schedule - tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs.Errors(), ShouldBeEmpty) So(tsk, ShouldNotBeNil) @@ -487,7 +529,8 @@ func TestEnableTask(t *testing.T) { w := newMockWorkflowMap() Convey("Calling EnableTask on a disabled task", t, func() { - tskDisabled, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tskDisabled, _ := s.CreateTask(sch, w, false) So(tskDisabled, ShouldNotBeNil) taskDisabled := s.tasks.Get(tskDisabled.ID()) taskDisabled.state = core.TaskDisabled @@ -504,7 +547,8 @@ func TestEnableTask(t *testing.T) { }) }) Convey("Calling EnableTask on a running task", t, func() { - tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tsk, _ := s.CreateTask(sch, w, false) So(tsk, ShouldNotBeNil) task := s.tasks.Get(tsk.ID()) task.Spin() @@ -521,7 +565,8 @@ func TestEnableTask(t *testing.T) { }) }) Convey("Calling EnableTask on a stopped task", t, func() { - tskStopped, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) + tskStopped, _ := s.CreateTask(sch, w, false) So(tskStopped, ShouldNotBeNil) // check if the task is already stopped So(tskStopped.State(), ShouldEqual, core.TaskStopped) @@ -539,8 +584,9 @@ func TestEnableTask(t *testing.T) { start := time.Now().Add(startWait) stop := time.Now().Add(startWait + windowSize) - //create a task with windowed schedule - tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + // create a task with windowed schedule + sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0) + tsk, errs := s.CreateTask(sch, w, false) So(errs.Errors(), ShouldBeEmpty) So(tsk, ShouldNotBeNil) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 576b57d18..6ca0df189 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -198,20 +198,22 @@ func TestScheduler(t *testing.T) { e := s.Start() So(e, ShouldBeNil) - _, te := s.CreateTask(schedule.NewSimpleSchedule(time.Second*1), w, false) + // create a simple schedule which equals to windowed schedule without start and stop time + _, te := s.CreateTask(schedule.NewWindowedSchedule(time.Second, nil, nil, 0), w, false) So(te.Errors(), ShouldBeEmpty) Convey("returns errors when metrics do not validate", func() { c.failValidatingMetrics = true c.failValidatingMetricsAfter = 1 - _, err := s.CreateTask(schedule.NewSimpleSchedule(time.Second*1), w, false) + // create a simple schedule which equals to windowed schedule without start and stop time + sch := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) + _, err := s.CreateTask(sch, w, false) So(err, ShouldNotBeNil) fmt.Printf("%d", len(err.Errors())) So(len(err.Errors()), ShouldBeGreaterThan, 0) So(err.Errors()[0], ShouldResemble, serror.New(errors.New("metric validation error"))) }) - Convey("returns an error when scheduler started and MetricManager is not set", func() { s1 := New(GetDefaultConfig()) err := s1.Start() @@ -220,25 +222,23 @@ func TestScheduler(t *testing.T) { So(err, ShouldResemble, ErrMetricManagerNotSet) }) - - // TODO NICK Convey("returns an error when a schedule does not validate", func() { s1 := New(GetDefaultConfig()) s1.Start() - _, err := s1.CreateTask(schedule.NewSimpleSchedule(time.Second*1), w, false) + sch := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) + _, err := s1.CreateTask(sch, w, false) So(err, ShouldNotBeNil) So(len(err.Errors()), ShouldBeGreaterThan, 0) So(err.Errors()[0], ShouldResemble, serror.New(ErrSchedulerNotStarted)) s1.metricManager = c s1.Start() - _, err1 := s1.CreateTask(schedule.NewSimpleSchedule(time.Second*0), w, false) + _, err1 := s1.CreateTask(schedule.NewWindowedSchedule(time.Second*0, nil, nil, 0), w, false) So(err1.Errors()[0].Error(), ShouldResemble, "Interval must be greater than 0") }) - - // // TODO NICK Convey("create a task", func() { - tsk, err := s.CreateTask(schedule.NewSimpleSchedule(time.Second*5), w, false) + sch := schedule.NewWindowedSchedule(time.Second*5, nil, nil, 0) + tsk, err := s.CreateTask(sch, w, false) So(len(err.Errors()), ShouldEqual, 0) So(tsk, ShouldNotBeNil) So(tsk.(*task).deadlineDuration, ShouldResemble, DefaultDeadlineDuration) @@ -264,10 +264,9 @@ func TestScheduler(t *testing.T) { So(err[0].Error(), ShouldEqual, "Task is already stopped.") }) }) - - // // // TODO NICK Convey("returns a task with a 6 second deadline duration", func() { - tsk, err := s.CreateTask(schedule.NewSimpleSchedule(time.Second*6), w, false, core.TaskDeadlineDuration(6*time.Second)) + sch := schedule.NewWindowedSchedule(6*time.Second, nil, nil, 0) + tsk, err := s.CreateTask(sch, w, false, core.TaskDeadlineDuration(6*time.Second)) So(len(err.Errors()), ShouldEqual, 0) So(tsk.(*task).deadlineDuration, ShouldResemble, time.Duration(6*time.Second)) prev := tsk.(*task).Option(core.TaskDeadlineDuration(1 * time.Second)) @@ -275,13 +274,11 @@ func TestScheduler(t *testing.T) { tsk.(*task).Option(prev) So(tsk.(*task).deadlineDuration, ShouldResemble, time.Duration(6*time.Second)) }) - Convey("returns a task with a 1m collectDuration", func() { tsk, err := s.CreateTask(schedule.NewStreamingSchedule(), w, false, core.SetMaxCollectDuration(time.Minute)) So(len(err.Errors()), ShouldEqual, 0) So(tsk.(*task).maxCollectDuration, ShouldResemble, time.Duration(time.Minute)) }) - Convey("Returns a task with a metric buffer of 100", func() { tsk, err := s.CreateTask(schedule.NewStreamingSchedule(), w, false, core.SetMaxMetricsBuffer(100)) So(len(err.Errors()), ShouldEqual, 0) diff --git a/scheduler/task.go b/scheduler/task.go index d5f4cdbb0..899e73b51 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -33,6 +33,7 @@ import ( "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/scheduler_event" + "github.com/intelsdi-x/snap/core/serror" "github.com/intelsdi-x/snap/grpc/controlproxy" "github.com/intelsdi-x/snap/pkg/schedule" "github.com/intelsdi-x/snap/scheduler/wmap" @@ -254,7 +255,7 @@ func (t *task) Spin() { // misses for the interval while stopped. t.lastFireTime = time.Time{} - if t.state == core.TaskStopped { + if t.state == core.TaskStopped || t.state == core.TaskEnded { t.state = core.TaskSpinning t.killChan = make(chan struct{}) // spin in a goroutine @@ -360,6 +361,60 @@ func (t *task) Stop() { } } +// UnsubscribePlugins groups task dependencies by the node they live in workflow and unsubscribe them +func (t *task) UnsubscribePlugins() []serror.SnapError { + depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics) + var errs []serror.SnapError + for k := range depGroups { + mgr, err := t.RemoteManagers.Get(k) + if err != nil { + errs = append(errs, serror.New(err)) + } else { + uerrs := mgr.UnsubscribeDeps(t.ID()) + if len(uerrs) > 0 { + errs = append(errs, uerrs...) + } + } + } + return errs +} + +// SubscribePlugins groups task dependencies by the node they live in workflow and subscribe them. +// If there are errors with subscribing any deps, manage unsubscribing all other deps that may have already been subscribed +// and then return the errors. +func (t *task) SubscribePlugins() ([]string, []serror.SnapError) { + depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics) + var subbedDeps []string + for k := range depGroups { + var errs []serror.SnapError + mgr, err := t.RemoteManagers.Get(k) + if err != nil { + errs = append(errs, serror.New(err)) + } else { + errs = mgr.SubscribeDeps(t.ID(), depGroups[k].requestedMetrics, depGroups[k].subscribedPlugins, t.workflow.configTree) + } + // If there are errors with subscribing any deps, go through and unsubscribe all other + // deps that may have already been subscribed then return the errors. + if len(errs) > 0 { + for _, key := range subbedDeps { + mgr, err := t.RemoteManagers.Get(key) + if err != nil { + errs = append(errs, serror.New(err)) + } else { + // sending empty mts to unsubscribe to indicate task should not start + uerrs := mgr.UnsubscribeDeps(t.ID()) + errs = append(errs, uerrs...) + } + } + return nil, errs + } + // If subscribed successfully add to subbedDeps + subbedDeps = append(subbedDeps, k) + } + + return subbedDeps, nil +} + //Enable changes the state from Disabled to Stopped func (t *task) Enable() error { t.Lock() @@ -402,7 +457,7 @@ func (t *task) spin() { select { case sr := <-t.schResponseChan: switch sr.State() { - // If response show this schedule is stil active we fire + // If response show this schedule is still active we fire case schedule.Active: t.missedIntervals += sr.Missed() t.lastFireTime = time.Now() @@ -447,6 +502,10 @@ func (t *task) spin() { t.Lock() t.state = core.TaskEnded t.Unlock() + // Send task ended event + event := new(scheduler_event.TaskEndedEvent) + event.TaskID = t.id + defer t.eventEmitter.Emit(event) return //spin // Schedule has errored diff --git a/scheduler/task_test.go b/scheduler/task_test.go index 9e0cdfa9d..93f1697b1 100644 --- a/scheduler/task_test.go +++ b/scheduler/task_test.go @@ -46,7 +46,8 @@ func TestTask(t *testing.T) { So(errs, ShouldBeEmpty) c := &mockMetricManager{} Convey("task + simple schedule", func() { - sch := schedule.NewSimpleSchedule(time.Millisecond * 100) + // create a simple schedule which equals to windowed schedule without start and stop time + sch := schedule.NewWindowedSchedule(time.Millisecond*100, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter) So(err, ShouldBeNil) task.Spin() @@ -57,7 +58,7 @@ func TestTask(t *testing.T) { }) Convey("Task specified-name test", func() { - sch := schedule.NewSimpleSchedule(time.Millisecond * 100) + sch := schedule.NewWindowedSchedule(time.Millisecond*100, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter, core.SetTaskName("My name is unique")) So(err, ShouldBeNil) task.Spin() @@ -65,7 +66,7 @@ func TestTask(t *testing.T) { }) Convey("Task default-name test", func() { - sch := schedule.NewSimpleSchedule(time.Millisecond * 100) + sch := schedule.NewWindowedSchedule(time.Millisecond*100, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter) So(err, ShouldBeNil) task.Spin() @@ -74,7 +75,7 @@ func TestTask(t *testing.T) { }) Convey("Task deadline duration test", func() { - sch := schedule.NewSimpleSchedule(time.Millisecond * 100) + sch := schedule.NewWindowedSchedule(time.Millisecond*100, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter, core.TaskDeadlineDuration(20*time.Second)) So(err, ShouldBeNil) task.Spin() @@ -86,7 +87,7 @@ func TestTask(t *testing.T) { }) Convey("Tasks are created and creation of task table is checked", func() { - sch := schedule.NewSimpleSchedule(time.Millisecond * 100) + sch := schedule.NewWindowedSchedule(time.Millisecond*100, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter) So(err, ShouldBeNil) task1, err := newTask(sch, wf, newWorkManager(), c, emitter) @@ -103,7 +104,7 @@ func TestTask(t *testing.T) { }) Convey("Task is created and starts to spin", func() { - sch := schedule.NewSimpleSchedule(time.Second * 5) + sch := schedule.NewWindowedSchedule(time.Second*5, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter) So(err, ShouldBeNil) task.Spin() @@ -116,7 +117,7 @@ func TestTask(t *testing.T) { }) Convey("task fires", func() { - sch := schedule.NewSimpleSchedule(time.Nanosecond * 100) + sch := schedule.NewWindowedSchedule(time.Nanosecond*100, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter) So(err, ShouldBeNil) task.Spin() @@ -127,7 +128,7 @@ func TestTask(t *testing.T) { }) Convey("Enable a running task", func() { - sch := schedule.NewSimpleSchedule(time.Millisecond * 10) + sch := schedule.NewWindowedSchedule(time.Millisecond*10, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter) So(err, ShouldBeNil) task.Spin() @@ -137,7 +138,7 @@ func TestTask(t *testing.T) { }) Convey("Enable a disabled task", func() { - sch := schedule.NewSimpleSchedule(time.Millisecond * 10) + sch := schedule.NewWindowedSchedule(time.Millisecond*10, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), c, emitter) So(err, ShouldBeNil) @@ -153,7 +154,7 @@ func TestTask(t *testing.T) { wf, errs := wmapToWorkflow(sampleWFMap) So(errs, ShouldBeEmpty) - sch := schedule.NewSimpleSchedule(time.Millisecond * 10) + sch := schedule.NewWindowedSchedule(time.Millisecond*10, nil, nil, 0) task, err := newTask(sch, wf, newWorkManager(), &mockMetricManager{}, emitter) So(err, ShouldBeNil) So(task.id, ShouldNotBeEmpty) diff --git a/scheduler/workflow_test.go b/scheduler/workflow_test.go index 3b63f4fc4..97e7d2f33 100644 --- a/scheduler/workflow_test.go +++ b/scheduler/workflow_test.go @@ -179,7 +179,10 @@ func TestCollectPublishWorkflow(t *testing.T) { Convey("Create and start task", func() { el := newEventListener() s.RegisterEventHandler("TestCollectPublishWorkflow", el) - t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*200), w, true) + // create a simple schedule which equals to windowed schedule + // without start and stop time + sch := schedule.NewWindowedSchedule(time.Millisecond*200, nil, nil, 0) + t, err := s.CreateTask(sch, w, true) So(err.Errors(), ShouldBeEmpty) So(t, ShouldNotBeNil) <-el.done @@ -242,7 +245,10 @@ func TestProcessChainingWorkflow(t *testing.T) { err := s.Start() So(err, ShouldBeNil) Convey("Create task", func() { - t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*200), w, true) + // create a simple schedule which equals to windowed schedule + // without start and stop time + sch := schedule.NewWindowedSchedule(time.Millisecond*200, nil, nil, 0) + t, err := s.CreateTask(sch, w, true) s.RegisterEventHandler("TestProcessChainingWorkflow", lpe) So(err.Errors(), ShouldBeEmpty) So(t, ShouldNotBeNil)