Monitoring and Alerting

I recommend using a tool to monitor your worker processes in production to ensure they are always up and aren't using too much memory or CPU.

The Asynq Handler interface and ServeMux can be instrumented with metrics-tracking code.

Here is an example of using Prometheus to track important metrics. We can instrument our code to track application-specific metrics in addition to the default metrics (e.g., memory, CPU) tracked by Prometheus.

Application-specific metrics tracked in the example code below:

  • Total number of tasks processed (both succeeded and failed)
  • Number of failed tasks
  • Number of tasks currently being processed

```go
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "runtime"

    "github.com/hibiken/asynq"
    "github.com/hibiken/asynq/examples/tasks"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "golang.org/x/sys/unix"
)

// Metrics variables.
var (
    processedCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "processed_tasks_total",
            Help: "The total number of processed tasks",
        },
        []string{"task_type"},
    )

    failedCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "failed_tasks_total",
	    Help: "The total number of times processing failed",
	},
        []string{"task_type"},
    )

    inProgressGauge = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
	    Name: "in_progress_tasks",
	    Help: "The number of tasks currently being processed",
	},
        []string{"task_type"},
    )
)

func metricsMiddleware(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        inProgressGauge.WithLabelValues(t.Type).Inc()
        err := next.ProcessTask(ctx, t)
        inProgressGauge.WithLabelValues(t.Type).Dec()
        if err != nil {
            failedCounter.WithLabelValues(t.Type).Inc()
        }
        processedCounter.WithLabelValues(t.Type).Inc()
        return err
    })
}

func main() {
    httpServeMux := http.NewServeMux()
    httpServeMux.Handle("/metrics", promhttp.Handler())
    metricsSrv := &http.Server{
        Addr:    ":2112",
        Handler: httpServeMux,
    }
    done := make(chan struct{})

    // Start metrics server.
    go func() {
        err := metricsSrv.ListenAndServe()
        if err != nil && err != http.ErrServerClosed {
            log.Printf("Error: metrics server error: %v", err)
        }
        close(done)
    }()

    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: ":6379"},
        asynq.Config{Concurrency: 20},
    )

    mux := asynq.NewServeMux()
    mux.Use(metricsMiddleware)
    mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)

    // Start worker server.
    if err := srv.Start(mux); err != nil {
        log.Fatalf("could not start server: %v", err)
    }

    // Wait for termination signal.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
    for {
        s := <-sigs
        if s == unix.SIGTSTP {
            srv.Quiet() // Stop processing new tasks
            continue
        }
        break
    }

    // Stop worker server.
    srv.Stop()

    // Stop metrics server.
    if err := metricsSrv.Shutdown(context.Background()); err != nil {
        log.Printf("Error: metrics server shutdown error: %v", err)
    }
    <-done
}
```
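
Beyond the counters and gauge above, the same middleware pattern can capture other application-specific measurements, such as processing latency. The following is a minimal sketch, not part of the original example: the metric name, buckets, and the latencyMiddleware helper are illustrative assumptions, and it reuses the imports above plus the standard time package.

```go
// Hypothetical additional middleware that records per-task processing
// latency in a Prometheus histogram (requires the "time" import).
var processedLatency = promauto.NewHistogramVec(
    prometheus.HistogramOpts{
        Name:    "task_processing_duration_seconds", // illustrative name
        Help:    "Time spent processing each task",
        Buckets: prometheus.DefBuckets,
    },
    []string{"task_type"},
)

func latencyMiddleware(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        err := next.ProcessTask(ctx, t)
        // Record the elapsed time whether the task succeeded or failed.
        processedLatency.WithLabelValues(t.Type).Observe(time.Since(start).Seconds())
        return err
    })
}
```

If you use something like this, register both middlewares together, e.g. mux.Use(metricsMiddleware, latencyMiddleware).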

You can also monitor queues using the Inspector.
Here's an example of a simple HTTP server that exports queue metrics to Prometheus.

```go
package main

import (
    "log"
    "net/http"
    "time"

    "github.com/hibiken/asynq"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func recordMetrics() {
    inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: ":6379"})
    go func() {
        for {
            time.Sleep(2 * time.Second)
            // Get current stats of the "email" queue.
            stats, err := inspector.CurrentStats("email")
            if err != nil {
                log.Printf("Error: Could not get current stat from email queue: %v", err)
                continue
            }
            emailQueueActiveTasks.Set(float64(stats.Active))
            emailQueuePendingTasks.Set(float64(stats.Pending))
            emailQueueScheduledTasks.Set(float64(stats.Scheduled))
            emailQueueRetryTasks.Set(float64(stats.Retry))
            emailQueueDeadTasks.Set(float64(stats.Dead))
        }
    }()
}

var (
    emailQueueActiveTasks = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "email_queue_active_tasks",
        Help: "The number of active tasks in email queue",
    })

    // This gauge indicates the size of the queue backlog.
    // You should set up alerts that notify you as soon as
    // this number reaches an unacceptable size.
    emailQueuePendingTasks = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "email_queue_pending_tasks",
        Help: "The number of pending tasks in email queue",
    })

    emailQueueScheduledTasks = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "email_queue_scheduled_tasks",
        Help: "The number of scheduled tasks in email queue",
    })

    emailQueueRetryTasks = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "email_queue_retry_tasks",
        Help: "The number of retry tasks in email queue",
    })

    emailQueueDeadTasks = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "email_queue_dead_tasks",
        Help: "The number of dead tasks in email queue",
    })
)

func main() {
    recordMetrics()

    http.Handle("/metrics", promhttp.Handler())
    http.ListenAndServe(":2113", nil)
}
```
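
The example above defines one gauge per state for a single hard-coded queue. If you run multiple queues, a labeled GaugeVec keeps the exporter compact. The sketch below is a variation under a couple of assumptions: it relies on the Inspector exposing a Queues() method to list queue names (check your asynq version), and the metric and label names are illustrative.

```go
package main

import (
    "log"
    "net/http"
    "time"

    "github.com/hibiken/asynq"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// One gauge family labeled by queue name and task state,
// instead of one gauge variable per queue per state.
var queueTasks = promauto.NewGaugeVec(
    prometheus.GaugeOpts{
        Name: "asynq_queue_tasks", // illustrative metric name
        Help: "The number of tasks in a queue, by state",
    },
    []string{"queue", "state"},
)

func recordMetrics(inspector *asynq.Inspector) {
    for {
        time.Sleep(2 * time.Second)
        queues, err := inspector.Queues()
        if err != nil {
            log.Printf("Error: could not list queues: %v", err)
            continue
        }
        for _, qname := range queues {
            stats, err := inspector.CurrentStats(qname)
            if err != nil {
                log.Printf("Error: could not get stats for queue %q: %v", qname, err)
                continue
            }
            queueTasks.WithLabelValues(qname, "active").Set(float64(stats.Active))
            queueTasks.WithLabelValues(qname, "pending").Set(float64(stats.Pending))
            queueTasks.WithLabelValues(qname, "scheduled").Set(float64(stats.Scheduled))
            queueTasks.WithLabelValues(qname, "retry").Set(float64(stats.Retry))
            queueTasks.WithLabelValues(qname, "dead").Set(float64(stats.Dead))
        }
    }
}

func main() {
    inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: ":6379"})
    go recordMetrics(inspector)

    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":2113", nil))
}
```

With the queue and state labels in place, alerts can be written per queue, for example on sustained growth of the pending count, which indicates the workers are not keeping up with the backlog.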