diff --git a/core/task.go b/core/task.go index c78d684db..1a018cb5a 100644 --- a/core/task.go +++ b/core/task.go @@ -22,6 +22,7 @@ package core import ( "encoding/json" "errors" + "fmt" "io" "io/ioutil" "time" @@ -157,6 +158,7 @@ type TaskErrors interface { type TaskCreationRequest struct { Name string `json:"name"` + Version int `json:"version"` Deadline string `json:"deadline"` Workflow *wmap.WorkflowMap `json:"workflow"` Schedule Schedule `json:"schedule"` @@ -164,6 +166,48 @@ type TaskCreationRequest struct { MaxFailures int `json:"max-failures"` } +func (tr *TaskCreationRequest) UnmarshalJSON(data []byte) error { + t := make(map[string]json.RawMessage) + if err := json.Unmarshal(data, &t); err != nil { + return err + } + for k, v := range t { + switch k { + case "name": + if err := json.Unmarshal(v, &(tr.Name)); err != nil { + return fmt.Errorf("%v (while parsing 'name')", err) + } + case "deadline": + if err := json.Unmarshal(v, &(tr.Deadline)); err != nil { + return fmt.Errorf("%v (while parsing 'deadline')", err) + } + case "workflow": + if err := json.Unmarshal(v, &(tr.Workflow)); err != nil { + return err + } + case "schedule": + if err := json.Unmarshal(v, &(tr.Schedule)); err != nil { + return err + } + case "start": + if err := json.Unmarshal(v, &(tr.Start)); err != nil { + return fmt.Errorf("%v (while parsing 'start')", err) + } + case "max-failures": + if err := json.Unmarshal(v, &(tr.MaxFailures)); err != nil { + return fmt.Errorf("%v (while parsing 'max-failures')", err) + } + case "version": + if err := json.Unmarshal(v, &(tr.Version)); err != nil { + return fmt.Errorf("%v (while parsing 'version')", err) + } + default: + return fmt.Errorf("Unrecognized key '%v' in task creation request", k) + } + } + return nil +} + // Function used to create a task according to content (1st parameter) // . Content can be retrieved from a configuration file or a HTTP REST request body // . Mode is used to specify if the created task should start right away or not diff --git a/scheduler/wmap/wmap.go b/scheduler/wmap/wmap.go index 0d150d1a4..cd10958fe 100644 --- a/scheduler/wmap/wmap.go +++ b/scheduler/wmap/wmap.go @@ -133,6 +133,24 @@ type WorkflowMap struct { CollectNode *CollectWorkflowMapNode `json:"collect"yaml:"collect"` } +func (w *WorkflowMap) UnmarshalJSON(data []byte) error { + t := make(map[string]json.RawMessage) + if err := json.Unmarshal(data, &t); err != nil { + return err + } + for k, v := range t { + switch k { + case "collect": + if err := json.Unmarshal(v, &w.CollectNode); err != nil { + return err + } + default: + return fmt.Errorf("Unrecognized key '%v' in workflow of task.", k) + } + } + return nil +} + func NewWorkflowMap() *WorkflowMap { w := &WorkflowMap{} c := &CollectWorkflowMapNode{ @@ -159,6 +177,40 @@ type CollectWorkflowMapNode struct { PublishNodes []PublishWorkflowMapNode `json:"publish,omitempty"yaml:"publish"` } +func (cw *CollectWorkflowMapNode) UnmarshalJSON(data []byte) error { + t := make(map[string]json.RawMessage) + if err := json.Unmarshal(data, &t); err != nil { + return err + } + for k, v := range t { + switch k { + case "metrics": + if err := json.Unmarshal(v, &cw.Metrics); err != nil { + return err + } + case "config": + if err := json.Unmarshal(v, &cw.Config); err != nil { + return fmt.Errorf("%v (while parsing 'config')", err) + } + case "tags": + if err := json.Unmarshal(v, &cw.Tags); err != nil { + return fmt.Errorf("%v (while parsing 'tags')", err) + } + case "process": + if err := json.Unmarshal(v, &cw.ProcessNodes); err != nil { + return err + } + case "publish": + if err := json.Unmarshal(v, &cw.PublishNodes); err != nil { + return err + } + default: + return fmt.Errorf("Unrecognized key '%v' in collect workflow of task.", k) + } + } + return nil +} + func (c *CollectWorkflowMapNode) GetMetrics() []Metric { metrics := make([]Metric, len(c.Metrics)) i := 0 @@ -240,6 +292,45 @@ type ProcessWorkflowMapNode struct { Target string `json:"target"yaml:"target"` } +func (pw *ProcessWorkflowMapNode) UnmarshalJSON(data []byte) error { + t := make(map[string]json.RawMessage) + if err := json.Unmarshal(data, &t); err != nil { + return err + } + for k, v := range t { + switch k { + case "plugin_name": + if err := json.Unmarshal(v, &pw.Name); err != nil { + return fmt.Errorf("%v (while parsing 'plugin_name')", err) + } + case "plugin_version": + if err := json.Unmarshal(v, &pw.Version); err != nil { + return fmt.Errorf("%v (while parsing 'plugin_version')", err) + } + case "process": + if err := json.Unmarshal(v, &pw.ProcessNodes); err != nil { + return err + } + case "publish": + if err := json.Unmarshal(v, &pw.PublishNodes); err != nil { + return err + } + case "config": + if err := json.Unmarshal(v, &pw.Config); err != nil { + return fmt.Errorf("%v (while parsing 'config')", err) + } + case "target": + if err := json.Unmarshal(v, &pw.Target); err != nil { + return fmt.Errorf("%v (while parsing 'target')", err) + } + default: + return fmt.Errorf("Unrecognized key '%v' in process workflow of task.", k) + } + } + return nil + +} + func NewProcessNode(name string, version int) *ProcessWorkflowMapNode { p := &ProcessWorkflowMapNode{ Name: name, @@ -282,6 +373,36 @@ type PublishWorkflowMapNode struct { Target string `json:"target"yaml:"target"` } +func (pw *PublishWorkflowMapNode) UnmarshalJSON(data []byte) error { + t := make(map[string]json.RawMessage) + if err := json.Unmarshal(data, &t); err != nil { + return err + } + for k, v := range t { + switch k { + case "plugin_name": + if err := json.Unmarshal(v, &pw.Name); err != nil { + return fmt.Errorf("%v (while parsing 'plugin_name')", err) + } + case "plugin_version": + if err := json.Unmarshal(v, &pw.Version); err != nil { + return fmt.Errorf("%v (while parsing 'plugin_version')", err) + } + case "config": + if err := json.Unmarshal(v, &pw.Config); err != nil { + return fmt.Errorf("%v (while parsing 'config')", err) + } + case "target": + if err := json.Unmarshal(v, &pw.Target); err != nil { + return fmt.Errorf("%v (while parsing 'target')", err) + } + default: + return fmt.Errorf("Unrecognized key '%v' in publish workflow of task.", k) + } + } + return nil +} + func NewPublishNode(name string, version int) *PublishWorkflowMapNode { p := &PublishWorkflowMapNode{ Name: name, @@ -308,6 +429,24 @@ type metricInfo struct { Version_ int `json:"version"yaml:"version"` } +func (m *metricInfo) UnmarshalJSON(data []byte) error { + t := make(map[string]json.RawMessage) + if err := json.Unmarshal(data, &t); err != nil { + return err + } + for k, v := range t { + switch k { + case "version": + if err := json.Unmarshal(v, &m.Version_); err != nil { + return fmt.Errorf("%v (while parsing 'version')", err) + } + default: + return fmt.Errorf("Unrecognized key '%v' in metrics in collect workflow of task", k) + } + } + return nil +} + type Metric struct { namespace []string version int