Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
address golint warning for package plugin and schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
candysmurf committed Dec 7, 2015
1 parent ebb87bf commit ca1614e
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 52 deletions.
4 changes: 2 additions & 2 deletions plugin/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Test helper for testing plugins
// Package helper provides test helpers for testing plugins
package helper

import (
Expand All @@ -32,7 +32,7 @@ var (
buildScript = "/scripts/build.sh"
)

// Attempts to make the plugins before each test.
// BuildPlugin attempts to make the plugins before each test.
func BuildPlugin(pluginType, pluginName string) error {
wd, err := os.Getwd()
if err != nil {
Expand Down
14 changes: 10 additions & 4 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,18 @@ import (
)

var (
	// HandlerRegistrationName is the name under which the scheduler registers its handler with the event manager
HandlerRegistrationName = "scheduler"

ErrMetricManagerNotSet = errors.New("MetricManager is not set.")
ErrSchedulerNotStarted = errors.New("Scheduler is not started.")
ErrTaskAlreadyRunning = errors.New("Task is already running.")
ErrTaskAlreadyStopped = errors.New("Task is already stopped.")
// ErrMetricManagerNotSet - The error message for metricManager is not set
ErrMetricManagerNotSet = errors.New("MetricManager is not set.")
// ErrSchedulerNotStarted - The error message for scheduler is not started
ErrSchedulerNotStarted = errors.New("Scheduler is not started.")
// ErrTaskAlreadyRunning - The error message for task is already running
ErrTaskAlreadyRunning = errors.New("Task is already running.")
// ErrTaskAlreadyStopped - The error message for task is already stopped
ErrTaskAlreadyStopped = errors.New("Task is already stopped.")
// ErrTaskDisabledNotRunnable - The error message for task is disabled and cannot be started
ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started.")
)

Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func (m *mockMetricManager) ValidateDeps(mts []core.Metric, prs []core.Subscribe
}
return nil
}
func (m *mockMetricManager) SubscribeDeps(taskId string, mts []core.Metric, prs []core.Plugin) []serror.SnapError {
func (m *mockMetricManager) SubscribeDeps(taskID string, mts []core.Metric, prs []core.Plugin) []serror.SnapError {
return []serror.SnapError{
serror.New(errors.New("metric validation error")),
}
}

func (m *mockMetricManager) UnsubscribeDeps(taskId string, mts []core.Metric, prs []core.Plugin) []serror.SnapError {
func (m *mockMetricManager) UnsubscribeDeps(taskID string, mts []core.Metric, prs []core.Plugin) []serror.SnapError {
return nil
}

Expand Down
17 changes: 12 additions & 5 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,25 @@ import (
)

const (
	// DefaultDeadlineDuration - The default timeout is 5 seconds
DefaultDeadlineDuration = time.Second * 5
DefaultStopOnFailure = 3
	// DefaultStopOnFailure - The default is to stop a task after three consecutive failures
DefaultStopOnFailure = 3
)

var (
schedulerLogger = log.WithField("_module", "scheduler-task")

ErrTaskNotFound = errors.New("Task not found")
ErrTaskNotStopped = errors.New("Task must be stopped")
// ErrTaskNotFound - The error message for task not found
ErrTaskNotFound = errors.New("Task not found")
// ErrTaskNotStopped - The error message for task must be stopped
ErrTaskNotStopped = errors.New("Task must be stopped")
// ErrTaskHasAlreadyBeenAdded - The error message for task has already been added
ErrTaskHasAlreadyBeenAdded = errors.New("Task has already been added")
ErrTaskDisabledOnFailures = errors.New("Task disabled due to consecutive failures")
ErrTaskNotDisabled = errors.New("Task must be disabled")
// ErrTaskDisabledOnFailures - The error message for task disabled due to consecutive failures
ErrTaskDisabledOnFailures = errors.New("Task disabled due to consecutive failures")
// ErrTaskNotDisabled - The error message for task must be disabled
ErrTaskNotDisabled = errors.New("Task must be disabled")
)

type task struct {
Expand Down
77 changes: 39 additions & 38 deletions scheduler/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ var (
watcherLog = log.WithField("_module", "scheduler-watcher")
)

// TaskWatcher struct type
type TaskWatcher struct {
id uint64
taskIds []string
taskIDs []string
parent *taskWatcherCollection
stopped bool
handler core.TaskWatcherHandler
}

// Stops watching a task. Cannot be restarted.
// Close stops watching a task. Cannot be restarted.
func (t *TaskWatcher) Close() error {
for _, x := range t.taskIds {
for _, x := range t.taskIDs {
t.parent.rm(x, t)
}
return nil
Expand All @@ -50,150 +51,150 @@ func (t *TaskWatcher) Close() error {
type taskWatcherCollection struct {
// Collection of task watchers by
coll map[string][]*TaskWatcher
tIdCounter uint64
tIDCounter uint64
mutex *sync.Mutex
}

func newTaskWatcherCollection() *taskWatcherCollection {
return &taskWatcherCollection{
coll: make(map[string][]*TaskWatcher),
tIdCounter: 1,
tIDCounter: 1,
mutex: &sync.Mutex{},
}
}

func (t *taskWatcherCollection) rm(taskId string, tw *TaskWatcher) {
func (t *taskWatcherCollection) rm(taskID string, tw *TaskWatcher) {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.coll[taskId] != nil {
for i, w := range t.coll[taskId] {
if t.coll[taskID] != nil {
for i, w := range t.coll[taskID] {
if w == tw {
watcherLog.WithFields(log.Fields{
"task-id": taskId,
"task-id": taskID,
"task-watcher-id": tw.id,
}).Debug("removing watch from task")
t.coll[taskId] = append(t.coll[taskId][:i], t.coll[taskId][i+1:]...)
if len(t.coll[taskId]) == 0 {
delete(t.coll, taskId)
t.coll[taskID] = append(t.coll[taskID][:i], t.coll[taskID][i+1:]...)
if len(t.coll[taskID]) == 0 {
delete(t.coll, taskID)
}
}
}
}
}

func (t *taskWatcherCollection) add(taskId string, twh core.TaskWatcherHandler) (*TaskWatcher, error) {
func (t *taskWatcherCollection) add(taskID string, twh core.TaskWatcherHandler) (*TaskWatcher, error) {
t.mutex.Lock()
defer t.mutex.Unlock()
	// init map for task ID if it does not exist
if t.coll[taskId] == nil {
t.coll[taskId] = make([]*TaskWatcher, 0)
if t.coll[taskID] == nil {
t.coll[taskID] = make([]*TaskWatcher, 0)
}
tw := &TaskWatcher{
// Assign unique ID to task watcher
id: t.tIdCounter,
id: t.tIDCounter,
// Add ref to coll for cleanup later
parent: t,
stopped: false,
handler: twh,
}
// Increment number for next time
t.tIdCounter++
t.tIDCounter++
// Add task id to task watcher list
tw.taskIds = append(tw.taskIds, taskId)
tw.taskIDs = append(tw.taskIDs, taskID)
// Add this task watcher in
t.coll[taskId] = append(t.coll[taskId], tw)
t.coll[taskID] = append(t.coll[taskID], tw)
watcherLog.WithFields(log.Fields{
"task-id": taskId,
"task-id": taskID,
"task-watcher-id": tw.id,
}).Debug("Added to task watcher collection")
return tw, nil
}

func (t *taskWatcherCollection) handleMetricCollected(taskId string, m []core.Metric) {
func (t *taskWatcherCollection) handleMetricCollected(taskID string, m []core.Metric) {
t.mutex.Lock()
defer t.mutex.Unlock()
// no taskID means no watches, early exit
if t.coll[taskId] == nil || len(t.coll[taskId]) == 0 {
if t.coll[taskID] == nil || len(t.coll[taskID]) == 0 {
// Uncomment this debug line if needed. Otherwise this is too verbose for even debug level.
// watcherLog.WithFields(log.Fields{
// "task-id": taskId,
// }).Debug("no watchers")
return
}
// Walk all watchers for a task ID
for i, v := range t.coll[taskId] {
for i, v := range t.coll[taskID] {
// Check if they have a catcher assigned
watcherLog.WithFields(log.Fields{
"task-id": taskId,
"task-id": taskID,
"task-watcher-id": i,
}).Debug("calling taskwatcher collection func")
// Call the catcher
v.handler.CatchCollection(m)
}
}

func (t *taskWatcherCollection) handleTaskStarted(taskId string) {
func (t *taskWatcherCollection) handleTaskStarted(taskID string) {
t.mutex.Lock()
defer t.mutex.Unlock()
// no taskID means no watches, early exit
if t.coll[taskId] == nil || len(t.coll[taskId]) == 0 {
if t.coll[taskID] == nil || len(t.coll[taskID]) == 0 {
// Uncomment this debug line if needed. Otherwise this is too verbose for even debug level.
// watcherLog.WithFields(log.Fields{
// "task-id": taskId,
// "task-id": taskID,
// }).Debug("no watchers")
return
}
// Walk all watchers for a task ID
for i, v := range t.coll[taskId] {
for i, v := range t.coll[taskID] {
// Check if they have a catcher assigned
watcherLog.WithFields(log.Fields{
"task-id": taskId,
"task-id": taskID,
"task-watcher-id": i,
}).Debug("calling taskwatcher task started func")
// Call the catcher
v.handler.CatchTaskStarted()
}
}

func (t *taskWatcherCollection) handleTaskStopped(taskId string) {
func (t *taskWatcherCollection) handleTaskStopped(taskID string) {
t.mutex.Lock()
defer t.mutex.Unlock()
// no taskID means no watches, early exit
if t.coll[taskId] == nil || len(t.coll[taskId]) == 0 {
if t.coll[taskID] == nil || len(t.coll[taskID]) == 0 {
// Uncomment this debug line if needed. Otherwise this is too verbose for even debug level.
// watcherLog.WithFields(log.Fields{
// "task-id": taskId,
// }).Debug("no watchers")
return
}
// Walk all watchers for a task ID
for i, v := range t.coll[taskId] {
for i, v := range t.coll[taskID] {
// Check if they have a catcher assigned
watcherLog.WithFields(log.Fields{
"task-id": taskId,
"task-id": taskID,
"task-watcher-id": i,
}).Debug("calling taskwatcher task stopped func")
// Call the catcher
v.handler.CatchTaskStopped()
}
}

func (t *taskWatcherCollection) handleTaskDisabled(taskId string, why string) {
func (t *taskWatcherCollection) handleTaskDisabled(taskID string, why string) {
t.mutex.Lock()
defer t.mutex.Unlock()
// no taskID means no watches, early exit
if t.coll[taskId] == nil || len(t.coll[taskId]) == 0 {
if t.coll[taskID] == nil || len(t.coll[taskID]) == 0 {
// Uncomment this debug line if needed. Otherwise this is too verbose for even debug level.
// watcherLog.WithFields(log.Fields{
// "task-id": taskId,
// "task-id": taskID,
// }).Debug("no watchers")
return
}
// Walk all watchers for a task ID
for i, v := range t.coll[taskId] {
for i, v := range t.coll[taskID] {
// Check if they have a catcher assigned
watcherLog.WithFields(log.Fields{
"task-id": taskId,
"task-id": taskID,
"task-watcher-id": i,
}).Debug("calling taskwatcher task disabled func")
// Call the catcher
Expand Down
21 changes: 20 additions & 1 deletion scheduler/work_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const (

type workManagerOption func(w *workManager) workManagerOption

// CollectQSizeOption sets the collector queue size (length) and
// returns the previous queue option state.
func CollectQSizeOption(v uint) workManagerOption {
return func(w *workManager) workManagerOption {
previous := w.collectQSize
Expand All @@ -76,6 +78,8 @@ func CollectQSizeOption(v uint) workManagerOption {
}
}

// PublishQSizeOption sets the publisher queue size (length) and
// returns the previous queue option state.
func PublishQSizeOption(v uint) workManagerOption {
return func(w *workManager) workManagerOption {
previous := w.publishQSize
Expand All @@ -84,6 +88,8 @@ func PublishQSizeOption(v uint) workManagerOption {
}
}

// ProcessQSizeOption sets the processor queue size (length) and
// returns the previous queue option state.
func ProcessQSizeOption(v uint) workManagerOption {
return func(w *workManager) workManagerOption {
previous := w.processQSize
Expand All @@ -92,6 +98,8 @@ func ProcessQSizeOption(v uint) workManagerOption {
}
}

// CollectWkrSizeOption sets the collector worker pool size
// and returns the previous collector worker pool state.
func CollectWkrSizeOption(v uint) workManagerOption {
return func(w *workManager) workManagerOption {
previous := w.collectWkrSize
Expand All @@ -100,6 +108,8 @@ func CollectWkrSizeOption(v uint) workManagerOption {
}
}

// ProcessWkrSizeOption sets the processor worker pool size
// and returns the previous processor worker pool state.
func ProcessWkrSizeOption(v uint) workManagerOption {
return func(w *workManager) workManagerOption {
previous := w.processWkrSize
Expand All @@ -108,6 +118,8 @@ func ProcessWkrSizeOption(v uint) workManagerOption {
}
}

// PublishWkrSizeOption sets the publisher worker pool size
// and returns the previous publisher worker pool state.
func PublishWkrSizeOption(v uint) workManagerOption {
return func(w *workManager) workManagerOption {
previous := w.publishWkrSize
Expand Down Expand Up @@ -164,7 +176,7 @@ func newWorkManager(opts ...workManagerOption) *workManager {
return wm
}

// workManager's loop just handles queuing errors.
// Start starts the workManager's loop, which just handles queuing errors.
func (w *workManager) Start() {

w.mutex.Lock()
Expand Down Expand Up @@ -192,6 +204,7 @@ func (w *workManager) Start() {
}
}

// Stop closes the collector queue and worker
func (w *workManager) Stop() {
w.collectq.Stop()
close(workerKillChan)
Expand All @@ -214,20 +227,26 @@ func (w *workManager) Work(j job) job {
return j
}

// AddCollectWorker adds a new worker to
// the collector worker pool
func (w *workManager) AddCollectWorker() {
nw := newWorker(w.collectchan)
go nw.start()
w.collectWkrs = append(w.collectWkrs, nw)
w.collectWkrSize++
}

// AddPublishWorker adds a new worker to
// the publisher worker pool
func (w *workManager) AddPublishWorker() {
nw := newWorker(w.publishchan)
go nw.start()
w.publishWkrs = append(w.publishWkrs, nw)
w.publishWkrSize++
}

// AddProcessWorker adds a new worker to
// the processor worker pool
func (w *workManager) AddProcessWorker() {
nw := newWorker(w.processchan)
go nw.start()
Expand Down
Loading

0 comments on commit ca1614e

Please sign in to comment.