diff --git a/scheduler/distributed_task_test.go b/scheduler/distributed_task_test.go new file mode 100644 index 000000000..4f62e4aa9 --- /dev/null +++ b/scheduler/distributed_task_test.go @@ -0,0 +1,296 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "errors" + "fmt" + "net" + "path" + "testing" + "time" + + "github.com/intelsdi-x/snap/control" + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/serror" + "github.com/intelsdi-x/snap/pkg/schedule" + "github.com/intelsdi-x/snap/scheduler/wmap" + . "github.com/smartystreets/goconvey/convey" +) + +var ( + PluginPath = path.Join(SnapPath, "plugin") +) + +func TestDistributedWorkflow(t *testing.T) { + Convey("Create a scheduler with 2 controls and load plugins", t, func() { + l, _ := net.Listen("tcp", ":0") + l.Close() + cfg := control.GetDefaultConfig() + cfg.ListenPort = l.Addr().(*net.TCPAddr).Port + c1 := control.New(cfg) + c1.Start() + m, _ := net.Listen("tcp", ":0") + m.Close() + cfg.ListenPort = m.Addr().(*net.TCPAddr).Port + port1 := cfg.ListenPort + c2 := control.New(cfg) + schcfg := GetDefaultConfig() + sch := New(schcfg) + c2.Start() + sch.SetMetricManager(c1) + err := sch.Start() + So(err, ShouldBeNil) + // Load appropriate plugins into each control. 
+ mock2Path := path.Join(PluginPath, "snap-collector-mock2") + passthruPath := path.Join(PluginPath, "snap-processor-passthru") + filePath := path.Join(PluginPath, "snap-publisher-file") + + // mock2 and file onto c1 + + rp, err := core.NewRequestedPlugin(mock2Path) + So(err, ShouldBeNil) + _, err = c1.Load(rp) + So(err, ShouldBeNil) + rp, err = core.NewRequestedPlugin(filePath) + So(err, ShouldBeNil) + _, err = c1.Load(rp) + So(err, ShouldBeNil) + // passthru on c2 + rp, err = core.NewRequestedPlugin(passthruPath) + So(err, ShouldBeNil) + passthru, err := c2.Load(rp) + So(err, ShouldBeNil) + + Convey("Test task with one local and one remote node", func() { + //Create a task + //Create a workflowmap + wf := dsWFMap(port1) + t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true) + So(len(errs.Errors()), ShouldEqual, 0) + So(t, ShouldNotBeNil) + }) + + Convey("Test task with invalid remote port", func() { + wf := dsWFMap(0) + t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true) + So(len(errs.Errors()), ShouldEqual, 1) + So(t, ShouldBeNil) + }) + + Convey("Test task without remote plugin", func() { + _, err := c2.Unload(passthru) + So(err, ShouldBeNil) + wf := dsWFMap(port1) + t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true) + So(len(errs.Errors()), ShouldEqual, 1) + So(t, ShouldBeNil) + }) + + Convey("Test task failing when control is stopped while task is running", func() { + wf := dsWFMap(port1) + interval := time.Millisecond * 100 + t, errs := sch.CreateTask(schedule.NewSimpleSchedule(interval), wf, true) + So(len(errs.Errors()), ShouldEqual, 0) + So(t, ShouldNotBeNil) + c2.Stop() + // Give task time to fail + time.Sleep(time.Second) + tasks := sch.GetTasks() + var task core.Task + for _, v := range tasks { + task = v + } + So(task.State(), ShouldEqual, core.TaskDisabled) + }) + + }) + +} + +func TestDistributedSubscriptions(t *testing.T) { + + Convey("Load control/scheduler with a mock remote scheduler", t, func() { + l, _ := net.Listen("tcp", ":0") + l.Close() + cfg := control.GetDefaultConfig() + cfg.ListenPort = l.Addr().(*net.TCPAddr).Port + c1 := control.New(cfg) + c1.Start() + m, _ := net.Listen("tcp", ":0") + m.Close() + cfg.ListenPort = m.Addr().(*net.TCPAddr).Port + port1 := cfg.ListenPort + c2 := control.New(cfg) + schcfg := GetDefaultConfig() + sch := New(schcfg) + c2.Start() + sch.SetMetricManager(c1) + err := sch.Start() + So(err, ShouldBeNil) + // Load appropriate plugins into each control. + mock2Path := path.Join(PluginPath, "snap-collector-mock2") + passthruPath := path.Join(PluginPath, "snap-processor-passthru") + filePath := path.Join(PluginPath, "snap-publisher-file") + + // mock2 and file onto c1 + + rp, err := core.NewRequestedPlugin(mock2Path) + So(err, ShouldBeNil) + _, err = c1.Load(rp) + So(err, ShouldBeNil) + rp, err = core.NewRequestedPlugin(filePath) + So(err, ShouldBeNil) + _, err = c1.Load(rp) + So(err, ShouldBeNil) + // passthru on c2 + rp, err = core.NewRequestedPlugin(passthruPath) + So(err, ShouldBeNil) + _, err = c2.Load(rp) + So(err, ShouldBeNil) + + Convey("Starting task should not succeed if remote dep fails to subscribe", func() { + //Create a task + //Create a workflowmap + wf := dsWFMap(port1) + // Create a task that is not started immediately so we can + // validate deps correctly. 
+ t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, false) + So(len(errs.Errors()), ShouldEqual, 0) + So(t, ShouldNotBeNil) + schTask := t.(*task) + remoteMockManager := &subscriptionManager{Fail: true} + schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager) + localMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add("", localMockManager) + // Start task. We expect it to fail while subscribing deps + terrs := sch.StartTask(t.ID()) + So(terrs, ShouldNotBeNil) + Convey("So dependencies should have been unsubscribed", func() { + // Ensure that unsubscribe call count is equal to subscribe call count, + // i.e. that every subscribe call was followed by an unsubscribe since + // we errored + So(remoteMockManager.UnsubscribeCallCount, ShouldEqual, remoteMockManager.SubscribeCallCount) + So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.SubscribeCallCount) + }) + }) + + Convey("Starting task should not succeed if missing local dep fails to subscribe", func() { + //Create a task + //Create a workflowmap + wf := dsWFMap(port1) + // Create a task that is not started immediately so we can + // validate deps correctly. + t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, false) + So(len(errs.Errors()), ShouldEqual, 0) + So(t, ShouldNotBeNil) + schTask := t.(*task) + localMockManager := &subscriptionManager{Fail: true} + schTask.RemoteManagers.Add("", localMockManager) + remoteMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager) + + // Start task. We expect it to fail while subscribing deps + terrs := sch.StartTask(t.ID()) + So(terrs, ShouldNotBeNil) + Convey("So dependencies should have been unsubscribed", func() { + // Ensure that unsubscribe call count is equal to subscribe call count, + // i.e. that every subscribe call was followed by an unsubscribe since + // we errored + So(remoteMockManager.UnsubscribeCallCount, ShouldEqual, remoteMockManager.SubscribeCallCount) + So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.SubscribeCallCount) + }) + }) + + Convey("Starting task should succeed if all deps are available", func() { + //Create a task + //Create a workflowmap + wf := dsWFMap(port1) + // Create a task that is not started immediately so we can + // validate deps correctly.
+ t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, false) + So(len(errs.Errors()), ShouldEqual, 0) + So(t, ShouldNotBeNil) + schTask := t.(*task) + localMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add("", localMockManager) + remoteMockManager := &subscriptionManager{Fail: false} + schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager) + terrs := sch.StartTask(t.ID()) + So(terrs, ShouldBeNil) + Convey("So all dependencies should have been subscribed to", func() { + // Ensure that both the local and the remote manager were + // subscribed to at least once + So(localMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) + }) + }) + }) +} + +func dsWFMap(port int) *wmap.WorkflowMap { + wf := new(wmap.WorkflowMap) + + c := wmap.NewCollectWorkflowMapNode() + c.Config["/intel/mock/foo"] = make(map[string]interface{}) + c.Config["/intel/mock/foo"]["password"] = "required" + pr := &wmap.ProcessWorkflowMapNode{ + Name: "passthru", + Version: -1, + Config: make(map[string]interface{}), + Target: fmt.Sprintf("127.0.0.1:%v", port), + } + pu := &wmap.PublishWorkflowMapNode{ + Name: "file", + Version: -1, + Config: make(map[string]interface{}), + } + pu.Config["file"] = "/dev/null" + pr.Add(pu) + c.Add(pr) + e := c.AddMetric("/intel/mock/foo", 2) + if e != nil { + panic(e) + } + wf.CollectNode = c + + return wf +} + +type subscriptionManager struct { + mockMetricManager + Fail bool + SubscribeCallCount int + UnsubscribeCallCount int +} + +func (m *subscriptionManager) SubscribeDeps(taskID string, mts []core.Metric, prs []core.Plugin) []serror.SnapError { + if m.Fail { + return []serror.SnapError{serror.New(errors.New("error"))} + } + m.SubscribeCallCount += 1 + return nil +} + +func (m *subscriptionManager) UnsubscribeDeps(taskID string, mts []core.Metric, prs []core.Plugin) []serror.SnapError { + m.UnsubscribeCallCount += 1 + return nil +} diff --git a/scheduler/managers.go b/scheduler/managers.go new file mode 100644 index 000000000..3ce503c92 --- /dev/null +++ b/scheduler/managers.go @@ -0,0 +1,68 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "errors" + "fmt" + "sync" +) + +type managers struct { + mutex *sync.RWMutex + local managesMetrics + remoteManagers map[string]managesMetrics +} + +func newManagers(mm managesMetrics) managers { + return managers{ + mutex: &sync.RWMutex{}, + remoteManagers: make(map[string]managesMetrics), + local: mm, + } +} + +// Add stores the given managesMetrics instance under key so it is accessible +// via Get() calls. An empty key replaces the local manager.
+func (m *managers) Add(key string, val managesMetrics) { + m.mutex.Lock() + if key == "" { + m.local = val + } else { + m.remoteManagers[key] = val + } + m.mutex.Unlock() +} + +// Get returns the managesMetrics instance that maps to the given +// string. If an empty string is given, it will instead return +// the local instance passed in on initialization. +func (m *managers) Get(key string) (managesMetrics, error) { + if key == "" { + return m.local, nil + } + m.mutex.RLock() + defer m.mutex.RUnlock() + if val, ok := m.remoteManagers[key]; ok { + return val, nil + } else { + return nil, errors.New(fmt.Sprintf("Client not found for: %v", key)) + } +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index e16d96a9a..aebad465f 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -202,17 +202,33 @@ func (s *scheduler) createTask(sch schedule.Schedule, wfMap *wmap.WorkflowMap, s return nil, te } - // validate plugins and metrics - - mts, plugins := s.gatherMetricsAndPlugins(wf) - errs := s.metricManager.ValidateDeps(mts, plugins) - if len(errs) > 0 { - te.errs = append(te.errs, errs...) + // Create the task object + task, err := newTask(sch, wf, s.workManager, s.metricManager, s.eventManager, opts...) + if err != nil { + te.errs = append(te.errs, serror.New(err)) + f := buildErrorsLog(te.Errors(), logger) + f.Error("Unable to create task") return nil, te } + // Group dependencies by the node they live on + // and validate them. + depGroupMap := s.gatherMetricsAndPlugins(wf) + for k, val := range depGroupMap { + manager, err := task.RemoteManagers.Get(k) + if err != nil { + te.errs = append(te.errs, serror.New(err)) + return nil, te + } + errs := manager.ValidateDeps(val.Metrics, val.Plugins) + if len(errs) > 0 { + te.errs = append(te.errs, errs...) + return nil, te + } + } + // Bind plugin content type selections in workflow - err = wf.BindPluginContentTypes(s.metricManager) + err = wf.BindPluginContentTypes(&task.RemoteManagers) if err != nil { te.errs = append(te.errs, serror.New(err)) f := buildErrorsLog(te.Errors(), logger) @@ -220,9 +236,6 @@ func (s *scheduler) createTask(sch schedule.Schedule, wfMap *wmap.WorkflowMap, s return nil, te } - // Create the task object - task := newTask(sch, wf, s.workManager, s.metricManager, s.eventManager, opts...) - // Add task to taskCollection if err := s.tasks.add(task); err != nil { te.errs = append(te.errs, serror.New(err)) @@ -325,6 +338,7 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError { "_block": "start-task", "source": source, }) + t, err := s.getTask(id) if err != nil { schedulerLogger.WithFields(log.Fields{ @@ -354,19 +368,37 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError { serror.New(ErrTaskAlreadyRunning), } } - - mts, plugins := s.gatherMetricsAndPlugins(t.workflow) - cps := returnCorePlugin(plugins) - serrs := s.metricManager.SubscribeDeps(t.ID(), mts, cps) - if len(serrs) > 0 { - // Tear down plugin processes started so far. - uerrs := s.metricManager.UnsubscribeDeps(t.ID(), mts, cps) - errs := append(serrs, uerrs...) - logger.WithFields(log.Fields{ - "task-id": t.ID(), - "_error": errs, - }).Error("task failed to start due to dependencies") - return errs + // Group dependencies by the node they live on + // and subscribe to them.
+ depGroupMap := s.gatherMetricsAndPlugins(t.workflow) + var subbedDeps []string + for k := range depGroupMap { + var errs []serror.SnapError + cps := returnCorePlugin(depGroupMap[k].Plugins) + mgr, err := t.RemoteManagers.Get(k) + if err != nil { + errs = append(errs, serror.New(err)) + } else { + errs = mgr.SubscribeDeps(t.ID(), depGroupMap[k].Metrics, cps) + } + // If there are errors with subscribing any deps, go through and unsubscribe all other + // deps that may have already been subscribed, then return the errors. + if len(errs) > 0 { + for _, key := range subbedDeps { + cps := returnCorePlugin(depGroupMap[key].Plugins) + mts := depGroupMap[key].Metrics + mgr, err := t.RemoteManagers.Get(key) + if err != nil { + errs = append(errs, serror.New(err)) + } else { + uerrs := mgr.UnsubscribeDeps(t.ID(), mts, cps) + errs = append(errs, uerrs...) + } + } + return errs + } + // If subscribed successfully, add to subbedDeps + subbedDeps = append(subbedDeps, k) } event := &scheduler_event.TaskStartedEvent{ @@ -417,9 +449,22 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError { } } - mts, plugins := s.gatherMetricsAndPlugins(t.workflow) - cps := returnCorePlugin(plugins) - errs := s.metricManager.UnsubscribeDeps(t.ID(), mts, cps) + // Group dependencies by the host they live on and + // unsubscribe them since the task is stopping. + depGroupMap := s.gatherMetricsAndPlugins(t.workflow) + + var errs []serror.SnapError + for k := range depGroupMap { + mgr, err := t.RemoteManagers.Get(k) + if err != nil { + errs = append(errs, serror.New(err)) + } else { + uerrs := mgr.UnsubscribeDeps(t.ID(), depGroupMap[k].Metrics, returnCorePlugin(depGroupMap[k].Plugins)) + if len(uerrs) > 0 { + errs = append(errs, uerrs...) + } + } + } if len(errs) > 0 { return errs } @@ -479,6 +524,7 @@ func (s *scheduler) Start() error { schedulerLogger.WithFields(log.Fields{ "_block": "start-scheduler", }).Info("scheduler started") + return nil } @@ -563,9 +609,14 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { }).Debug("event received") // We need to unsubscribe from deps when a task goes disabled task, _ := s.getTask(v.TaskID) - mts, plugins := s.gatherMetricsAndPlugins(task.workflow) - cps := returnCorePlugin(plugins) - s.metricManager.UnsubscribeDeps(task.ID(), mts, cps) + depGroupMap := s.gatherMetricsAndPlugins(task.workflow) + for k := range depGroupMap { + cps := returnCorePlugin(depGroupMap[k].Plugins) + mgr, err := task.RemoteManagers.Get(k) + if err == nil { + mgr.UnsubscribeDeps(task.ID(), depGroupMap[k].Metrics, cps) + } + } s.taskWatcherColl.handleTaskDisabled(v.TaskID, v.Why) default: log.WithFields(log.Fields{ @@ -584,12 +635,14 @@ func (s *scheduler) getTask(id string) (*task, error) { return task, nil } -func (s *scheduler) gatherMetricsAndPlugins(wf *schedulerWorkflow) ([]core.Metric, []core.SubscribedPlugin) { - var ( - mts []core.Metric - plugins []core.SubscribedPlugin - ) +type depGroup struct { + Metrics []core.Metric + Plugins []core.SubscribedPlugin +} +func (s *scheduler) gatherMetricsAndPlugins(wf *schedulerWorkflow) map[string]depGroup { + var mts []core.Metric + depGroupMap := make(map[string]depGroup) for _, m := range wf.metrics { nss, err := s.metricManager.MatchQueryToNamespaces(m.Namespace()) if err != nil { @@ -605,18 +658,33 @@ func (s *scheduler) gatherMetricsAndPlugins(wf *schedulerWorkflow) ([]core.Metri }) } } - s.walkWorkflow(wf.processNodes, wf.publishNodes, &plugins) + // Add metrics to depGroup map under local host (signified by empty string) + // for now since
remote collection not supported + depGroupMap[""] = depGroup{Metrics: mts} + s.walkWorkflow(wf.processNodes, wf.publishNodes, depGroupMap) - return mts, plugins + return depGroupMap } -func (s *scheduler) walkWorkflow(prnodes []*processNode, pbnodes []*publishNode, plugins *[]core.SubscribedPlugin) { +func (s *scheduler) walkWorkflow(prnodes []*processNode, pbnodes []*publishNode, depGroupMap map[string]depGroup) { for _, pr := range prnodes { - *plugins = append(*plugins, pr) - s.walkWorkflow(pr.ProcessNodes, pr.PublishNodes, plugins) + if _, ok := depGroupMap[pr.Target]; ok { + dg := depGroupMap[pr.Target] + dg.Plugins = append(dg.Plugins, pr) + depGroupMap[pr.Target] = dg + } else { + depGroupMap[pr.Target] = depGroup{Plugins: []core.SubscribedPlugin{pr}} + } + s.walkWorkflow(pr.ProcessNodes, pr.PublishNodes, depGroupMap) } for _, pb := range pbnodes { - *plugins = append(*plugins, pb) + if _, ok := depGroupMap[pb.Target]; ok { + dg := depGroupMap[pb.Target] + dg.Plugins = append(dg.Plugins, pb) + depGroupMap[pb.Target] = dg + } else { + depGroupMap[pb.Target] = depGroup{Plugins: []core.SubscribedPlugin{pb}} + } } } diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 78fec0da8..b916d0d6d 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -54,7 +54,7 @@ func (m *mockMetricManager) lazyContentType(key string) { m.returnedContentTypes = make(map[string][]string) } if m.acceptedContentTypes[key] == nil { - m.acceptedContentTypes[key] = []string{} + m.acceptedContentTypes[key] = []string{"snap.gob"} } if m.returnedContentTypes[key] == nil { m.returnedContentTypes[key] = []string{} @@ -173,7 +173,8 @@ func TestScheduler(t *testing.T) { c.setReturnedContentType("machine", core.ProcessorPluginType, 1, []string{"snap.gob"}) c.setAcceptedContentType("rmq", core.PublisherPluginType, -1, []string{"snap.json", "snap.gob"}) c.setAcceptedContentType("file", core.PublisherPluginType, -1, []string{"snap.json"}) - s := New(GetDefaultConfig()) + cfg := GetDefaultConfig() + s := New(cfg) s.SetMetricManager(c) w := wmap.NewWorkflowMap() // Collection node diff --git a/scheduler/task.go b/scheduler/task.go index 703ddbedf..1083f3544 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -22,6 +22,8 @@ package scheduler import ( "errors" "fmt" + "net" + "strconv" "sync" "time" @@ -31,6 +33,7 @@ import ( "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/scheduler_event" + "github.com/intelsdi-x/snap/grpc/controlproxy" "github.com/intelsdi-x/snap/pkg/schedule" "github.com/intelsdi-x/snap/scheduler/wmap" ) @@ -80,10 +83,11 @@ type task struct { lastFailureTime time.Time stopOnFailure uint eventEmitter gomit.Emitter + RemoteManagers managers } //NewTask creates a Task -func newTask(s schedule.Schedule, wf *schedulerWorkflow, m *workManager, mm managesMetrics, emitter gomit.Emitter, opts ...core.TaskOption) *task { +func newTask(s schedule.Schedule, wf *schedulerWorkflow, m *workManager, mm managesMetrics, emitter gomit.Emitter, opts ...core.TaskOption) (*task, error) { //Task would always be given a default name. 
//However if a user want to change this name, she can pass optional arguments, in form of core.TaskOption @@ -92,6 +96,11 @@ func newTask(s schedule.Schedule, wf *schedulerWorkflow, m *workManager, mm mana taskID := uuid.New() name := fmt.Sprintf("Task-%s", taskID) wf.eventEmitter = emitter + mgrs := newManagers(mm) + err := createTaskClients(&mgrs, wf) + if err != nil { + return nil, err + } task := &task{ id: taskID, name: name, @@ -105,12 +114,13 @@ func newTask(s schedule.Schedule, wf *schedulerWorkflow, m *workManager, mm mana deadlineDuration: DefaultDeadlineDuration, stopOnFailure: DefaultStopOnFailure, eventEmitter: emitter, + RemoteManagers: mgrs, } //set options for _, opt := range opts { opt(task) } - return task + return task, nil } // Option sets the options specified. @@ -445,3 +455,52 @@ func (t *taskCollection) Table() map[string]*task { } return tasks } + +// createTaskClients walks the workflowmap and creates clients for this task +// remoteManagers so that nodes that require proxy request can make them. +func createTaskClients(mgrs *managers, wf *schedulerWorkflow) error { + return walkWorkflow(wf.processNodes, wf.publishNodes, mgrs) +} + +func walkWorkflow(prnodes []*processNode, pbnodes []*publishNode, mgrs *managers) error { + for _, pr := range prnodes { + if pr.Target != "" { + host, port, err := net.SplitHostPort(pr.Target) + if err != nil { + return err + } + p, err := strconv.Atoi(port) + if err != nil { + return err + } + proxy, err := controlproxy.New(host, p) + if err != nil { + return err + } + mgrs.Add(pr.Target, proxy) + } + err := walkWorkflow(pr.ProcessNodes, pr.PublishNodes, mgrs) + if err != nil { + return err + } + + } + for _, pu := range pbnodes { + if pu.Target != "" { + host, port, err := net.SplitHostPort(pu.Target) + if err != nil { + return err + } + p, err := strconv.Atoi(port) + if err != nil { + return err + } + proxy, err := controlproxy.New(host, p) + if err != nil { + return err + } + mgrs.Add(pu.Target, proxy) + } + } + return nil +} diff --git a/scheduler/task_test.go b/scheduler/task_test.go index f2ca2c606..a22688e97 100644 --- a/scheduler/task_test.go +++ b/scheduler/task_test.go @@ -47,11 +47,13 @@ func TestTask(t *testing.T) { So(errs, ShouldBeEmpty) c := &mockMetricManager{} c.setAcceptedContentType("rabbitmq", core.PublisherPluginType, 5, []string{plugin.SnapGOBContentType}) - err := wf.BindPluginContentTypes(c) + mgrs := newManagers(c) + err := wf.BindPluginContentTypes(&mgrs) So(err, ShouldBeNil) Convey("task + simple schedule", func() { sch := schedule.NewSimpleSchedule(time.Millisecond * 100) - task := newTask(sch, wf, newWorkManager(), c, emitter) + task, err := newTask(sch, wf, newWorkManager(), c, emitter) + So(err, ShouldBeNil) task.Spin() time.Sleep(time.Millisecond * 10) // it is a race so we slow down the test So(task.state, ShouldEqual, core.TaskSpinning) @@ -61,14 +63,16 @@ func TestTask(t *testing.T) { Convey("Task specified-name test", func() { sch := schedule.NewSimpleSchedule(time.Millisecond * 100) - task := newTask(sch, wf, newWorkManager(), c, emitter, core.SetTaskName("My name is unique")) + task, err := newTask(sch, wf, newWorkManager(), c, emitter, core.SetTaskName("My name is unique")) + So(err, ShouldBeNil) task.Spin() So(task.GetName(), ShouldResemble, "My name is unique") }) Convey("Task default-name test", func() { sch := schedule.NewSimpleSchedule(time.Millisecond * 100) - task := newTask(sch, wf, newWorkManager(), c, emitter) + task, err := newTask(sch, wf, newWorkManager(), c, emitter) + So(err, 
ShouldBeNil) task.Spin() So(task.GetName(), ShouldResemble, "Task-"+task.ID()) @@ -76,7 +80,8 @@ func TestTask(t *testing.T) { Convey("Task deadline duration test", func() { sch := schedule.NewSimpleSchedule(time.Millisecond * 100) - task := newTask(sch, wf, newWorkManager(), c, emitter, core.TaskDeadlineDuration(20*time.Second)) + task, err := newTask(sch, wf, newWorkManager(), c, emitter, core.TaskDeadlineDuration(20*time.Second)) + So(err, ShouldBeNil) task.Spin() So(task.deadlineDuration, ShouldEqual, 20*time.Second) task.Option(core.TaskDeadlineDuration(20 * time.Second)) @@ -87,8 +92,10 @@ func TestTask(t *testing.T) { Convey("Tasks are created and creation of task table is checked", func() { sch := schedule.NewSimpleSchedule(time.Millisecond * 100) - task := newTask(sch, wf, newWorkManager(), c, emitter) - task1 := newTask(sch, wf, newWorkManager(), c, emitter) + task, err := newTask(sch, wf, newWorkManager(), c, emitter) + So(err, ShouldBeNil) + task1, err := newTask(sch, wf, newWorkManager(), c, emitter) + So(err, ShouldBeNil) task1.Spin() task.Spin() tC := newTaskCollection() @@ -102,7 +109,8 @@ func TestTask(t *testing.T) { Convey("Task is created and starts to spin", func() { sch := schedule.NewSimpleSchedule(time.Second * 5) - task := newTask(sch, wf, newWorkManager(), c, emitter) + task, err := newTask(sch, wf, newWorkManager(), c, emitter) + So(err, ShouldBeNil) task.Spin() So(task.state, ShouldEqual, core.TaskSpinning) Convey("Task is Stopped", func() { @@ -114,7 +122,8 @@ func TestTask(t *testing.T) { Convey("task fires", func() { sch := schedule.NewSimpleSchedule(time.Nanosecond * 100) - task := newTask(sch, wf, newWorkManager(), c, emitter) + task, err := newTask(sch, wf, newWorkManager(), c, emitter) + So(err, ShouldBeNil) task.Spin() time.Sleep(time.Millisecond * 50) So(task.hitCount, ShouldBeGreaterThan, 2) @@ -124,19 +133,21 @@ func TestTask(t *testing.T) { Convey("Enable a running task", func() { sch := schedule.NewSimpleSchedule(time.Millisecond * 10) - task := newTask(sch, wf, newWorkManager(), c, emitter) + task, err := newTask(sch, wf, newWorkManager(), c, emitter) + So(err, ShouldBeNil) task.Spin() - err := task.Enable() + err = task.Enable() So(err, ShouldNotBeNil) So(task.State(), ShouldEqual, core.TaskSpinning) }) Convey("Enable a disabled task", func() { sch := schedule.NewSimpleSchedule(time.Millisecond * 10) - task := newTask(sch, wf, newWorkManager(), c, emitter) + task, err := newTask(sch, wf, newWorkManager(), c, emitter) + So(err, ShouldBeNil) task.state = core.TaskDisabled - err := task.Enable() + err = task.Enable() So(err, ShouldBeNil) So(task.State(), ShouldEqual, core.TaskStopped) }) @@ -148,7 +159,8 @@ func TestTask(t *testing.T) { So(errs, ShouldBeEmpty) sch := schedule.NewSimpleSchedule(time.Millisecond * 10) - task := newTask(sch, wf, newWorkManager(), &mockMetricManager{}, emitter) + task, err := newTask(sch, wf, newWorkManager(), &mockMetricManager{}, emitter) + So(err, ShouldBeNil) So(task.id, ShouldNotBeEmpty) So(task.id, ShouldNotBeNil) taskCollection := newTaskCollection() @@ -182,7 +194,8 @@ func TestTask(t *testing.T) { }) Convey("Create another task and compare the id", func() { - task2 := newTask(sch, wf, newWorkManager(), &mockMetricManager{}, emitter) + task2, err := newTask(sch, wf, newWorkManager(), &mockMetricManager{}, emitter) + So(err, ShouldBeNil) So(task2.id, ShouldNotEqual, task.ID()) }) diff --git a/scheduler/wmap/string.go b/scheduler/wmap/string.go index 014e7ee5b..d85f55e29 100644 --- a/scheduler/wmap/string.go +++ 
b/scheduler/wmap/string.go @@ -82,6 +82,8 @@ func (p *ProcessWorkflowMapNode) String(pad string) string { for k, v := range p.Config { out += pad + " " + fmt.Sprintf("%s=%+v\n", k, v) } + out += pad + " Target:" + p.Target + "\n" + out += pad + " Process Nodes:\n" for _, pr := range p.ProcessNodes { out += pr.String(pad + " ") diff --git a/scheduler/wmap/wmap.go b/scheduler/wmap/wmap.go index f1748d5c7..5d95358b9 100644 --- a/scheduler/wmap/wmap.go +++ b/scheduler/wmap/wmap.go @@ -176,6 +176,13 @@ func (c *CollectWorkflowMapNode) GetTags() map[string]map[string]string { return c.Tags } +func NewCollectWorkflowMapNode() *CollectWorkflowMapNode { + return &CollectWorkflowMapNode{ + Metrics: make(map[string]metricInfo), + Config: make(map[string]map[string]interface{}), + } +} + // GetConfigTree converts config data for collection node in wmap into a proper cdata.ConfigDataTree func (c *CollectWorkflowMapNode) GetConfigTree() (*cdata.ConfigDataTree, error) { cdt := cdata.NewTree() @@ -229,6 +236,7 @@ type ProcessWorkflowMapNode struct { PublishNodes []PublishWorkflowMapNode `json:"publish,omitempty"yaml:"publish"` // TODO processor config Config map[string]interface{} `json:"config,omitempty"yaml:"config"` + Target string `json:"target"yaml:"target"` } func NewProcessNode(name string, version int) *ProcessWorkflowMapNode { @@ -270,6 +278,7 @@ type PublishWorkflowMapNode struct { Version int `json:"plugin_version"yaml:"plugin_version"` // TODO publisher config Config map[string]interface{} `json:"config,omitempty"yaml:"config"` + Target string `json:"target"yaml:"target"` } func NewPublishNode(name string, version int) *PublishWorkflowMapNode { diff --git a/scheduler/workflow.go b/scheduler/workflow.go index fa7b184c3..45b44580a 100644 --- a/scheduler/workflow.go +++ b/scheduler/workflow.go @@ -139,6 +139,7 @@ func convertProcessNode(pr []wmap.ProcessWorkflowMapNode) ([]*processNode, error name: p.Name, version: p.Version, config: cdn, + Target: p.Target, ProcessNodes: prC, PublishNodes: puC, } @@ -163,6 +164,7 @@ func convertPublishNode(pu []wmap.PublishWorkflowMapNode) ([]*publishNode, error name: p.Name, version: p.Version, config: cdn, + Target: p.Target, } } return puNodes, nil @@ -186,6 +188,7 @@ type processNode struct { name string version int config *cdata.ConfigDataNode + Target string ProcessNodes []*processNode PublishNodes []*publishNode InboundContentType string @@ -211,6 +214,7 @@ type publishNode struct { name string version int config *cdata.ConfigDataNode + Target string InboundContentType string } @@ -233,12 +237,16 @@ func (p *publishNode) TypeName() string { type wfContentTypes map[string]map[string][]string // BindPluginContentTypes -func (s *schedulerWorkflow) BindPluginContentTypes(mm managesPluginContentTypes) error { - return bindPluginContentTypes(s.publishNodes, s.processNodes, mm, []string{plugin.SnapGOBContentType}) +func (s *schedulerWorkflow) BindPluginContentTypes(mgrs *managers) error { + return bindPluginContentTypes(s.publishNodes, s.processNodes, []string{plugin.SnapGOBContentType}, mgrs) } -func bindPluginContentTypes(pus []*publishNode, prs []*processNode, mm managesPluginContentTypes, lct []string) error { +func bindPluginContentTypes(pus []*publishNode, prs []*processNode, lct []string, mgrs *managers) error { for _, pr := range prs { + mm, err := mgrs.Get(pr.Target) + if err != nil { + return err + } act, rct, err := mm.GetPluginContentTypes(pr.Name(), core.ProcessorPluginType, pr.Version()) if err != nil { return err @@ -273,11 +281,15 @@ func 
bindPluginContentTypes(pus []*publishNode, prs []*processNode, mm managesPl } } //continue the walk down the nodes - if err := bindPluginContentTypes(pr.PublishNodes, pr.ProcessNodes, mm, rct); err != nil { + if err := bindPluginContentTypes(pr.PublishNodes, pr.ProcessNodes, rct, mgrs); err != nil { return err } } for _, pu := range pus { + mm, err := mgrs.Get(pu.Target) + if err != nil { + return err + } act, _, err := mm.GetPluginContentTypes(pu.Name(), core.PublisherPluginType, pu.Version()) if err != nil { return err @@ -392,7 +404,20 @@ func submitProcessJob(pj job, t *task, wg *sync.WaitGroup, pr *processNode) { // Decrement the waitgroup defer wg.Done() // Create a new process job - j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager, t.id) + mgr, err := t.RemoteManagers.Get(pr.Target) + if err != nil { + t.RecordFailure([]error{err}) + workflowLogger.WithFields(log.Fields{ + "_block": "submit-process-job", + "task-id": t.id, + "task-name": t.name, + "process-name": pr.Name(), + "process-version": pr.Version(), + "parent-node-type": pj.TypeString(), + }).Warn("Error getting control instance") + return + } + j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), mgr, t.id) workflowLogger.WithFields(log.Fields{ "_block": "submit-process-job", "task-id": t.id, @@ -434,7 +459,20 @@ func submitPublishJob(pj job, t *task, wg *sync.WaitGroup, pu *publishNode) { // Decrement the waitgroup defer wg.Done() // Create a new process job - j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager, t.id) + mgr, err := t.RemoteManagers.Get(pu.Target) + if err != nil { + t.RecordFailure([]error{err}) + workflowLogger.WithFields(log.Fields{ + "_block": "submit-publish-job", + "task-id": t.id, + "task-name": t.name, + "publish-name": pu.Name(), + "publish-version": pu.Version(), + "parent-node-type": pj.TypeString(), + }).Warn("Error getting control instance") + return + } + j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), mgr, t.id) workflowLogger.WithFields(log.Fields{ "_block": "submit-publish-job", "task-id": t.id, diff --git a/scheduler/workflow_test.go b/scheduler/workflow_test.go index 45fb8189c..1334328b2 100644 --- a/scheduler/workflow_test.go +++ b/scheduler/workflow_test.go @@ -126,15 +126,16 @@ func (m MockMetricType) Data() interface{} { func TestCollectPublishWorkflow(t *testing.T) { log.SetLevel(log.FatalLevel) Convey("Given a started plugin control", t, func() { - cfg := control.GetDefaultConfig() - cfg.Plugins.Collector.Plugins = control.NewPluginsConfig() - cfg.Plugins.Collector.Plugins["mock"] = control.NewPluginConfigItem() - cfg.Plugins.Collector.Plugins["mock"].Versions = map[int]*cdata.ConfigDataNode{} - cfg.Plugins.Collector.Plugins["mock"].Versions[1] = cdata.NewNode() - cfg.Plugins.Collector.Plugins["mock"].Versions[1].AddItem("test", ctypes.ConfigValueBool{Value: true}) - c := control.New(cfg) + ccfg := control.GetDefaultConfig() + ccfg.Plugins.Collector.Plugins = control.NewPluginsConfig() + ccfg.Plugins.Collector.Plugins["mock"] = control.NewPluginConfigItem() + ccfg.Plugins.Collector.Plugins["mock"].Versions = map[int]*cdata.ConfigDataNode{} + ccfg.Plugins.Collector.Plugins["mock"].Versions[1] = cdata.NewNode() + ccfg.Plugins.Collector.Plugins["mock"].Versions[1].AddItem("test", ctypes.ConfigValueBool{Value: true}) + c := control.New(ccfg) c.Start() - s := New(GetDefaultConfig()) + cfg :=
GetDefaultConfig() + s := New(cfg) s.SetMetricManager(c) Convey("create a workflow", func() { rp, err := core.NewRequestedPlugin(snap_collector_mock2_path) @@ -195,10 +196,10 @@ func TestCollectPublishWorkflow(t *testing.T) { func TestProcessChainingWorkflow(t *testing.T) { log.SetLevel(log.FatalLevel) Convey("Given a started plugin control", t, func() { - c := control.New(control.GetDefaultConfig()) c.Start() - s := New(GetDefaultConfig()) + cfg := GetDefaultConfig() + s := New(cfg) s.SetMetricManager(c) Convey("create a workflow with chained processors", func() { lpe := newEventListener()
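
Note (not part of the diff): a minimal sketch of how a caller might build a distributed workflow map with the new Target field, mirroring dsWFMap in the test above. The remote address and output file path are hypothetical; the "password" config item is only what the mock collector in the tests expects. An empty Target keeps a node on the local control; for each non-empty Target, createTaskClients (task.go above) registers a controlproxy client in the task's RemoteManagers so the node's validate/subscribe/run calls are routed to that remote snapd instance.

package main

import (
	"fmt"

	"github.com/intelsdi-x/snap/scheduler/wmap"
)

func main() {
	wf := new(wmap.WorkflowMap)

	// Collect the mock metric on the local snapd instance.
	c := wmap.NewCollectWorkflowMapNode()
	c.Config["/intel/mock/foo"] = map[string]interface{}{"password": "required"}
	if err := c.AddMetric("/intel/mock/foo", 2); err != nil {
		panic(err)
	}

	// Run the passthru processor on a remote snapd control; Target is that
	// control's listen address (hypothetical here).
	pr := &wmap.ProcessWorkflowMapNode{
		Name:    "passthru",
		Version: -1,
		Config:  make(map[string]interface{}),
		Target:  "192.168.0.2:8082",
	}

	// Publish on the local node; an empty Target means the local control.
	pu := &wmap.PublishWorkflowMapNode{
		Name:    "file",
		Version: -1,
		Config:  map[string]interface{}{"file": "/tmp/published_mock.log"},
	}

	pr.Add(pu)
	c.Add(pr)
	wf.CollectNode = c

	fmt.Printf("%+v\n", wf)
}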