From 9677fd5ce72a8e69613b4ffa32cf8daa0b14042d Mon Sep 17 00:00:00 2001 From: Izabella Raulin Date: Mon, 6 Mar 2017 13:19:36 +0100 Subject: [PATCH] Scheduled workflow execution immediately - applicable for simple and windowed schedule, not applicable for cron shedule --- control/strategy/lru.go | 2 +- control/strategy/sticky.go | 2 +- mgmt/rest/client/client_func_test.go | 45 ++++++-- pkg/schedule/schedule.go | 4 +- pkg/schedule/simple_schedule_test.go | 74 +++++++++++++- pkg/schedule/windowed_schedule.go | 17 +--- pkg/schedule/windowed_schedule_test.go | 136 +++++++++++++++++++++---- scheduler/task.go | 3 +- scheduler/watcher.go | 4 +- 9 files changed, 238 insertions(+), 49 deletions(-) diff --git a/control/strategy/lru.go b/control/strategy/lru.go index 85edd7e59..8316bfb36 100644 --- a/control/strategy/lru.go +++ b/control/strategy/lru.go @@ -26,7 +26,7 @@ import ( "github.com/intelsdi-x/snap/core" ) -// lru provides a stragey that selects the least recently used available plugin. +// lru provides a strategy that selects the least recently used available plugin. type lru struct { *cache logger *log.Entry diff --git a/control/strategy/sticky.go b/control/strategy/sticky.go index 0ab8c7c15..2f7fa7c08 100644 --- a/control/strategy/sticky.go +++ b/control/strategy/sticky.go @@ -33,7 +33,7 @@ var ( ErrCacheDoesNotExist = errors.New("cache does not exist") ) -// sticky provides a stragey that ... concurrency count is 1 +// sticky provides a strategy that ... concurrency count is 1 type sticky struct { plugins map[string]AvailablePlugin metricCache map[string]*cache diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index 1b531a7d2..342d52d5a 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -343,6 +343,25 @@ func TestSnapClient(t *testing.T) { }) }) + Convey("unload one of collector plugin", func() { + p := c.GetPlugins(false) + So(p.Err, ShouldBeNil) + So(len(p.LoadedPlugins), ShouldEqual, 3) + + p2 := c.UnloadPlugin("collector", "mock", 2) + So(p2.Err, ShouldBeNil) + So(p2.Name, ShouldEqual, "mock") + So(p2.Version, ShouldEqual, 2) + So(p2.Type, ShouldEqual, "collector") + + Convey("there should be two loaded plugins", func() { + p = c.GetPlugins(false) + So(p.Err, ShouldBeNil) + So(len(p.LoadedPlugins), ShouldEqual, 2) + So(p.AvailablePlugins, ShouldBeEmpty) + }) + }) + Convey("Tasks", func() { Convey("Passing a bad task manifest", func() { wfb := getWMFromSample("bad.json") @@ -374,6 +393,7 @@ func TestSnapClient(t *testing.T) { correctSchedule := &Schedule{Type: "simple", Interval: "1s"} tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0) So(tt.Err, ShouldBeNil) + So(tt.State, ShouldEqual, "Running") }) Convey("Creating a task with correct configuration for windowed schedule", func() { @@ -384,17 +404,19 @@ func TestSnapClient(t *testing.T) { StopTimestamp: &stopTime} tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0) So(tt.Err, ShouldBeNil) + So(tt.State, ShouldEqual, "Running") }) Convey("Creating a task with correct configuration for cron schedule", func() { correctSchedule := &Schedule{Type: "cron", Interval: "1 1 1 1 1 1"} tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0) So(tt.Err, ShouldBeNil) + So(tt.State, ShouldEqual, "Running") }) }) - tf := c.CreateTask(sch, wf, "baron", "", false, 0) Convey("valid task not started on creation", func() { + tf := c.CreateTask(sch, wf, "baron", "", false, 0) So(tf.Err, ShouldBeNil) So(tf.Name, ShouldEqual, "baron") So(tf.State, ShouldEqual, "Stopped") @@ -436,8 +458,8 @@ func TestSnapClient(t *testing.T) { }) }) - tt := c.CreateTask(sch, wf, "baron", "", true, 0) Convey("valid task started on creation", func() { + tt := c.CreateTask(sch, wf, "baron", "", true, 0) So(tt.Err, ShouldBeNil) So(tt.Name, ShouldEqual, "baron") So(tt.State, ShouldEqual, "Running") @@ -533,6 +555,7 @@ func TestSnapClient(t *testing.T) { a := new(ea) r := c.WatchTask(tf.ID) + So(r.Err, ShouldBeNil) wait := make(chan struct{}) go func() { for { @@ -554,10 +577,11 @@ func TestSnapClient(t *testing.T) { So(startResp.Err, ShouldBeNil) <-wait a.Lock() + defer a.Unlock() + So(len(a.events), ShouldEqual, 5) - a.Unlock() So(a.events[0], ShouldEqual, "task-started") - for x := 2; x <= 4; x++ { + for x := 1; x < 5; x++ { So(a.events[x], ShouldEqual, "metric-event") } }) @@ -570,16 +594,15 @@ func TestSnapClient(t *testing.T) { So(p.Err, ShouldNotBeNil) So(p.Err.Error(), ShouldEqual, "plugin not found") }) + Convey("unload already unloaded plugin", func() { + p := c.UnloadPlugin("collector", "mock", 2) + So(p.Err, ShouldNotBeNil) + So(p.Err.Error(), ShouldEqual, "plugin not found") + }) Convey("unload one of multiple", func() { p1 := c.GetPlugins(false) So(p1.Err, ShouldBeNil) - So(len(p1.LoadedPlugins), ShouldEqual, 3) - - p2 := c.UnloadPlugin("collector", "mock", 2) - So(p2.Err, ShouldBeNil) - So(p2.Name, ShouldEqual, "mock") - So(p2.Version, ShouldEqual, 2) - So(p2.Type, ShouldEqual, "collector") + So(len(p1.LoadedPlugins), ShouldEqual, 2) p3 := c.UnloadPlugin("publisher", "mock-file", 3) So(p3.Err, ShouldBeNil) diff --git a/pkg/schedule/schedule.go b/pkg/schedule/schedule.go index dbd390d8b..0e29e0b1c 100644 --- a/pkg/schedule/schedule.go +++ b/pkg/schedule/schedule.go @@ -49,8 +49,10 @@ type Response interface { } func waitOnInterval(last time.Time, i time.Duration) (uint, time.Time) { + // first run if (last == time.Time{}) { - time.Sleep(i) + // for the first run, do not wait on interval + // and schedule workflow execution immediately return uint(0), time.Now() } // Get the difference in time.Duration since last in nanoseconds (int64) diff --git a/pkg/schedule/simple_schedule_test.go b/pkg/schedule/simple_schedule_test.go index cb3670db6..8abaa3020 100644 --- a/pkg/schedule/simple_schedule_test.go +++ b/pkg/schedule/simple_schedule_test.go @@ -26,20 +26,88 @@ func TestSimpleSchedule(t *testing.T) { r := s.Wait(last) after := time.Since(before) - So(r.State(), ShouldEqual, Active) - So(r.Missed(), ShouldResemble, uint(4)) So(r.Error(), ShouldEqual, nil) + So(r.State(), ShouldEqual, Active) + So(r.Missed(), ShouldEqual, 4) // We are ok at this precision with being within 10% over or under (10ms) afterMS := after.Nanoseconds() / 1000 / 1000 So(afterMS, ShouldBeGreaterThan, shouldWait-10) So(afterMS, ShouldBeLessThan, shouldWait+10) }) - Convey("invalid schedule", func() { s := NewSimpleSchedule(0) err := s.Validate() So(err, ShouldResemble, ErrInvalidInterval) }) + }) + Convey("Simple schedule with no misses", t, func() { + interval := time.Millisecond * 10 + s := NewSimpleSchedule(interval) + + err := s.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + before := time.Now() + for len(r) <= 10 { + r1 := s.Wait(last) + last = time.Now() + r = append(r, r1) + } + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 0) + + // the task should start immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + 0, + (interval).Seconds(), + ) + }) + Convey("Simple schedule with a few misses", t, func() { + interval := time.Millisecond * 10 + s := NewSimpleSchedule(interval) + + err := s.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + before := time.Now() + for len(r) <= 10 { + r1 := s.Wait(last) + last = time.Now() + r = append(r, r1) + // make it miss some + if len(r) == 3 || len(r) == 7 { + time.Sleep(s.Interval) + } + if len(r) == 9 { + // Miss two + time.Sleep(s.Interval * 2) + } + } + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 4) + // the task should fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + 0, + (interval).Seconds(), + ) }) } diff --git a/pkg/schedule/windowed_schedule.go b/pkg/schedule/windowed_schedule.go index 024b68cd9..582a84350 100644 --- a/pkg/schedule/windowed_schedule.go +++ b/pkg/schedule/windowed_schedule.go @@ -66,19 +66,12 @@ func (w *WindowedSchedule) Wait(last time.Time) Response { }).Debug("Waiting for window to start") time.Sleep(wait) } - if (last == time.Time{}) { - logger.WithFields(log.Fields{ - "_block": "windowed-wait", - }).Debug("Last was unset using start time") - last = *w.StartTime - } } else { - if (last == time.Time{}) { - logger.WithFields(log.Fields{ - "_block": "windowed-wait", - }).Debug("Last was unset using start time") - last = time.Now() - } + // This has no start like a simple schedule, so execution starts immediately + logger.WithFields(log.Fields{ + "_block": "windowed-wait", + "sleep-duration": 0, + }).Debug("Window start time not defined, start execution immediately") } // If within the window we wait our interval and return diff --git a/pkg/schedule/windowed_schedule_test.go b/pkg/schedule/windowed_schedule_test.go index eff13e52b..a26f673ba 100644 --- a/pkg/schedule/windowed_schedule_test.go +++ b/pkg/schedule/windowed_schedule_test.go @@ -14,11 +14,57 @@ import ( func TestWindowedSchedule(t *testing.T) { log.SetLevel(log.DebugLevel) Convey("Windowed Schedule", t, func() { + Convey("nominal window without misses", func() { + startWait := time.Millisecond * 50 + windowSize := time.Millisecond * 200 + interval := time.Millisecond * 10 + + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + w := NewWindowedSchedule( + interval, + &start, + &stop, + ) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + state := Active + before := time.Now() + for state == Active { + r1 := w.Wait(last) + state = r1.State() + last = time.Now() + r = append(r, r1) + } + + // there are 0 missed responses, so for this schedule + // we expect to get between 19 - 22 responses + So(len(r), ShouldBeBetweenOrEqual, 19, 22) + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 0) + + // the task is expected to fire immediately on determined start-time + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + (startWait).Seconds(), + (startWait + interval).Seconds(), + ) + }) + Convey("nominal window with a few misses", func() { startWait := time.Millisecond * 50 windowSize := time.Millisecond * 200 interval := time.Millisecond * 10 - // shouldWait := 1000.0 + float64(interval) start := time.Now().Add(startWait) stop := time.Now().Add(startWait + windowSize) @@ -50,26 +96,30 @@ func TestWindowedSchedule(t *testing.T) { time.Sleep(w.Interval * 2) } } - // we should have either 16 or 17 minus 3 missed - So(len(r), ShouldBeBetweenOrEqual, 15, 17) + + // there are 4 missed responses, so for this schedule + // we expect to get between 15 - 18 responses + So(len(r), ShouldBeBetweenOrEqual, 15, 18) var missed uint for _, x := range r { missed += x.Missed() } + So(missed, ShouldEqual, 4) + + // the task is expected to fire immediately on determined start-time So( r[0].LastTime().Sub(before).Seconds(), ShouldBeBetweenOrEqual, - (startWait+interval).Seconds()*.9, - (startWait+interval).Seconds()*1.5, + (startWait).Seconds(), + (startWait + interval).Seconds(), ) }) - Convey("started in the middle of the window", func() { + Convey("started in the past", func() { startWait := time.Millisecond * -200 - windowSize := time.Millisecond * 350 + windowSize := time.Millisecond * 400 interval := time.Millisecond * 10 - // shouldWait := 1000.0 + float64(interval) start := time.Now().Add(startWait) stop := time.Now().Add(startWait + windowSize) @@ -85,6 +135,7 @@ func TestWindowedSchedule(t *testing.T) { var r []Response last := *new(time.Time) + before := time.Now() state := Active for state == Active { r1 := w.Wait(last) @@ -100,14 +151,24 @@ func TestWindowedSchedule(t *testing.T) { time.Sleep(w.Interval * 2) } } - // we should have either 16 or 17 minus 3 missed - So(len(r), ShouldBeBetweenOrEqual, 10, 12) + // there are 4 missed responses, so for this schedule + // we expect to get between 15 - 18 responses + So(len(r), ShouldBeBetweenOrEqual, 15, 18) var missed uint for _, x := range r { missed += x.Missed() } - So(missed, ShouldBeBetweenOrEqual, 22, 24) + So(missed, ShouldEqual, 4) + + // start_time points to the past, + // so the task is expected to fire immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + 0, + (interval).Seconds(), + ) }) Convey("start without stop", func() { @@ -133,20 +194,21 @@ func TestWindowedSchedule(t *testing.T) { last = time.Now() r = append(r, r1) } + + // the task is expected to fire immediately on start_time So( r[0].LastTime().Sub(before).Seconds(), ShouldBeBetweenOrEqual, - (startWait+interval).Seconds()*.9, - (startWait+interval).Seconds()*1.5, + (startWait).Seconds(), + (startWait + interval).Seconds(), ) }) Convey("stop without start", func() { - startWait := time.Millisecond * 50 windowSize := time.Millisecond * 200 interval := time.Millisecond * 10 - stop := time.Now().Add(startWait + windowSize) + stop := time.Now().Add(windowSize) w := NewWindowedSchedule( interval, nil, @@ -165,12 +227,52 @@ func TestWindowedSchedule(t *testing.T) { last = time.Now() r = append(r, r1) } + + // the task should start immediately So( r[0].LastTime().Sub(before).Seconds(), ShouldBeBetweenOrEqual, - (interval).Seconds()*.9, - (interval).Seconds()*1.5, + 0, + (interval).Seconds(), ) + + var missed uint + for _, x := range r { + missed += x.Missed() + } + So(missed, ShouldEqual, 0) + }) + + Convey("start immediatelly without stop (no window determined)", func() { + interval := time.Millisecond * 10 + // schedule equivalent to simple schedule + w := NewWindowedSchedule( + interval, + nil, + nil, + ) + + err := w.Validate() + So(err, ShouldBeNil) + + var r []Response + last := *new(time.Time) + + before := time.Now() + for len(r) <= 10 { + r1 := w.Wait(last) + last = time.Now() + r = append(r, r1) + } + + // the task should start immediately + So( + r[0].LastTime().Sub(before).Seconds(), + ShouldBeBetweenOrEqual, + 0, + (interval).Seconds(), + ) + }) Convey("start time in past is ok (as long as window ends in the future)", func() { diff --git a/scheduler/task.go b/scheduler/task.go index ee95bf146..d5f4cdbb0 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -252,7 +252,8 @@ func (t *task) Spin() { // in time that a task starts spinning. E.g. stopping a task, // waiting a period of time, and starting the task won't show // misses for the interval while stopped. - t.lastFireTime = time.Now() + t.lastFireTime = time.Time{} + if t.state == core.TaskStopped { t.state = core.TaskSpinning t.killChan = make(chan struct{}) diff --git a/scheduler/watcher.go b/scheduler/watcher.go index 734061be7..55d5e2091 100644 --- a/scheduler/watcher.go +++ b/scheduler/watcher.go @@ -85,14 +85,14 @@ func (t *taskWatcherCollection) rm(taskID string, tw *TaskWatcher) { func (t *taskWatcherCollection) add(taskID string, twh core.TaskWatcherHandler) (*TaskWatcher, error) { t.mutex.Lock() defer t.mutex.Unlock() - // init map for task ID if it does not eist + // init map for task ID if it does not exist if t.coll[taskID] == nil { t.coll[taskID] = make([]*TaskWatcher, 0) } tw := &TaskWatcher{ // Assign unique ID to task watcher id: t.tIDCounter, - // Add ref to coll for cleanup later + // Add ref to call for cleanup later parent: t, stopped: false, handler: twh,