Skip to content

Commit

Permalink
Add more queue and task processing metrics (#3274)
Browse files Browse the repository at this point in the history
- Add task load latency 
- Add task schedule latency
- Add queue schedule latency 
- Add metrics for reader count, slice count, queue action
  • Loading branch information
yycptt authored Aug 31, 2022
1 parent 04013e5 commit 8788461
Show file tree
Hide file tree
Showing 45 changed files with 1,050 additions and 82 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ const (
TaskCategoryTagName = "task_category"
TaskTypeTagName = "task_type"
TaskPriorityTagName = "task_priority"
QueueReaderIDTagName = "queue_reader_id"
QueueAlertTypeTagName = "queue_alert_type"
QueueTypeTagName = "queue_type"
visibilityTypeTagName = "visibility_type"
ErrorTypeTagName = "error_type"
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination metrics_mock.go

package metrics

import (
Expand Down
306 changes: 306 additions & 0 deletions common/metrics/metrics_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ func TaskPriorityTag(value string) Tag {
return &tagImpl{key: TaskPriorityTagName, value: value}
}

func QueueReaderIDTag(readerID int32) Tag {
return &tagImpl{key: QueueReaderIDTagName, value: strconv.Itoa(int(readerID))}
}

func QueueAlertTypeTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return &tagImpl{key: QueueAlertTypeTagName, value: value}
}

func QueueTypeTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
Expand Down
14 changes: 11 additions & 3 deletions common/tasks/fifo_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type (
status int32
options *FIFOSchedulerOptions

logger log.Logger
monitor Monitor[T]
logger log.Logger

tasksChan chan T
shutdownChan chan struct{}
Expand All @@ -65,14 +66,16 @@ type (

// NewFIFOScheduler creates a new FIFOScheduler
func NewFIFOScheduler[T Task](
scheduleMoniter Monitor[T],
options *FIFOSchedulerOptions,
logger log.Logger,
) *FIFOScheduler[T] {
return &FIFOScheduler[T]{
status: common.DaemonStatusInitialized,
options: options,

logger: logger,
monitor: scheduleMoniter,
logger: logger,

tasksChan: make(chan T, options.QueueSize),
shutdownChan: make(chan struct{}),
Expand All @@ -88,6 +91,7 @@ func (f *FIFOScheduler[T]) Start() {
return
}

f.monitor.Start()
f.startWorkers(f.options.WorkerCount())

f.shutdownWG.Add(1)
Expand All @@ -109,6 +113,8 @@ func (f *FIFOScheduler[T]) Stop() {
// must be called after the close of the shutdownChan
f.drainTasks()

f.monitor.Stop()

go func() {
if success := common.AwaitWaitGroup(&f.shutdownWG, time.Minute); !success {
f.logger.Warn("fifo scheduler timed out waiting for workers")
Expand Down Expand Up @@ -211,8 +217,10 @@ func (f *FIFOScheduler[T]) processTask(
}

func (f *FIFOScheduler[T]) executeTask(
task Task,
task T,
) {
f.monitor.RecordStart(task)

operation := func() error {
if err := task.Execute(); err != nil {
return task.HandleErr(err)
Expand Down
Loading

0 comments on commit 8788461

Please sign in to comment.