Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixing issue 392 Issuing start on a running task and stop a stopped task
Browse files Browse the repository at this point in the history
  • Loading branch information
tiffanyfay committed Oct 21, 2015
1 parent 2da7665 commit 680a577
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
10 changes: 10 additions & 0 deletions cmd/pulsectl/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ func startTask(ctx *cli.Context) {
id := ctx.Args().First()
r := pClient.StartTask(id)
if r.Err != nil {
if strings.Contains(r.Err.Error(), "Task is already running.") {
fmt.Println("Task is already running")
fmt.Printf("ID: %s\n", id)
os.Exit(0)
}
fmt.Printf("Error starting task:\n%v\n", r.Err)
os.Exit(1)
}
Expand All @@ -387,6 +392,11 @@ func stopTask(ctx *cli.Context) {
id := ctx.Args().First()
r := pClient.StopTask(id)
if r.Err != nil {
if strings.Contains(r.Err.Error(), "Task is already stopped.") {
fmt.Println("Task is already stopped")
fmt.Printf("ID: %s\n", id)
os.Exit(0)
}
fmt.Printf("Error stopping task:\n%v\n", r.Err)
os.Exit(1)
}
Expand Down
13 changes: 10 additions & 3 deletions mgmt/rest/client/client_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,13 @@ func TestPulseClient(t *testing.T) {
c.LoadPlugin(FILE_PLUGIN_PATH)

p1 := c.CreateTask(&Schedule{Type: "simple", Interval: "1s"}, getWMFromSample("1.json"), "baron", false)

p2 := c.StartTask(p1.ID)
So(p2.Err, ShouldBeNil)
So(p2.ID, ShouldEqual, p1.ID)

p3 := c.CreateTask(&Schedule{Type: "simple", Interval: "1s"}, getWMFromSample("1.json"), "baron", true)
p4 := c.StartTask(p3.ID)
So(p4.Err.Error(), ShouldEqual, "error 0: Task is already running. ")
})
Convey("do returns err!=nil", func() {
port := -1
Expand Down Expand Up @@ -486,8 +489,12 @@ func TestPulseClient(t *testing.T) {

p1 := c.CreateTask(&Schedule{Type: "simple", Interval: "1s"}, getWMFromSample("1.json"), "baron", false)
p2 := c.StopTask(p1.ID)
So(p2.Err, ShouldBeNil)
So(p2.ID, ShouldEqual, p1.ID)
So(p2.Err.Error(), ShouldEqual, "error 0: Task is already stopped. ")

p3 := c.CreateTask(&Schedule{Type: "simple", Interval: "1s"}, getWMFromSample("1.json"), "baron", true)
p4 := c.StopTask(p3.ID)
So(p3.Err, ShouldBeNil)
So(p4.ID, ShouldEqual, p3.ID)

b := make([]byte, 5)
rsp, err := c.do("PUT", fmt.Sprintf("/tasks/%v/stop", p1.ID), ContentTypeJSON, b)
Expand Down
24 changes: 24 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var (

ErrMetricManagerNotSet = errors.New("MetricManager is not set.")
ErrSchedulerNotStarted = errors.New("Scheduler is not started.")
ErrTaskAlreadyRunning = errors.New("Task is already running.")
ErrTaskAlreadyStopped = errors.New("Task is already stopped.")
)

type schedulerState int
Expand Down Expand Up @@ -261,6 +263,17 @@ func (s *scheduler) StartTask(id string) []perror.PulseError {
}
}

if t.state == core.TaskFiring || t.state == core.TaskSpinning {
s.logger.WithFields(log.Fields{
"_block": "start-task",
"task-id": t.ID(),
"task-state": t.State(),
}).Info("task is already running")
return []perror.PulseError{
perror.New(ErrTaskAlreadyRunning),
}
}

mts, plugins := s.gatherMetricsAndPlugins(t.workflow)
cps := returnCorePlugin(plugins)
errs := s.metricManager.SubscribeDeps(t.ID(), mts, cps)
Expand Down Expand Up @@ -294,6 +307,17 @@ func (s *scheduler) StopTask(id string) []perror.PulseError {
}
}

if t.state == core.TaskStopped {
s.logger.WithFields(log.Fields{
"_block": "stop-task",
"task-id": t.ID(),
"task-state": t.State(),
}).Info("task is already stopped")
return []perror.PulseError{
perror.New(ErrTaskAlreadyStopped),
}
}

mts, plugins := s.gatherMetricsAndPlugins(t.workflow)
cps := returnCorePlugin(plugins)
errs := s.metricManager.UnsubscribeDeps(t.ID(), mts, cps)
Expand Down
6 changes: 5 additions & 1 deletion scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ func TestScheduler(t *testing.T) {

// // TODO NICK
Convey("create a task", func() {

tsk, err := s.CreateTask(schedule.NewSimpleSchedule(time.Second*5), w, false)
So(len(err.Errors()), ShouldEqual, 0)
So(tsk, ShouldNotBeNil)
Expand All @@ -279,6 +278,11 @@ func TestScheduler(t *testing.T) {
So(err, ShouldNotBeNil)
So(t, ShouldBeNil)
})
Convey("stop a stopped task", func() {
err := s.StopTask(tsk.ID())
So(len(err), ShouldEqual, 1)
So(err[0].Error(), ShouldEqual, "Task is already stopped.")
})
})

// // // TODO NICK
Expand Down

0 comments on commit 680a577

Please sign in to comment.