-
-
Notifications
You must be signed in to change notification settings - Fork 755
Monitoring and Alerting
Ken Hibino edited this page Nov 18, 2021
·
11 revisions
I recommend using a tool to monitor your worker processes in production to ensure they are always up and aren't using too much memory or CPU.
Asynq's `Handler` interface and `ServeMux` can be instrumented with metrics-tracking code.
Here is an example of using Prometheus to track important metrics. We can instrument our code to track additional application-specific metrics in addition to the default metrics (e.g. memory, CPU) that the Prometheus client already tracks.
Application-specific metrics tracked in the example code below:
- Total number of tasks processed (both succeeded and failed).
- Number of failed tasks.
- Number of tasks currently being processed.
package main
import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"time"

	"github.com/hibiken/asynq"
	"github.com/hibiken/asynq/examples/tasks"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"golang.org/x/sys/unix"
)
// Application-specific Prometheus metrics. Each one is a vector keyed by the
// "task_type" label and is created through promauto.

// processedCounter is incremented once for every task the middleware has
// finished handling, regardless of whether the handler returned an error.
var processedCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "processed_tasks_total",
		Help: "The total number of processed tasks",
	},
	[]string{"task_type"},
)

// failedCounter is incremented each time a task handler returns a non-nil
// error.
var failedCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "failed_tasks_total",
		Help: "The total number of times processing failed",
	},
	[]string{"task_type"},
)

// inProgressGauge is raised when a task handler starts and lowered when it
// finishes, so its value is the number of tasks being handled right now.
var inProgressGauge = promauto.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "in_progress_tasks",
		Help: "The number of tasks currently being processed",
	},
	[]string{"task_type"},
)
// metricsMiddleware wraps next so that every processed task updates the
// Prometheus metrics declared in this file, labelled by the task's type:
//
//   - inProgressGauge is incremented before next runs and decremented after.
//   - failedCounter is incremented when next returns a non-nil error.
//   - processedCounter is incremented for every task for which next returns.
//
// The error returned by next is passed through unchanged.
func metricsMiddleware(next asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		taskType := t.Type()

		inProgress := inProgressGauge.WithLabelValues(taskType)
		inProgress.Inc()
		// Decrement via defer so the gauge is restored even if next panics;
		// otherwise each panic would leave the gauge permanently one too high.
		// NOTE(review): asynq is understood to recover handler panics and keep
		// the worker running — confirm for the asynq version in use.
		defer inProgress.Dec()

		err := next.ProcessTask(ctx, t)
		if err != nil {
			failedCounter.WithLabelValues(taskType).Inc()
		}
		processedCounter.WithLabelValues(taskType).Inc()
		return err
	})
}
// main runs an asynq worker server together with an HTTP server that exposes
// Prometheus metrics at :2112/metrics. It blocks until SIGTERM or SIGINT is
// received, then stops the worker server followed by the metrics server.
func main() {
	httpServeMux := http.NewServeMux()
	httpServeMux.Handle("/metrics", promhttp.Handler())
	metricsSrv := &http.Server{
		Addr:    ":2112",
		Handler: httpServeMux,
		// Bound how long a client may take to send its request headers so a
		// stalled connection cannot occupy the metrics endpoint indefinitely.
		ReadHeaderTimeout: 5 * time.Second,
	}

	// done is closed once the metrics server's ListenAndServe call has returned.
	done := make(chan struct{})
	// Start metrics server.
	go func() {
		defer close(done)
		err := metricsSrv.ListenAndServe()
		// http.ErrServerClosed is the expected result of a Shutdown/Close call.
		if err != nil && err != http.ErrServerClosed {
			log.Printf("Error: metrics server error: %v", err)
		}
	}()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{Concurrency: 20},
	)
	mux := asynq.NewServeMux()
	mux.Use(metricsMiddleware)
	mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)

	// Start worker server.
	if err := srv.Start(mux); err != nil {
		log.Fatalf("Failed to start worker server: %v", err)
	}

	// Wait for termination signal.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
	<-sigs

	// Stop worker server.
	srv.Shutdown()

	// Stop metrics server. The graceful shutdown is given a deadline so that a
	// connection which never becomes idle cannot keep the process from exiting.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
		log.Printf("Error: metrics server shutdown error: %v", err)
		// Graceful shutdown did not finish; drop whatever connections remain.
		if cerr := metricsSrv.Close(); cerr != nil {
			log.Printf("Error: metrics server close error: %v", cerr)
		}
	}
	<-done
}