Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Feature #818 - Cron scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
andrzej-k committed Apr 5, 2016
1 parent 2c573a1 commit 8f8c549
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 13 deletions.
4 changes: 4 additions & 0 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cmd/snapctl/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cmd/snapctl/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Copyright 2015,2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,7 +90,7 @@ var (

flTaskSchedInterval = cli.StringFlag{
Name: "interval, i",
Usage: "Interval for the task schedule [ex: 250ms, 1s, 30m]",
Usage: "Interval for the task schedule [ex (simple schedule): 250ms, 1s, 30m (cron schedule): \"0 * * * * *\"]",
}

flTaskSchedStartTime = cli.StringFlag{
Expand Down
22 changes: 17 additions & 5 deletions cmd/snapctl/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Copyright 2015,2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@ import (
"github.com/codegangsta/cli"
"github.com/intelsdi-x/snap/mgmt/rest/client"
"github.com/intelsdi-x/snap/scheduler/wmap"
"github.com/robfig/cron"

"github.com/ghodss/yaml"
)
Expand Down Expand Up @@ -192,11 +193,17 @@ func createTaskUsingWFManifest(ctx *cli.Context) {
// Get the task name
name := ctx.String("name")
// Get the interval
isCron := false
i := ctx.String("interval")
_, err := time.ParseDuration(i)
if err != nil {
fmt.Printf("Bad interval format:\n%v\n", err)
os.Exit(1)
// try interpreting interval as cron entry
_, e := cron.Parse(i)
if e != nil {
fmt.Printf("Bad interval format:\nfor simple schedule: %v\nfor cron schedule: %v\n", err, e)
os.Exit(1)
}
isCron = true
}

// Deadline for a task
Expand All @@ -206,7 +213,7 @@ func createTaskUsingWFManifest(ctx *cli.Context) {
// None of these mean it is a simple schedule
if !ctx.IsSet("start-date") && !ctx.IsSet("start-time") && !ctx.IsSet("stop-date") && !ctx.IsSet("stop-time") {
// Check if duration was set
if ctx.IsSet("duration") {
if ctx.IsSet("duration") && !isCron {
d, err := time.ParseDuration(ctx.String("duration"))
if err != nil {
fmt.Printf("Bad duration format:\n%v\n", err)
Expand All @@ -222,8 +229,13 @@ func createTaskUsingWFManifest(ctx *cli.Context) {
}
} else {
// No start or stop and no duration == simple schedule
t := "simple"
if isCron {
// It's a cron schedule, ignore "duration" if set
t = "cron"
}
sch = &client.Schedule{
Type: "simple",
Type: t,
Interval: i,
}
}
Expand Down
4 changes: 2 additions & 2 deletions docs/SNAPCTL.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ create There are two ways to create a task.
--task-manifest, -t File path for task manifest to use for task creation.
--workflow-manifest, -w File path for workflow manifest to use for task creation
--interval, -i Interval for the task schedule [ex: 250ms, 1s, 30m]
--interval, -i Interval for the task schedule [ex (simple schedule): 250ms, 1s, 30m (cron schedule): "0 * * * * *"]
--start-date Start date for the task schedule [defaults to today]
--start-time Start time for the task schedule [defaults to now]
--stop-date Stop date for the task schedule [defaults to today]
Expand Down Expand Up @@ -127,4 +127,4 @@ $ $SNAP_PATH/bin/snapctl task list
$ $SNAP_PATH/bin/snapctl plugin unload -t collector -n mock -v <version>
$ $SNAP_PATH/bin/snapctl plugin unload -t processor -n passthru -v <version>
$ $SNAP_PATH/bin/snapctl plugin unload -t publisher -n publisher -v <version>
```
```
15 changes: 14 additions & 1 deletion docs/TASKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,20 @@ The header contains a version, used to differentiate between versions of the tas

#### Schedule

The schedule describes the schedule type and interval for running the task. The type of a schedule could be a simple "run forever" schedule, which is what we see above as `"simple"` or something more complex. __snap__ is designed in a way where custom schedulers can easily be dropped in. If a custom schedule is used, it may require more key/value pairs in the schedule section of the manifest. At the time of this writing, __snap__ has a simple schedule which is described above, and a window schedule. The window schedule adds a start and stop time. For more on tasks, visit [`SNAPCTL.md`](SNAPCTL.md).
The schedule describes the schedule type and interval for running the task. The type of a schedule could be a simple "run forever" schedule, which is what we see above as `"simple"` or something more complex. __snap__ is designed in a way where custom schedulers can easily be dropped in. If a custom schedule is used, it may require more key/value pairs in the schedule section of the manifest. At the time of this writing, __snap__ has three schedules:
- **simple schedule** which is described above,
- **window schedule** which adds a start and stop time,
- **cron schedule** which supports cron-like entries in ```interval``` field, like in this example (workflow will fire every hour on the half hour):
```
"version": 1,
"schedule": {
"type": "cron",
"interval" : "0 30 * * * *"
},
```
More on cron expressions can be found here: https://godoc.org/github.com/robfig/cron
For more on tasks, visit [`SNAPCTL.md`](SNAPCTL.md).
### The Workflow
Expand Down
4 changes: 2 additions & 2 deletions mgmt/rest/client/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Copyright 2015,2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,7 +34,7 @@ import (
)

type Schedule struct {
// Type specifies the type of the schedule. Currently,the type of "simple" and "windowed" are supported.
// Type specifies the type of the schedule. Currently, the type of "simple", "windowed" and "cron" are supported.
Type string
// Interval specifies the time duration.
Interval string
Expand Down
13 changes: 12 additions & 1 deletion mgmt/rest/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Copyright 2015,2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -346,6 +346,17 @@ func makeSchedule(s request.Schedule) (cschedule.Schedule, error) {
return nil, err
}
return sch, nil
case "cron":
if s.Interval == "" {
return nil, errors.New("missing cron entry ")
}
sch := cschedule.NewCronSchedule(s.Interval)

err := sch.Validate()
if err != nil {
return nil, err
}
return sch, nil
default:
return nil, errors.New("unknown schedule type " + s.Type)
}
Expand Down
137 changes: 137 additions & 0 deletions pkg/schedule/cron_schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schedule

import (
"errors"
"time"

"github.com/robfig/cron"
)

// ErrMissingCronEntry indicates missing cron entry
var ErrMissingCronEntry = errors.New("Cron entry is missing")

// CronSchedule is a schedule that waits as long as specified in cron entry
type CronSchedule struct {
entry string
enabled bool
state ScheduleState
schedule *cron.Cron
}

// NewCronSchedule creates and starts new cron schedule and returns an instance of CronSchedule
func NewCronSchedule(entry string) *CronSchedule {
schedule := cron.New()
return &CronSchedule{
entry: entry,
schedule: schedule,
enabled: false,
}
}

// GetState returns state of CronSchedule
func (c *CronSchedule) GetState() ScheduleState {
return c.state
}

// Validate returns error if cron entry dosn't match crontab format
func (c *CronSchedule) Validate() error {
if c.entry == "" {
return ErrMissingCronEntry
}
_, err := cron.Parse(c.entry)
if err != nil {
return err
}
return nil
}

// Wait waits as long as specified in cron entry
func (c *CronSchedule) Wait(last time.Time) Response {
var err error
now := time.Now()

// first run
if (last == time.Time{}) {
last = now
}
// schedule not enabled, either due to first run or invalid cron entry
if !c.enabled {
err = c.schedule.AddFunc(c.entry, func() {})
if err != nil {
c.state = Error
} else {
c.enabled = true
}
}

var misses uint
if c.enabled {
s := c.schedule.Entries()[0].Schedule

// calculate misses
for next := last; next.Before(now); {
next = s.Next(next)
if next.After(now) {
break
}
misses++
}

// wait
waitTime := s.Next(now)
time.Sleep(waitTime.Sub(now))
}

return &CronScheduleResponse{
state: c.GetState(),
err: err,
missed: misses,
lastTime: time.Now(),
}
}

// CronScheduleResponse is the response from CronSchedule
type CronScheduleResponse struct {
state ScheduleState
err error
missed uint
lastTime time.Time
}

// State returns the state of the Schedule
func (c *CronScheduleResponse) State() ScheduleState {
return c.state
}

// Error returns last error
func (c *CronScheduleResponse) Error() error {
return c.err
}

// Missed returns any missed intervals
func (c *CronScheduleResponse) Missed() uint {
return c.missed
}

// LastTime returns the last response time
func (c *CronScheduleResponse) LastTime() time.Time {
return c.lastTime
}
77 changes: 77 additions & 0 deletions pkg/schedule/cron_schedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schedule

import (
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
)

func TestCronSchedule(t *testing.T) {
Convey("Cron Schedule", t, func() {
Convey("valid cron entry", func() {
i := "0 * * * * *"
c := NewCronSchedule(i)
e := c.Validate()
So(e, ShouldBeNil)
})
Convey("missing cron entry", func() {
i := ""
c := NewCronSchedule(i)
e := c.Validate()
So(e, ShouldEqual, ErrMissingCronEntry)
})
Convey("invalid cron entry", func() {
i := "invalid cron entry"
c := NewCronSchedule(i)
e := c.Validate()
So(e, ShouldNotBeNil)
})
Convey("wait on valid cron entry", func() {
i := "@every 1s"
c := NewCronSchedule(i)
now := time.Now()
r := c.Wait(now)
So(r, ShouldNotBeNil)
So(r.State(), ShouldEqual, Active)
So(r.Error(), ShouldBeNil)
So(r.Missed(), ShouldEqual, 0)
lastTime := r.LastTime()
l := lastTime.After(now) && lastTime.Before(time.Now())
So(l, ShouldBeTrue)
})
Convey("counting misses in Wait()", func() {
i := "@every 1s"
c := NewCronSchedule(i)
now := time.Now()
r := c.Wait(now)
then := now.Add(-time.Duration(10) * time.Second)
r = c.Wait(then)
So(r, ShouldNotBeNil)
So(r.State(), ShouldEqual, Active)
So(r.Error(), ShouldBeNil)
So(r.Missed(), ShouldBeBetweenOrEqual, 10, 12)
lastTime := r.LastTime()
l := lastTime.After(now) && lastTime.Before(time.Now())
So(l, ShouldBeTrue)
})
})
}

0 comments on commit 8f8c549

Please sign in to comment.