Skip to content

Commit

Permalink
prometheus#3513: Mark muted alerts (prometheus#3793)
Browse files Browse the repository at this point in the history
* Mark muted groups

This commit updates TimeMuteStage and TimeActiveStage to mark groups
as muted when its alerts are muted by an active or mute time interval,
and remove any existing markers when outside all active and mute
time intervals.

Signed-off-by: George Robinson <george.robinson@grafana.com>

* Remove unlock to defer

Signed-off-by: George Robinson <george.robinson@grafana.com>

---------

Signed-off-by: George Robinson <george.robinson@grafana.com>
  • Loading branch information
grobinson-grafana authored and TheMeier committed Sep 29, 2024
1 parent 5d2b0d8 commit be0a810
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 40 deletions.
1 change: 1 addition & 0 deletions cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func run() int {
inhibitor,
silencer,
intervener,
marker,
notificationLog,
pipelinePeer,
)
Expand Down
45 changes: 26 additions & 19 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Dispatcher struct {
route *Route
alerts provider.Alerts
stage notify.Stage
marker types.GroupMarker
metrics *DispatcherMetrics
limits Limits

Expand Down Expand Up @@ -107,7 +108,7 @@ func NewDispatcher(
ap provider.Alerts,
r *Route,
s notify.Stage,
mk types.AlertMarker,
mk types.GroupMarker,
to func(time.Duration) time.Duration,
lim Limits,
l log.Logger,
Expand All @@ -121,6 +122,7 @@ func NewDispatcher(
alerts: ap,
stage: s,
route: r,
marker: mk,
timeout: to,
logger: log.With(l, "component", "dispatcher"),
metrics: m,
Expand All @@ -145,8 +147,8 @@ func (d *Dispatcher) Run() {
}

func (d *Dispatcher) run(it provider.AlertIterator) {
cleanup := time.NewTicker(30 * time.Second)
defer cleanup.Stop()
maintenance := time.NewTicker(30 * time.Second)
defer maintenance.Stop()

defer it.Close()

Expand Down Expand Up @@ -175,28 +177,30 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())

case <-cleanup.C:
d.mtx.Lock()

for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
if ag.empty() {
ag.stop()
delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()
}
}
}

d.mtx.Unlock()

case <-maintenance.C:
d.doMaintenance()
case <-d.ctx.Done():
return
}
}
}

func (d *Dispatcher) doMaintenance() {
d.mtx.Lock()
defer d.mtx.Unlock()
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
if ag.empty() {
ag.stop()
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()
}
}
}
}

// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
Alerts types.AlertSlice
Expand Down Expand Up @@ -374,6 +378,7 @@ type aggrGroup struct {
labels model.LabelSet
opts *RouteOpts
logger log.Logger
routeID string
routeKey string

alerts *store.Alerts
Expand All @@ -394,6 +399,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
}
ag := &aggrGroup{
labels: labels,
routeID: r.ID(),
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
Expand Down Expand Up @@ -447,6 +453,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
Expand Down
45 changes: 45 additions & 0 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,48 @@ type limits struct {
func (l limits) MaxNumberOfAggregationGroups() int {
return l.groups
}

func TestDispatcher_DoMaintenance(t *testing.T) {
r := prometheus.NewRegistry()
marker := types.NewMarker(r)

alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, log.NewNopLogger(), nil)
if err != nil {
t.Fatal(err)
}

route := &Route{
RouteOpts: RouteOpts{
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: 5 * time.Minute, // Should never hit in this test.
},
}
timeout := func(d time.Duration) time.Duration { return d }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}

ctx := context.Background()
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, log.NewNopLogger(), NewDispatcherMetrics(false, r))
aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup)
aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup)

// Insert an aggregation group with no alerts.
labels := model.LabelSet{"alertname": "1"}
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, log.NewNopLogger())
aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
dispatcher.aggrGroupsPerRoute = aggrGroups
// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })

// Insert a marker for the aggregation group's group key.
marker.SetMuted(route.ID(), aggrGroup1.GroupKey(), []string{"weekends"})
mutedBy, isMuted := marker.Muted(route.ID(), aggrGroup1.GroupKey())
require.True(t, isMuted)
require.Equal(t, []string{"weekends"}, mutedBy)

// Run the maintenance and the marker should be removed.
dispatcher.doMaintenance()
mutedBy, isMuted = marker.Muted(route.ID(), aggrGroup1.GroupKey())
require.False(t, isMuted)
require.Empty(t, mutedBy)
}
64 changes: 55 additions & 9 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const (
keyNow
keyMuteTimeIntervals
keyActiveTimeIntervals
keyRouteID
)

// WithReceiverName populates a context with a receiver name.
Expand Down Expand Up @@ -165,6 +166,10 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
return context.WithValue(ctx, keyActiveTimeIntervals, at)
}

func WithRouteID(ctx context.Context, routeID string) context.Context {
return context.WithValue(ctx, keyRouteID, routeID)
}

// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
// second argument is false.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
Expand Down Expand Up @@ -228,6 +233,13 @@ func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) {
return v, ok
}

// RouteID extracts a RouteID from the context. Iff none exists, the
// // second argument is false.
func RouteID(ctx context.Context) (string, bool) {
v, ok := ctx.Value(keyRouteID).(string)
return v, ok
}

// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
Expand Down Expand Up @@ -384,15 +396,16 @@ func (pb *PipelineBuilder) New(
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
intervener *timeinterval.Intervener,
marker types.GroupMarker,
notificationLog NotificationLog,
peer Peer,
) RoutingStage {
rs := make(RoutingStage, len(receivers))

ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor, pb.metrics)
tas := NewTimeActiveStage(intervener, pb.metrics)
tms := NewTimeMuteStage(intervener, pb.metrics)
tas := NewTimeActiveStage(intervener, marker, pb.metrics)
tms := NewTimeMuteStage(intervener, marker, pb.metrics)
ss := NewMuteStage(silencer, pb.metrics)

for name := range receivers {
Expand Down Expand Up @@ -923,18 +936,29 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ

type timeStage struct {
muter types.TimeMuter
marker types.GroupMarker
metrics *Metrics
}

type TimeMuteStage timeStage

func NewTimeMuteStage(m types.TimeMuter, metrics *Metrics) *TimeMuteStage {
return &TimeMuteStage{m, metrics}
func NewTimeMuteStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeMuteStage {
return &TimeMuteStage{muter, marker, metrics}
}

// Exec implements the stage interface for TimeMuteStage.
// TimeMuteStage is responsible for muting alerts whose route is not in an active time.
func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
routeID, ok := RouteID(ctx)
if !ok {
return ctx, nil, errors.New("route ID missing")
}

gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, errors.New("group key missing")
}

muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx)
if !ok {
return ctx, alerts, nil
Expand All @@ -949,29 +973,42 @@ func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*type
return ctx, alerts, nil
}

muted, _, err := tms.muter.Mutes(muteTimeIntervalNames, now)
muted, mutedBy, err := tms.muter.Mutes(muteTimeIntervalNames, now)
if err != nil {
return ctx, alerts, err
}
// If muted is false then mutedBy is nil and the muted marker is removed.
tms.marker.SetMuted(routeID, gkey, mutedBy)

// If the current time is inside a mute time, all alerts are removed from the pipeline.
if muted {
tms.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonMuteTimeInterval).Add(float64(len(alerts)))
level.Debug(l).Log("msg", "Notifications not sent, route is within mute time", "alerts", len(alerts))
return ctx, nil, nil
}

return ctx, alerts, nil
}

type TimeActiveStage timeStage

func NewTimeActiveStage(m types.TimeMuter, metrics *Metrics) *TimeActiveStage {
return &TimeActiveStage{m, metrics}
func NewTimeActiveStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeActiveStage {
return &TimeActiveStage{muter, marker, metrics}
}

// Exec implements the stage interface for TimeActiveStage.
// TimeActiveStage is responsible for muting alerts whose route is not in an active time.
func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
routeID, ok := RouteID(ctx)
if !ok {
return ctx, nil, errors.New("route ID missing")
}

gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, errors.New("group key missing")
}

activeTimeIntervalNames, ok := ActiveTimeIntervalNames(ctx)
if !ok {
return ctx, alerts, nil
Expand All @@ -987,13 +1024,22 @@ func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*ty
return ctx, alerts, errors.New("missing now timestamp")
}

muted, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
active, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
if err != nil {
return ctx, alerts, err
}

var mutedBy []string
if !active {
// If the group is muted, then it must be muted by all active time intervals.
// Otherwise, the group must be in at least one active time interval for it
// to be active.
mutedBy = activeTimeIntervalNames
}
tas.marker.SetMuted(routeID, gkey, mutedBy)

// If the current time is not inside an active time, all alerts are removed from the pipeline
if !muted {
if !active {
tas.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonActiveTimeInterval).Add(float64(len(alerts)))
level.Debug(l).Log("msg", "Notifications not sent, route is not within active time", "alerts", len(alerts))
return ctx, nil, nil
Expand Down
Loading

0 comments on commit be0a810

Please sign in to comment.