-
-
Notifications
You must be signed in to change notification settings - Fork 753
Monitoring and Alerting
Ken Hibino edited this page Jul 27, 2020
·
11 revisions
I recommend using a tool to monitor your worker processes in production to ensure they are always up and aren't using too much memory or CPU.
The Asynq Handler interface and ServeMux can be instrumented with metrics-tracking code.
Here is an example of using Prometheus to track important metrics. We can instrument our code to track additional application-specific metrics as well as the default metrics (e.g. memory, CPU) tracked by Prometheus.
Application-specific metrics we are tracking in the example code below:
- Total number of tasks processed (both succeeded and failed)
- Number of failed tasks
- Number of tasks currently being processed
package main
import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"time"

	"github.com/hibiken/asynq"
	"github.com/hibiken/asynq/examples/tasks"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"golang.org/x/sys/unix"
)
// Application-specific Prometheus metrics. Each one is a vector keyed by the
// single label "task_type" and is created through promauto.

// processedCounter counts tasks that have been processed; metricsMiddleware
// increments it for successful and failed tasks alike.
var processedCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "processed_tasks_total",
		Help: "The total number of processed tasks",
	},
	[]string{"task_type"},
)

// failedCounter counts tasks whose handler returned a non-nil error.
var failedCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "failed_tasks_total",
		Help: "The total number of times processing failed",
	},
	[]string{"task_type"},
)

// inProgressGauge tracks how many tasks are being handled at this moment.
var inProgressGauge = promauto.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "in_progress_tasks",
		Help: "The number of tasks currently being processed",
	},
	[]string{"task_type"},
)
// metricsMiddleware wraps next so that every task it handles is reflected in
// the Prometheus metrics, labelled by the task's type:
//   - inProgressGauge is raised while next is running and lowered afterwards;
//   - failedCounter is incremented when next returns a non-nil error;
//   - processedCounter is incremented for every task whose handler returns.
//
// The error returned by next is passed through unchanged.
func metricsMiddleware(next asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		inProgress := inProgressGauge.WithLabelValues(t.Type)
		inProgress.Inc()
		// Deferred so the gauge is restored even when the wrapped handler
		// panics; otherwise the task would be reported as in progress forever.
		defer inProgress.Dec()

		err := next.ProcessTask(ctx, t)
		if err != nil {
			failedCounter.WithLabelValues(t.Type).Inc()
		}
		processedCounter.WithLabelValues(t.Type).Inc()
		return err
	})
}
// main runs an asynq worker server together with an HTTP server that exposes
// Prometheus metrics at :2112/metrics.
//
// Signal handling:
//   - SIGTSTP puts the worker server into quiet mode and keeps waiting;
//   - SIGTERM or SIGINT stops the worker server, then shuts the metrics
//     server down and waits for its goroutine to finish before returning.
func main() {
	const (
		// Limits how long a client may take to send request headers.
		metricsReadHeaderTimeout = 5 * time.Second
		// Upper bound on graceful shutdown of the metrics server, so a stuck
		// scrape connection cannot keep the process from exiting.
		metricsShutdownTimeout = 5 * time.Second
	)

	httpServeMux := http.NewServeMux()
	httpServeMux.Handle("/metrics", promhttp.Handler())
	metricsSrv := &http.Server{
		Addr:              ":2112",
		Handler:           httpServeMux,
		ReadHeaderTimeout: metricsReadHeaderTimeout,
	}

	// done is closed once the metrics server goroutine has returned.
	done := make(chan struct{})

	// Start metrics server.
	go func() {
		defer close(done)
		err := metricsSrv.ListenAndServe()
		// http.ErrServerClosed is the expected result of Shutdown, not a failure.
		if err != nil && err != http.ErrServerClosed {
			log.Printf("Error: metrics server error: %v", err)
		}
	}()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{Concurrency: 20},
	)

	mux := asynq.NewServeMux()
	mux.Use(metricsMiddleware)
	mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)

	// Start worker server.
	if err := srv.Start(mux); err != nil {
		log.Fatalf("could not start server: %v", err)
	}

	// Wait for termination signal.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
	for {
		s := <-sigs
		if s == unix.SIGTSTP {
			srv.Quiet() // Stop processing new tasks
			continue
		}
		break
	}

	// Stop worker server.
	srv.Stop()

	// Stop metrics server, giving in-flight requests a bounded time to finish.
	ctx, cancel := context.WithTimeout(context.Background(), metricsShutdownTimeout)
	defer cancel()
	if err := metricsSrv.Shutdown(ctx); err != nil {
		log.Printf("Error: metrics server shutdown error: %v", err)
	}
	<-done
}