diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 14ef4350d..3c72c3598 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -123,6 +123,10 @@ { "ImportPath": "github.com/vrischmann/jsonutil", "Rev": "694784f9315ee9fc763c1d30f28753cba21307aa" + }, + { + "ImportPath": "github.com/robfig/cron", + "Rev": "32d9c273155a0506d27cf73dd1246e86a470997e" } ] } diff --git a/cmd/snapctl/Godeps/Godeps.json b/cmd/snapctl/Godeps/Godeps.json index c8f7dfeff..9e8c62679 100644 --- a/cmd/snapctl/Godeps/Godeps.json +++ b/cmd/snapctl/Godeps/Godeps.json @@ -14,6 +14,10 @@ { "ImportPath": "github.com/ghodss/yaml", "Rev": "c3eb24aeea63668ebdac08d2e252f20df8b6b1ae" + }, + { + "ImportPath": "github.com/robfig/cron", + "Rev": "32d9c273155a0506d27cf73dd1246e86a470997e" } ] } diff --git a/cmd/snapctl/flags.go b/cmd/snapctl/flags.go index 4be027e8d..b164eef2e 100644 --- a/cmd/snapctl/flags.go +++ b/cmd/snapctl/flags.go @@ -2,7 +2,7 @@ http://www.apache.org/licenses/LICENSE-2.0.txt -Copyright 2015 Intel Corporation +Copyright 2015,2016 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -90,7 +90,7 @@ var ( flTaskSchedInterval = cli.StringFlag{ Name: "interval, i", - Usage: "Interval for the task schedule [ex: 250ms, 1s, 30m]", + Usage: "Interval for the task schedule [ex (simple schedule): 250ms, 1s, 30m (cron schedule): \"0 * * * * *\"]", } flTaskSchedStartTime = cli.StringFlag{ diff --git a/cmd/snapctl/task.go b/cmd/snapctl/task.go index 8c3faf855..0b4d8e66e 100644 --- a/cmd/snapctl/task.go +++ b/cmd/snapctl/task.go @@ -2,7 +2,7 @@ http://www.apache.org/licenses/LICENSE-2.0.txt -Copyright 2015 Intel Corporation +Copyright 2015,2016 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import ( "github.com/codegangsta/cli" "github.com/intelsdi-x/snap/mgmt/rest/client" "github.com/intelsdi-x/snap/scheduler/wmap" + "github.com/robfig/cron" "github.com/ghodss/yaml" ) @@ -192,11 +193,17 @@ func createTaskUsingWFManifest(ctx *cli.Context) { // Get the task name name := ctx.String("name") // Get the interval + isCron := false i := ctx.String("interval") _, err := time.ParseDuration(i) if err != nil { - fmt.Printf("Bad interval format:\n%v\n", err) - os.Exit(1) + // try interpreting interval as cron entry + _, e := cron.Parse(i) + if e != nil { + fmt.Printf("Bad interval format:\nfor simple schedule: %v\nfor cron schedule: %v\n", err, e) + os.Exit(1) + } + isCron = true } // Deadline for a task @@ -206,7 +213,7 @@ func createTaskUsingWFManifest(ctx *cli.Context) { // None of these mean it is a simple schedule if !ctx.IsSet("start-date") && !ctx.IsSet("start-time") && !ctx.IsSet("stop-date") && !ctx.IsSet("stop-time") { // Check if duration was set - if ctx.IsSet("duration") { + if ctx.IsSet("duration") && !isCron { d, err := time.ParseDuration(ctx.String("duration")) if err != nil { fmt.Printf("Bad duration format:\n%v\n", err) @@ -222,8 +229,13 @@ func createTaskUsingWFManifest(ctx *cli.Context) { } } else { // No start or stop and no duration == simple schedule + t := "simple" + if isCron { + // It's a cron schedule, ignore "duration" if set + t = "cron" + } sch = &client.Schedule{ - Type: "simple", + Type: t, Interval: i, } } diff --git a/docs/SNAPCTL.md b/docs/SNAPCTL.md index 4bd9569a7..ed1a6239b 100644 --- a/docs/SNAPCTL.md +++ b/docs/SNAPCTL.md @@ -53,7 +53,7 @@ create There are two ways to create a task. --task-manifest, -t File path for task manifest to use for task creation. --workflow-manifest, -w File path for workflow manifest to use for task creation - --interval, -i Interval for the task schedule [ex: 250ms, 1s, 30m] + --interval, -i Interval for the task schedule [ex (simple schedule): 250ms, 1s, 30m (cron schedule): "0 * * * * *"] --start-date Start date for the task schedule [defaults to today] --start-time Start time for the task schedule [defaults to now] --stop-date Stop date for the task schedule [defaults to today] @@ -127,4 +127,4 @@ $ $SNAP_PATH/bin/snapctl task list $ $SNAP_PATH/bin/snapctl plugin unload -t collector -n mock -v $ $SNAP_PATH/bin/snapctl plugin unload -t processor -n passthru -v $ $SNAP_PATH/bin/snapctl plugin unload -t publisher -n publisher -v -``` \ No newline at end of file +``` diff --git a/docs/TASKS.md b/docs/TASKS.md index 43f2b5d79..5278a22e9 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -22,7 +22,20 @@ The header contains a version, used to differentiate between versions of the tas #### Schedule -The schedule describes the schedule type and interval for running the task. The type of a schedule could be a simple "run forever" schedule, which is what we see above as `"simple"` or something more complex. __snap__ is designed in a way where custom schedulers can easily be dropped in. If a custom schedule is used, it may require more key/value pairs in the schedule section of the manifest. At the time of this writing, __snap__ has a simple schedule which is described above, and a window schedule. The window schedule adds a start and stop time. For more on tasks, visit [`SNAPCTL.md`](SNAPCTL.md). +The schedule describes the schedule type and interval for running the task. The type of a schedule could be a simple "run forever" schedule, which is what we see above as `"simple"` or something more complex. __snap__ is designed in a way where custom schedulers can easily be dropped in. If a custom schedule is used, it may require more key/value pairs in the schedule section of the manifest. At the time of this writing, __snap__ has three schedules: +- **simple schedule** which is described above, +- **window schedule** which adds a start and stop time, +- **cron schedule** which supports cron-like entries in ```interval``` field, like in this example (workflow will fire every hour on the half hour): +``` + "version": 1, + "schedule": { + "type": "cron", + "interval" : "0 30 * * * *" + }, +``` +More on cron expressions can be found here: https://godoc.org/github.com/robfig/cron + +For more on tasks, visit [`SNAPCTL.md`](SNAPCTL.md). ### The Workflow diff --git a/mgmt/rest/client/task.go b/mgmt/rest/client/task.go index d96c0c99c..07589820c 100644 --- a/mgmt/rest/client/task.go +++ b/mgmt/rest/client/task.go @@ -2,7 +2,7 @@ http://www.apache.org/licenses/LICENSE-2.0.txt -Copyright 2015 Intel Corporation +Copyright 2015,2016 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ import ( ) type Schedule struct { - // Type specifies the type of the schedule. Currently,the type of "simple" and "windowed" are supported. + // Type specifies the type of the schedule. Currently, the type of "simple", "windowed" and "cron" are supported. Type string // Interval specifies the time duration. Interval string diff --git a/mgmt/rest/task.go b/mgmt/rest/task.go index 571966bc6..707249562 100644 --- a/mgmt/rest/task.go +++ b/mgmt/rest/task.go @@ -2,7 +2,7 @@ http://www.apache.org/licenses/LICENSE-2.0.txt -Copyright 2015 Intel Corporation +Copyright 2015,2016 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -346,6 +346,17 @@ func makeSchedule(s request.Schedule) (cschedule.Schedule, error) { return nil, err } return sch, nil + case "cron": + if s.Interval == "" { + return nil, errors.New("missing cron entry ") + } + sch := cschedule.NewCronSchedule(s.Interval) + + err := sch.Validate() + if err != nil { + return nil, err + } + return sch, nil default: return nil, errors.New("unknown schedule type " + s.Type) } diff --git a/pkg/schedule/cron_schedule.go b/pkg/schedule/cron_schedule.go new file mode 100644 index 000000000..6826bdcfc --- /dev/null +++ b/pkg/schedule/cron_schedule.go @@ -0,0 +1,137 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + +Copyright 2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedule + +import ( + "errors" + "time" + + "github.com/robfig/cron" +) + +// ErrMissingCronEntry indicates missing cron entry +var ErrMissingCronEntry = errors.New("Cron entry is missing") + +// CronSchedule is a schedule that waits as long as specified in cron entry +type CronSchedule struct { + entry string + enabled bool + state ScheduleState + schedule *cron.Cron +} + +// NewCronSchedule creates and starts new cron schedule and returns an instance of CronSchedule +func NewCronSchedule(entry string) *CronSchedule { + schedule := cron.New() + return &CronSchedule{ + entry: entry, + schedule: schedule, + enabled: false, + } +} + +// GetState returns state of CronSchedule +func (c *CronSchedule) GetState() ScheduleState { + return c.state +} + +// Validate returns error if cron entry dosn't match crontab format +func (c *CronSchedule) Validate() error { + if c.entry == "" { + return ErrMissingCronEntry + } + _, err := cron.Parse(c.entry) + if err != nil { + return err + } + return nil +} + +// Wait waits as long as specified in cron entry +func (c *CronSchedule) Wait(last time.Time) Response { + var err error + now := time.Now() + + // first run + if (last == time.Time{}) { + last = now + } + // schedule not enabled, either due to first run or invalid cron entry + if !c.enabled { + err = c.schedule.AddFunc(c.entry, func() {}) + if err != nil { + c.state = Error + } else { + c.enabled = true + } + } + + var misses uint + if c.enabled { + s := c.schedule.Entries()[0].Schedule + + // calculate misses + for next := last; next.Before(now); { + next = s.Next(next) + if next.After(now) { + break + } + misses++ + } + + // wait + waitTime := s.Next(now) + time.Sleep(waitTime.Sub(now)) + } + + return &CronScheduleResponse{ + state: c.GetState(), + err: err, + missed: misses, + lastTime: time.Now(), + } +} + +// CronScheduleResponse is the response from CronSchedule +type CronScheduleResponse struct { + state ScheduleState + err error + missed uint + lastTime time.Time +} + +// State returns the state of the Schedule +func (c *CronScheduleResponse) State() ScheduleState { + return c.state +} + +// Error returns last error +func (c *CronScheduleResponse) Error() error { + return c.err +} + +// Missed returns any missed intervals +func (c *CronScheduleResponse) Missed() uint { + return c.missed +} + +// LastTime returns the last response time +func (c *CronScheduleResponse) LastTime() time.Time { + return c.lastTime +} diff --git a/pkg/schedule/cron_schedule_test.go b/pkg/schedule/cron_schedule_test.go new file mode 100644 index 000000000..477b6cd91 --- /dev/null +++ b/pkg/schedule/cron_schedule_test.go @@ -0,0 +1,77 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + +Copyright 2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedule + +import ( + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestCronSchedule(t *testing.T) { + Convey("Cron Schedule", t, func() { + Convey("valid cron entry", func() { + i := "0 * * * * *" + c := NewCronSchedule(i) + e := c.Validate() + So(e, ShouldBeNil) + }) + Convey("missing cron entry", func() { + i := "" + c := NewCronSchedule(i) + e := c.Validate() + So(e, ShouldEqual, ErrMissingCronEntry) + }) + Convey("invalid cron entry", func() { + i := "invalid cron entry" + c := NewCronSchedule(i) + e := c.Validate() + So(e, ShouldNotBeNil) + }) + Convey("wait on valid cron entry", func() { + i := "@every 1s" + c := NewCronSchedule(i) + now := time.Now() + r := c.Wait(now) + So(r, ShouldNotBeNil) + So(r.State(), ShouldEqual, Active) + So(r.Error(), ShouldBeNil) + So(r.Missed(), ShouldEqual, 0) + lastTime := r.LastTime() + l := lastTime.After(now) && lastTime.Before(time.Now()) + So(l, ShouldBeTrue) + }) + Convey("counting misses in Wait()", func() { + i := "@every 1s" + c := NewCronSchedule(i) + now := time.Now() + r := c.Wait(now) + then := now.Add(-time.Duration(10) * time.Second) + r = c.Wait(then) + So(r, ShouldNotBeNil) + So(r.State(), ShouldEqual, Active) + So(r.Error(), ShouldBeNil) + So(r.Missed(), ShouldBeBetweenOrEqual, 10, 12) + lastTime := r.LastTime() + l := lastTime.After(now) && lastTime.Before(time.Now()) + So(l, ShouldBeTrue) + }) + }) +}