Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixes #1044: Returns an error when trying to stop a disabled task.
Browse files Browse the repository at this point in the history
  • Loading branch information
geauxvirtual committed Jul 14, 2016
1 parent 2b8d574 commit 7a3573b
Show file tree
Hide file tree
Showing 2 changed files with 318 additions and 30 deletions.
71 changes: 41 additions & 30 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var (
ErrTaskAlreadyStopped = errors.New("Task is already stopped.")
// ErrTaskDisabledNotRunnable - The error message for task is disabled and cannot be started
ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started.")
// ErrTaskDisabledNotStoppable - The error message for when a task is disabled and cannot be stopped
ErrTaskDisabledNotStoppable = errors.New("Task is disabled. Only running tasks can be stopped.")
)

type schedulerState int
Expand Down Expand Up @@ -553,47 +555,56 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError {
}
}

if t.state == core.TaskStopped {
switch t.state {
case core.TaskStopped:
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Info("task is already stopped")
}).Error("task is already stopped")
return []serror.SnapError{
serror.New(ErrTaskAlreadyStopped),
}
}

// Group depndencies by the host they live on and
// unsubscirbe them since task is stopping.
depGroupMap := s.gatherMetricsAndPlugins(t.workflow)
case core.TaskDisabled:
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Error("invalid action (stop) called on disabled task")
return []serror.SnapError{
serror.New(ErrTaskDisabledNotStoppable),
}
default:
// Group depndencies by the host they live on and
// unsubscirbe them since task is stopping.
depGroupMap := s.gatherMetricsAndPlugins(t.workflow)

var errs []serror.SnapError
for k := range depGroupMap {
mgr, err := t.RemoteManagers.Get(k)
if err != nil {
errs = append(errs, serror.New(err))
} else {
uerrs := mgr.UnsubscribeDeps(t.ID(), depGroupMap[k].Metrics, returnCorePlugin(depGroupMap[k].Plugins))
if len(uerrs) > 0 {
errs = append(errs, uerrs...)
var errs []serror.SnapError
for k := range depGroupMap {
mgr, err := t.RemoteManagers.Get(k)
if err != nil {
errs = append(errs, serror.New(err))
} else {
uerrs := mgr.UnsubscribeDeps(t.ID(), depGroupMap[k].Metrics, returnCorePlugin(depGroupMap[k].Plugins))
if len(uerrs) > 0 {
errs = append(errs, uerrs...)
}
}
}
}
if len(errs) > 0 {
return errs
}
if len(errs) > 0 {
return errs
}

event := &scheduler_event.TaskStoppedEvent{
TaskID: t.ID(),
Source: source,
event := &scheduler_event.TaskStoppedEvent{
TaskID: t.ID(),
Source: source,
}
defer s.eventManager.Emit(event)
t.Stop()
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Info("task stopped")
return nil
}
defer s.eventManager.Emit(event)
t.Stop()
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Info("task stopped")
return nil
}

//EnableTask changes state from disabled to stopped
Expand Down
277 changes: 277 additions & 0 deletions scheduler/scheduler_medium_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
// +build medium

/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015-2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
"errors"
"fmt"
"testing"
"time"

"github.com/Sirupsen/logrus"
. "github.com/smartystreets/goconvey/convey"

"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/wmap"
)

type mockMetricManager struct {
failValidatingMetrics bool
failValidatingMetricsAfter int
failuredSoFar int
acceptedContentTypes map[string][]string
returnedContentTypes map[string][]string
autodiscoverPaths []string
}

func (m *mockMetricManager) lazyContentType(key string) {
if m.acceptedContentTypes == nil {
m.acceptedContentTypes = make(map[string][]string)
}
if m.returnedContentTypes == nil {
m.returnedContentTypes = make(map[string][]string)
}
if m.acceptedContentTypes[key] == nil {
m.acceptedContentTypes[key] = []string{"snap.gob"}
}
if m.returnedContentTypes[key] == nil {
m.returnedContentTypes[key] = []string{}
}
}

// Used to mock type from plugin
func (m *mockMetricManager) setAcceptedContentType(n string, t core.PluginType, v int, s []string) {
key := fmt.Sprintf("%s:%d:%d", n, t, v)
m.lazyContentType(key)
m.acceptedContentTypes[key] = s
}

func (m *mockMetricManager) setReturnedContentType(n string, t core.PluginType, v int, s []string) {
key := fmt.Sprintf("%s:%d:%d", n, t, v)
m.lazyContentType(key)
m.returnedContentTypes[key] = s
}

func (m *mockMetricManager) GetPluginContentTypes(n string, t core.PluginType, v int) ([]string, []string, error) {
key := fmt.Sprintf("%s:%d:%d", n, t, v)
m.lazyContentType(key)

return m.acceptedContentTypes[key], m.returnedContentTypes[key], nil
}

func (m *mockMetricManager) CollectMetrics([]core.Metric, time.Time, string, map[string]map[string]string) ([]core.Metric, []error) {
return nil, nil
}

func (m *mockMetricManager) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
return nil
}

func (m *mockMetricManager) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) {
return "", nil, nil
}

func (m *mockMetricManager) ValidateDeps(mts []core.Metric, prs []core.SubscribedPlugin) []serror.SnapError {
if m.failValidatingMetrics {
return []serror.SnapError{
serror.New(errors.New("metric validation error")),
}
}
return nil
}
func (m *mockMetricManager) SubscribeDeps(taskID string, mts []core.Metric, prs []core.Plugin) []serror.SnapError {
return []serror.SnapError{
serror.New(errors.New("metric validation error")),
}
}

func (m *mockMetricManager) UnsubscribeDeps(taskID string, mts []core.Metric, prs []core.Plugin) []serror.SnapError {
return nil
}

func (m *mockMetricManager) MatchQueryToNamespaces(core.Namespace) ([]core.Namespace, serror.SnapError) {
return nil, nil
}

func (m *mockMetricManager) ExpandWildcards(core.Namespace) ([]core.Namespace, serror.SnapError) {
return nil, nil
}

func (m *mockMetricManager) SetAutodiscoverPaths(paths []string) {
m.autodiscoverPaths = paths
}

func (m *mockMetricManager) GetAutodiscoverPaths() []string {
return m.autodiscoverPaths
}

type mockMetricManagerError struct {
errs []error
}

type mockMetricType struct {
version int
namespace []string
lastAdvertisedTime time.Time
config *cdata.ConfigDataNode
}

func (m mockMetricType) Version() int {
return m.version
}

func (m mockMetricType) Namespace() []string {
return m.namespace
}

func (m mockMetricType) LastAdvertisedTime() time.Time {
return m.lastAdvertisedTime
}

func (m mockMetricType) Config() *cdata.ConfigDataNode {
return m.config
}

func (m mockMetricType) Data() interface{} {
return nil
}

type mockScheduleResponse struct {
}

func (m mockScheduleResponse) state() schedule.ScheduleState {
return schedule.Active
}

func (m mockScheduleResponse) err() error {
return nil
}

func (m mockScheduleResponse) missedIntervals() uint {
return 0
}

// Helper constructor functions for resuse amongst tests
func newMockMetricManager() *mockMetricManager {
m := new(mockMetricManager)
m.setAcceptedContentType("machine", core.ProcessorPluginType, 1, []string{"snap.*", "snap.gob", "foo.bar"})
m.setReturnedContentType("machine", core.ProcessorPluginType, 1, []string{"snap.gob"})
m.setAcceptedContentType("rmq", core.PublisherPluginType, -1, []string{"snap.json", "snap.gob"})
m.setAcceptedContentType("file", core.PublisherPluginType, -1, []string{"snap.json"})
return m
}

func newScheduler() *scheduler {
cfg := GetDefaultConfig()
s := New(cfg)
s.SetMetricManager(newMockMetricManager())
return s
}

func newMockWorkflowMap() *wmap.WorkflowMap {
w := wmap.NewWorkflowMap()
// Collection node
w.CollectNode.AddMetric("/foo/bar", 1)
w.CollectNode.AddMetric("/foo/baz", 2)
w.CollectNode.AddConfigItem("/foo/bar", "username", "root")
w.CollectNode.AddConfigItem("/foo/bar", "port", 8080)
w.CollectNode.AddConfigItem("/foo/bar", "ratio", 0.32)
w.CollectNode.AddConfigItem("/foo/bar", "yesorno", true)

// Add a process node
pr1 := wmap.NewProcessNode("machine", 1)
pr1.AddConfigItem("username", "wat")
pr1.AddConfigItem("howmuch", 9999)

// Add a process node
pr12 := wmap.NewProcessNode("machine", 1)
pr12.AddConfigItem("username", "wat2")
pr12.AddConfigItem("howmuch", 99992)

// Publish node for our process node
pu1 := wmap.NewPublishNode("rmq", -1)
pu1.AddConfigItem("birthplace", "dallas")
pu1.AddConfigItem("monies", 2)

// Publish node direct to collection
pu2 := wmap.NewPublishNode("file", -1)
pu2.AddConfigItem("color", "brown")
pu2.AddConfigItem("purpose", 42)

pr12.Add(pu2)
pr1.Add(pr12)
w.CollectNode.Add(pr1)
w.CollectNode.Add(pu1)
return w
}

// ----------------------------- Medium Tests ----------------------------
func TestStopTask(t *testing.T) {
logrus.SetLevel(logrus.FatalLevel)
s := newScheduler()
s.Start()
w := newMockWorkflowMap()
tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false)
task := s.tasks.Get(tsk.ID())
task.Spin()
err := s.StopTask(tsk.ID())

Convey("Calling StopTask a running task", t, func() {
Convey("Should not return an error", func() {
So(err, ShouldBeNil)
})
time.Sleep(100 * time.Millisecond)
Convey("State of the task should be TaskStopped", func() {
So(task.state, ShouldEqual, core.TaskStopped)
})
})

tskStopped, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false)
err = s.StopTask(tskStopped.ID())
Convey("Calling StopTask on a stopped task", t, func() {
Convey("Should return an error", func() {
So(err, ShouldNotBeNil)
})
Convey("Error should read: Task is already stopped.", func() {
So(err[0].Error(), ShouldResemble, "Task is already stopped.")
})
})

tskDisabled, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false)
taskDisabled := s.tasks.Get(tskDisabled.ID())
taskDisabled.state = core.TaskDisabled
err = s.StopTask(tskDisabled.ID())
Convey("Calling StopTask on a disabled task", t, func() {
Convey("Should return an error", func() {
So(err, ShouldNotBeNil)
})
Convey("Error should read: Task is disabled. Only running tasks can be stopped.", func() {
So(err[0].Error(), ShouldResemble, "Task is disabled. Only running tasks can be stopped.")
})
})

s.Stop()
}

0 comments on commit 7a3573b

Please sign in to comment.