Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Removes task pkg
Browse files Browse the repository at this point in the history
Updating protobuf revisionId
  • Loading branch information
jcooklin authored and obourdon committed Jun 20, 2016
1 parent 0c1c232 commit dfc1e3c
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 226 deletions.
2 changes: 1 addition & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 90 additions & 0 deletions core/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package core

import (
"errors"
"time"

"github.com/intelsdi-x/snap/pkg/schedule"
)

type Schedule struct {
Type string `json:"type,omitempty"`
Interval string `json:"interval,omitempty"`
StartTimestamp *int64 `json:"start_timestamp,omitempty"`
StopTimestamp *int64 `json:"stop_timestamp,omitempty"`
}

func makeSchedule(s Schedule) (schedule.Schedule, error) {
switch s.Type {
case "simple":
d, err := time.ParseDuration(s.Interval)
if err != nil {
return nil, err
}
sch := schedule.NewSimpleSchedule(d)

err = sch.Validate()
if err != nil {
return nil, err
}
return sch, nil
case "windowed":
d, err := time.ParseDuration(s.Interval)
if err != nil {
return nil, err
}

var start, stop *time.Time
if s.StartTimestamp != nil {
t := time.Unix(*s.StartTimestamp, 0)
start = &t
}
if s.StopTimestamp != nil {
t := time.Unix(*s.StopTimestamp, 0)
stop = &t
}
sch := schedule.NewWindowedSchedule(
d,
start,
stop,
)

err = sch.Validate()
if err != nil {
return nil, err
}
return sch, nil
case "cron":
if s.Interval == "" {
return nil, errors.New("missing cron entry")
}
sch := schedule.NewCronSchedule(s.Interval)

err := sch.Validate()
if err != nil {
return nil, err
}
return sch, nil
default:
return nil, errors.New("unknown schedule type " + s.Type)
}
}
85 changes: 85 additions & 0 deletions core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ limitations under the License.
package core

import (
"encoding/json"
"errors"
"io"
"io/ioutil"
"time"

log "github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -150,3 +154,84 @@ func SetTaskID(id string) TaskOption {
type TaskErrors interface {
Errors() []serror.SnapError
}

type TaskCreationRequest struct {
Name string `json:"name"`
Deadline string `json:"deadline"`
Workflow *wmap.WorkflowMap `json:"workflow"`
Schedule Schedule `json:"schedule"`
Start bool `json:"start"`
}

// Function used to create a task according to content (1st parameter)
// . Content can be retrieved from a configuration file or a HTTP REST request body
// . Mode is used to specify if the created task should start right away or not
// . function pointer is responsible for effectively creating and returning the created task
func CreateTaskFromContent(body io.ReadCloser,
mode *bool,
fp func(sch schedule.Schedule,
wfMap *wmap.WorkflowMap,
startOnCreate bool,
opts ...TaskOption) (Task, TaskErrors)) (Task, error) {

tr, err := marshalTask(body)
if err != nil {
return nil, err
}

sch, err := makeSchedule(tr.Schedule)
if err != nil {
return nil, err
}

var opts []TaskOption
if tr.Deadline != "" {
dl, err := time.ParseDuration(tr.Deadline)
if err != nil {
return nil, err
}
opts = append(opts, TaskDeadlineDuration(dl))
}

if tr.Name != "" {
opts = append(opts, SetTaskName(tr.Name))
}
opts = append(opts, OptionStopOnFailure(10))

if mode == nil {
mode = &tr.Start
}
if fp == nil {
return nil, errors.New("Missing workflow creation routine")
}
task, errs := fp(sch, tr.Workflow, *mode, opts...)
if errs != nil && len(errs.Errors()) != 0 {
var errMsg string
for _, e := range errs.Errors() {
errMsg = errMsg + e.Error() + " -- "
}
return nil, errors.New(errMsg[:len(errMsg)-4])
}
return task, nil
}

func marshalTask(body io.ReadCloser) (*TaskCreationRequest, error) {
var tr TaskCreationRequest
errCode, err := MarshalBody(&tr, body)
if errCode != 0 && err != nil {
return nil, err
}
return &tr, nil
}

func MarshalBody(in interface{}, body io.ReadCloser) (int, error) {
b, err := ioutil.ReadAll(body)
if err != nil {
return 500, err
}
err = json.Unmarshal(b, in)
if err != nil {
return 400, err
}
return 0, nil
}
11 changes: 5 additions & 6 deletions pkg/task/task_test.go → core/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package task
package core

import (
"errors"
Expand All @@ -28,7 +28,6 @@ import (
"testing"
"time"

"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/wmap"
Expand All @@ -50,15 +49,15 @@ func (t *taskErrors) Errors() []serror.SnapError {

const (
DUMMY_FILE = "dummy.txt"
YAML_FILE = "../../examples/tasks/mock-file.yaml"
JSON_FILE = "../../examples/tasks/mock-file.json"
YAML_FILE = "../examples/tasks/mock-file.yaml"
JSON_FILE = "../examples/tasks/mock-file.json"
DUMMY_TYPE = "dummy"
)

func koRoutine(sch schedule.Schedule,
wfMap *wmap.WorkflowMap,
startOnCreate bool,
opts ...core.TaskOption) (core.Task, core.TaskErrors) {
opts ...TaskOption) (Task, TaskErrors) {
// Create a container for task errors
te := &taskErrors{
errs: make([]serror.SnapError, 0),
Expand All @@ -70,7 +69,7 @@ func koRoutine(sch schedule.Schedule,
func okRoutine(sch schedule.Schedule,
wfMap *wmap.WorkflowMap,
startOnCreate bool,
opts ...core.TaskOption) (core.Task, core.TaskErrors) {
opts ...TaskOption) (Task, TaskErrors) {
return nil, nil
}

Expand Down
6 changes: 3 additions & 3 deletions mgmt/rest/client/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"strings"
"time"

"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/mgmt/rest/rbody"
"github.com/intelsdi-x/snap/pkg/task"
"github.com/intelsdi-x/snap/scheduler/wmap"
)

Expand All @@ -49,8 +49,8 @@ type Schedule struct {
// Otherwise, it's in the Stopped state. CreateTask is accomplished through a POST HTTP JSON request.
// A ScheduledTask is returned if it succeeds, otherwise an error is returned.
func (c *Client) CreateTask(s *Schedule, wf *wmap.WorkflowMap, name string, deadline string, startTask bool) *CreateTaskResult {
t := task.TaskCreationRequest{
Schedule: task.Schedule{
t := core.TaskCreationRequest{
Schedule: core.Schedule{
Type: s.Type,
Interval: s.Interval,
},
Expand Down
5 changes: 2 additions & 3 deletions mgmt/rest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/mgmt/rest/rbody"
"github.com/intelsdi-x/snap/pkg/task"
"github.com/julienschmidt/httprouter"
)

Expand Down Expand Up @@ -88,7 +87,7 @@ func (s *Server) deletePluginConfigItem(w http.ResponseWriter, r *http.Request,
}

src := []string{}
errCode, err := task.MarshalBody(&src, r.Body)
errCode, err := core.MarshalBody(&src, r.Body)
if errCode != 0 && err != nil {
respond(400, rbody.FromError(err), w)
return
Expand Down Expand Up @@ -130,7 +129,7 @@ func (s *Server) setPluginConfigItem(w http.ResponseWriter, r *http.Request, p h
}

src := cdata.NewNode()
errCode, err := task.MarshalBody(src, r.Body)
errCode, err := core.MarshalBody(src, r.Body)
if errCode != 0 && err != nil {
respond(400, rbody.FromError(err), w)
return
Expand Down
9 changes: 4 additions & 5 deletions mgmt/rest/rbody/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/pkg/task"
"github.com/intelsdi-x/snap/scheduler/wmap"
)

Expand Down Expand Up @@ -121,7 +120,7 @@ type ScheduledTask struct {
Name string `json:"name"`
Deadline string `json:"deadline"`
Workflow *wmap.WorkflowMap `json:"workflow,omitempty"`
Schedule *task.Schedule `json:"schedule,omitempty"`
Schedule *core.Schedule `json:"schedule,omitempty"`
CreationTimestamp int64 `json:"creation_timestamp,omitempty"`
LastRunTimestamp int64 `json:"last_run_timestamp,omitempty"`
HitCount int `json:"hit_count,omitempty"`
Expand Down Expand Up @@ -217,23 +216,23 @@ func (s *ScheduledTaskEnabled) ResponseBodyType() string {
func assertSchedule(s schedule.Schedule, t *AddScheduledTask) {
switch v := s.(type) {
case *schedule.SimpleSchedule:
t.Schedule = &task.Schedule{
t.Schedule = &core.Schedule{
Type: "simple",
Interval: v.Interval.String(),
}
return
case *schedule.WindowedSchedule:
startTime := v.StartTime.Unix()
stopTime := v.StopTime.Unix()
t.Schedule = &task.Schedule{
t.Schedule = &core.Schedule{
Type: "windowed",
Interval: v.Interval.String(),
StartTimestamp: &startTime,
StopTimestamp: &stopTime,
}
return
case *schedule.CronSchedule:
t.Schedule = &task.Schedule{
t.Schedule = &core.Schedule{
Type: "cron",
Interval: v.Entry(),
}
Expand Down
7 changes: 3 additions & 4 deletions mgmt/rest/rest_func_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build legacy
/// +build legacy

/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Expand Down Expand Up @@ -46,7 +46,6 @@ import (
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/mgmt/rest/rbody"
"github.com/intelsdi-x/snap/pkg/cfgfile"
"github.com/intelsdi-x/snap/pkg/task"
"github.com/intelsdi-x/snap/scheduler"
"github.com/intelsdi-x/snap/scheduler/wmap"
. "github.com/smartystreets/goconvey/convey"
Expand Down Expand Up @@ -252,8 +251,8 @@ func createTask(sample, name, interval string, noStart bool, port int) *rbody.AP

uri := fmt.Sprintf("http://localhost:%d/v1/tasks", port)

t := task.TaskCreationRequest{
Schedule: task.Schedule{Type: "simple", Interval: interval},
t := core.TaskCreationRequest{
Schedule: core.Schedule{Type: "simple", Interval: interval},
Workflow: wf,
Name: name,
Start: !noStart,
Expand Down
3 changes: 1 addition & 2 deletions mgmt/rest/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/mgmt/rest/rbody"
mtask "github.com/intelsdi-x/snap/pkg/task"
)

var (
Expand All @@ -45,7 +44,7 @@ var (
)

func (s *Server) addTask(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
task, err := mtask.CreateTaskFromContent(r.Body, nil, s.mt.CreateTask)
task, err := core.CreateTaskFromContent(r.Body, nil, s.mt.CreateTask)
if err != nil {
respond(500, rbody.FromError(err), w)
return
Expand Down
3 changes: 1 addition & 2 deletions mgmt/tribe/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/mgmt/rest/client"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/pkg/task"
"github.com/intelsdi-x/snap/scheduler"
"github.com/intelsdi-x/snap/scheduler/wmap"
)
Expand Down Expand Up @@ -514,7 +513,7 @@ func (w worker) isPluginLoaded(n, t string, v int) bool {
return false
}

func getSchedule(s *task.Schedule) schedule.Schedule {
func getSchedule(s *core.Schedule) schedule.Schedule {
switch s.Type {
case "simple":
d, e := time.ParseDuration(s.Interval)
Expand Down
Loading

0 comments on commit dfc1e3c

Please sign in to comment.