From be0a8106ffd45f4677bc9e9c2fa3a86a6535af98 Mon Sep 17 00:00:00 2001
From: George Robinson
Date: Mon, 13 May 2024 11:16:26 +0100
Subject: [PATCH] #3513: Mark muted alerts (#3793)

* Mark muted groups

This commit updates TimeMuteStage and TimeActiveStage to mark groups as
muted when their alerts are muted by an active or mute time interval, and
to remove any existing markers when outside all active and mute time
intervals.

Signed-off-by: George Robinson

* Remove unlock to defer

Signed-off-by: George Robinson

---------

Signed-off-by: George Robinson
---
 cmd/alertmanager/main.go  |  1 +
 dispatch/dispatch.go      | 45 ++++++++++++++----------
 dispatch/dispatch_test.go | 45 ++++++++++++++++++++++++
 notify/notify.go          | 64 +++++++++++++++++++++++++++++-----
 notify/notify_test.go     | 73 ++++++++++++++++++++++++++++++++-------
 types/types.go            |  9 +++++
 types/types_test.go       | 25 ++++++++++++++
 7 files changed, 222 insertions(+), 40 deletions(-)

diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go
index 93780b8edb..754b9b5c42 100644
--- a/cmd/alertmanager/main.go
+++ b/cmd/alertmanager/main.go
@@ -439,6 +439,7 @@ func run() int {
 		inhibitor,
 		silencer,
 		intervener,
+		marker,
 		notificationLog,
 		pipelinePeer,
 	)
diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index 822438e084..2468ee8f0e 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -78,6 +78,7 @@ type Dispatcher struct {
 	route   *Route
 	alerts  provider.Alerts
 	stage   notify.Stage
+	marker  types.GroupMarker
 	metrics *DispatcherMetrics
 	limits  Limits
 
@@ -107,7 +108,7 @@ func NewDispatcher(
 	ap provider.Alerts,
 	r *Route,
 	s notify.Stage,
-	mk types.AlertMarker,
+	mk types.GroupMarker,
 	to func(time.Duration) time.Duration,
 	lim Limits,
 	l log.Logger,
@@ -121,6 +122,7 @@
 		alerts:  ap,
 		stage:   s,
 		route:   r,
+		marker:  mk,
 		timeout: to,
 		logger:  log.With(l, "component", "dispatcher"),
 		metrics: m,
@@ -145,8 +147,8 @@ func (d *Dispatcher) Run() {
 }
 
 func (d *Dispatcher) run(it provider.AlertIterator) {
-	cleanup := time.NewTicker(30 * time.Second)
-	defer cleanup.Stop()
+	maintenance := time.NewTicker(30 * time.Second)
+	defer maintenance.Stop()
 
 	defer it.Close()
 
@@ -175,28 +177,30 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 			}
 			d.metrics.processingDuration.Observe(time.Since(now).Seconds())
 
-		case <-cleanup.C:
-			d.mtx.Lock()
-
-			for _, groups := range d.aggrGroupsPerRoute {
-				for _, ag := range groups {
-					if ag.empty() {
-						ag.stop()
-						delete(groups, ag.fingerprint())
-						d.aggrGroupsNum--
-						d.metrics.aggrGroups.Dec()
-					}
-				}
-			}
-
-			d.mtx.Unlock()
-
+		case <-maintenance.C:
+			d.doMaintenance()
 		case <-d.ctx.Done():
 			return
 		}
 	}
 }
 
+func (d *Dispatcher) doMaintenance() {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	for _, groups := range d.aggrGroupsPerRoute {
+		for _, ag := range groups {
+			if ag.empty() {
+				ag.stop()
+				d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
+				delete(groups, ag.fingerprint())
+				d.aggrGroupsNum--
+				d.metrics.aggrGroups.Dec()
+			}
+		}
+	}
+}
+
 // AlertGroup represents how alerts exist within an aggrGroup.
 type AlertGroup struct {
 	Alerts types.AlertSlice
@@ -374,6 +378,7 @@ type aggrGroup struct {
 	labels   model.LabelSet
 	opts     *RouteOpts
 	logger   log.Logger
+	routeID  string
 	routeKey string
 
 	alerts  *store.Alerts
@@ -394,6 +399,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
 	}
 	ag := &aggrGroup{
 		labels:   labels,
+		routeID:  r.ID(),
 		routeKey: r.Key(),
 		opts:     &r.RouteOpts,
 		timeout:  to,
@@ -447,6 +453,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
 		ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
 		ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
 		ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
+		ctx = notify.WithRouteID(ctx, ag.routeID)
 
 		// Wait the configured interval before calling flush again.
 		ag.mtx.Lock()
diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index 0c8cbf7855..ad7d35b577 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -715,3 +715,48 @@ type limits struct {
 func (l limits) MaxNumberOfAggregationGroups() int {
 	return l.groups
 }
+
+func TestDispatcher_DoMaintenance(t *testing.T) {
+	r := prometheus.NewRegistry()
+	marker := types.NewMarker(r)
+
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, log.NewNopLogger(), nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	route := &Route{
+		RouteOpts: RouteOpts{
+			GroupBy:       map[model.LabelName]struct{}{"alertname": {}},
+			GroupWait:     0,
+			GroupInterval: 5 * time.Minute, // Should never hit in this test.
+		},
+	}
+	timeout := func(d time.Duration) time.Duration { return d }
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+
+	ctx := context.Background()
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, log.NewNopLogger(), NewDispatcherMetrics(false, r))
+	aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup)
+	aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup)
+
+	// Insert an aggregation group with no alerts.
+	labels := model.LabelSet{"alertname": "1"}
+	aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, log.NewNopLogger())
+	aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
+	dispatcher.aggrGroupsPerRoute = aggrGroups
+	// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
+	go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })
+
+	// Insert a marker for the aggregation group's group key.
+	marker.SetMuted(route.ID(), aggrGroup1.GroupKey(), []string{"weekends"})
+	mutedBy, isMuted := marker.Muted(route.ID(), aggrGroup1.GroupKey())
+	require.True(t, isMuted)
+	require.Equal(t, []string{"weekends"}, mutedBy)
+
+	// Run the maintenance and the marker should be removed.
+	dispatcher.doMaintenance()
+	mutedBy, isMuted = marker.Muted(route.ID(), aggrGroup1.GroupKey())
+	require.False(t, isMuted)
+	require.Empty(t, mutedBy)
+}
diff --git a/notify/notify.go b/notify/notify.go
index 5eac12f40e..d1065ab793 100644
--- a/notify/notify.go
+++ b/notify/notify.go
@@ -119,6 +119,7 @@ const (
 	keyNow
 	keyMuteTimeIntervals
 	keyActiveTimeIntervals
+	keyRouteID
 )
 
 // WithReceiverName populates a context with a receiver name.
@@ -165,6 +166,10 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
 	return context.WithValue(ctx, keyActiveTimeIntervals, at)
 }
 
+func WithRouteID(ctx context.Context, routeID string) context.Context {
+	return context.WithValue(ctx, keyRouteID, routeID)
+}
+
 // RepeatInterval extracts a repeat interval from the context. Iff none exists, the
 // second argument is false.
 func RepeatInterval(ctx context.Context) (time.Duration, bool) {
@@ -228,6 +233,13 @@ func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) {
 	return v, ok
 }
 
+// RouteID extracts a RouteID from the context. Iff none exists, the
+// second argument is false.
+func RouteID(ctx context.Context) (string, bool) {
+	v, ok := ctx.Value(keyRouteID).(string)
+	return v, ok
+}
+
 // A Stage processes alerts under the constraints of the given context.
 type Stage interface {
 	Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
@@ -384,6 +396,7 @@ func (pb *PipelineBuilder) New(
 	inhibitor *inhibit.Inhibitor,
 	silencer *silence.Silencer,
 	intervener *timeinterval.Intervener,
+	marker types.GroupMarker,
 	notificationLog NotificationLog,
 	peer Peer,
 ) RoutingStage {
@@ -391,8 +404,8 @@
 
 	ms := NewGossipSettleStage(peer)
 	is := NewMuteStage(inhibitor, pb.metrics)
-	tas := NewTimeActiveStage(intervener, pb.metrics)
-	tms := NewTimeMuteStage(intervener, pb.metrics)
+	tas := NewTimeActiveStage(intervener, marker, pb.metrics)
+	tms := NewTimeMuteStage(intervener, marker, pb.metrics)
 	ss := NewMuteStage(silencer, pb.metrics)
 
 	for name := range receivers {
@@ -923,18 +936,29 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ
 
 type timeStage struct {
 	muter   types.TimeMuter
+	marker  types.GroupMarker
 	metrics *Metrics
 }
 
 type TimeMuteStage timeStage
 
-func NewTimeMuteStage(m types.TimeMuter, metrics *Metrics) *TimeMuteStage {
-	return &TimeMuteStage{m, metrics}
+func NewTimeMuteStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeMuteStage {
+	return &TimeMuteStage{muter, marker, metrics}
 }
 
 // Exec implements the stage interface for TimeMuteStage.
 // TimeMuteStage is responsible for muting alerts whose route is not in an active time.
 func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	routeID, ok := RouteID(ctx)
+	if !ok {
+		return ctx, nil, errors.New("route ID missing")
+	}
+
+	gkey, ok := GroupKey(ctx)
+	if !ok {
+		return ctx, nil, errors.New("group key missing")
+	}
+
 	muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx)
 	if !ok {
 		return ctx, alerts, nil
@@ -949,10 +973,12 @@
 		return ctx, alerts, nil
 	}
 
-	muted, _, err := tms.muter.Mutes(muteTimeIntervalNames, now)
+	muted, mutedBy, err := tms.muter.Mutes(muteTimeIntervalNames, now)
 	if err != nil {
 		return ctx, alerts, err
 	}
+	// If muted is false then mutedBy is nil and the muted marker is removed.
+	tms.marker.SetMuted(routeID, gkey, mutedBy)
 
 	// If the current time is inside a mute time, all alerts are removed from the pipeline.
 	if muted {
@@ -960,18 +986,29 @@ func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*type
 		level.Debug(l).Log("msg", "Notifications not sent, route is within mute time", "alerts", len(alerts))
 		return ctx, nil, nil
 	}
+
 	return ctx, alerts, nil
 }
 
 type TimeActiveStage timeStage
 
-func NewTimeActiveStage(m types.TimeMuter, metrics *Metrics) *TimeActiveStage {
-	return &TimeActiveStage{m, metrics}
+func NewTimeActiveStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeActiveStage {
+	return &TimeActiveStage{muter, marker, metrics}
 }
 
 // Exec implements the stage interface for TimeActiveStage.
 // TimeActiveStage is responsible for muting alerts whose route is not in an active time.
 func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	routeID, ok := RouteID(ctx)
+	if !ok {
+		return ctx, nil, errors.New("route ID missing")
+	}
+
+	gkey, ok := GroupKey(ctx)
+	if !ok {
+		return ctx, nil, errors.New("group key missing")
+	}
+
 	activeTimeIntervalNames, ok := ActiveTimeIntervalNames(ctx)
 	if !ok {
 		return ctx, alerts, nil
@@ -987,13 +1024,22 @@
 		return ctx, alerts, errors.New("missing now timestamp")
 	}
 
-	muted, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
+	active, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
 	if err != nil {
 		return ctx, alerts, err
 	}
 
+	var mutedBy []string
+	if !active {
+		// If the group is muted, then it must be muted by all active time intervals.
+		// Otherwise, the group must be in at least one active time interval for it
+		// to be active.
+		mutedBy = activeTimeIntervalNames
+	}
+	tas.marker.SetMuted(routeID, gkey, mutedBy)
+
 	// If the current time is not inside an active time, all alerts are removed from the pipeline
-	if !muted {
+	if !active {
 		tas.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonActiveTimeInterval).Add(float64(len(alerts)))
 		level.Debug(l).Log("msg", "Notifications not sent, route is not within active time", "alerts", len(alerts))
 		return ctx, nil, nil
diff --git a/notify/notify_test.go b/notify/notify_test.go
index 5077c6d666..5c2297e993 100644
--- a/notify/notify_test.go
+++ b/notify/notify_test.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"io"
 	"reflect"
+	"sort"
 	"strings"
 	"testing"
 	"time"
@@ -827,12 +828,6 @@ func TestTimeMuteStage(t *testing.T) {
 	}
 	eveningsAndWeekends := map[string][]timeinterval.TimeInterval{
 		"evenings": {{
-			Weekdays: []timeinterval.WeekdayRange{{
-				InclusiveRange: timeinterval.InclusiveRange{
-					Begin: 1, // Monday
-					End:   5, // Friday
-				},
-			}},
 			Times: []timeinterval.TimeRange{{
 				StartMinute: 0,   // 00:00
 				EndMinute:   540, // 09:00
@@ -877,30 +872,41 @@
 		alerts:  []*types.Alert{{Alert: model.Alert{Labels: model.LabelSet{"foo": "bar"}}}},
 		mutedBy: []string{"weekends"},
 	}, {
-		name:      "Should be muted at 12pm UTC",
+		name:      "Should be muted at 12pm UTC on a weekday",
 		intervals: eveningsAndWeekends,
-		now:       time.Date(2024, 1, 6, 10, 0, 0, 0, time.UTC),
+		now:       time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC),
 		alerts:    []*types.Alert{{Alert: model.Alert{Labels: model.LabelSet{"foo": "bar"}}}},
 		mutedBy:   []string{"evenings"},
+	}, {
+		name:      "Should be muted at 12pm UTC on a weekend",
+		intervals: eveningsAndWeekends,
+		now:       time.Date(2024, 1, 6, 10, 0, 0, 0, time.UTC),
+		alerts:    []*types.Alert{{Alert: model.Alert{Labels: model.LabelSet{"foo": "bar"}}}},
+		mutedBy:   []string{"evenings", "weekends"},
 	}}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			r := prometheus.NewRegistry()
+			marker := types.NewMarker(r)
 			metrics := NewMetrics(r, featurecontrol.NoopFlags{})
 			intervener := timeinterval.NewIntervener(test.intervals)
-			st := NewTimeMuteStage(intervener, metrics)
+			st := NewTimeMuteStage(intervener, marker, metrics)
 
 			// Get the names of all time intervals for the context.
 			muteTimeIntervalNames := make([]string, 0, len(test.intervals))
 			for name := range test.intervals {
 				muteTimeIntervalNames = append(muteTimeIntervalNames, name)
 			}
+			// Sort the names so we can compare mutedBy with test.mutedBy.
+			sort.Strings(muteTimeIntervalNames)
 
 			ctx := context.Background()
 			ctx = WithNow(ctx, test.now)
+			ctx = WithGroupKey(ctx, "group1")
 			ctx = WithActiveTimeIntervals(ctx, nil)
 			ctx = WithMuteTimeIntervals(ctx, muteTimeIntervalNames)
+			ctx = WithRouteID(ctx, "route1")
 
 			_, active, err := st.Exec(ctx, log.NewNopLogger(), test.alerts...)
 			require.NoError(t, err)
@@ -908,14 +914,33 @@
 			if len(test.mutedBy) == 0 {
 				// All alerts should be active.
 				require.Equal(t, len(test.alerts), len(active))
+				// The group should not be marked.
+				mutedBy, isMuted := marker.Muted("route1", "group1")
+				require.False(t, isMuted)
+				require.Empty(t, mutedBy)
 				// The metric for total suppressed notifications should not
 				// have been incremented, which means it will not be collected.
-				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader("")))
+				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader(`
+# HELP alertmanager_marked_alerts How many alerts by state are currently marked in the Alertmanager regardless of their expiry.
+# TYPE alertmanager_marked_alerts gauge
+alertmanager_marked_alerts{state="active"} 0
+alertmanager_marked_alerts{state="suppressed"} 0
+alertmanager_marked_alerts{state="unprocessed"} 0
+`)))
 			} else {
 				// All alerts should be muted.
 				require.Empty(t, active)
+				// The group should be marked as muted.
+				mutedBy, isMuted := marker.Muted("route1", "group1")
+				require.True(t, isMuted)
+				require.Equal(t, test.mutedBy, mutedBy)
 				// Gets the metric for total suppressed notifications.
 				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader(fmt.Sprintf(`
+# HELP alertmanager_marked_alerts How many alerts by state are currently marked in the Alertmanager regardless of their expiry.
+# TYPE alertmanager_marked_alerts gauge
+alertmanager_marked_alerts{state="active"} 0
+alertmanager_marked_alerts{state="suppressed"} 0
+alertmanager_marked_alerts{state="unprocessed"} 0
 # HELP alertmanager_notifications_suppressed_total The total number of notifications suppressed for being silenced, inhibited, outside of active time intervals or within muted time intervals.
 # TYPE alertmanager_notifications_suppressed_total counter
 alertmanager_notifications_suppressed_total{reason="mute_time_interval"} %d
@@ -981,20 +1006,25 @@
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			r := prometheus.NewRegistry()
+			marker := types.NewMarker(r)
 			metrics := NewMetrics(r, featurecontrol.NoopFlags{})
 			intervener := timeinterval.NewIntervener(test.intervals)
-			st := NewTimeActiveStage(intervener, metrics)
+			st := NewTimeActiveStage(intervener, marker, metrics)
 
 			// Get the names of all time intervals for the context.
 			activeTimeIntervalNames := make([]string, 0, len(test.intervals))
 			for name := range test.intervals {
 				activeTimeIntervalNames = append(activeTimeIntervalNames, name)
 			}
+			// Sort the names so we can compare mutedBy with test.mutedBy.
+			sort.Strings(activeTimeIntervalNames)
 
 			ctx := context.Background()
 			ctx = WithNow(ctx, test.now)
+			ctx = WithGroupKey(ctx, "group1")
 			ctx = WithActiveTimeIntervals(ctx, activeTimeIntervalNames)
 			ctx = WithMuteTimeIntervals(ctx, nil)
+			ctx = WithRouteID(ctx, "route1")
 
 			_, active, err := st.Exec(ctx, log.NewNopLogger(), test.alerts...)
 			require.NoError(t, err)
@@ -1002,14 +1032,33 @@
 			if len(test.mutedBy) == 0 {
 				// All alerts should be active.
 				require.Equal(t, len(test.alerts), len(active))
+				// The group should not be marked.
+				mutedBy, isMuted := marker.Muted("route1", "group1")
+				require.False(t, isMuted)
+				require.Empty(t, mutedBy)
 				// The metric for total suppressed notifications should not
 				// have been incremented, which means it will not be collected.
-				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader("")))
+				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader(`
+# HELP alertmanager_marked_alerts How many alerts by state are currently marked in the Alertmanager regardless of their expiry.
+# TYPE alertmanager_marked_alerts gauge
+alertmanager_marked_alerts{state="active"} 0
+alertmanager_marked_alerts{state="suppressed"} 0
+alertmanager_marked_alerts{state="unprocessed"} 0
+`)))
 			} else {
 				// All alerts should be muted.
 				require.Empty(t, active)
+				// The group should be marked as muted.
+				mutedBy, isMuted := marker.Muted("route1", "group1")
+				require.True(t, isMuted)
+				require.Equal(t, test.mutedBy, mutedBy)
 				// Gets the metric for total suppressed notifications.
 				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader(fmt.Sprintf(`
+# HELP alertmanager_marked_alerts How many alerts by state are currently marked in the Alertmanager regardless of their expiry.
+# TYPE alertmanager_marked_alerts gauge
+alertmanager_marked_alerts{state="active"} 0
+alertmanager_marked_alerts{state="suppressed"} 0
+alertmanager_marked_alerts{state="unprocessed"} 0
 # HELP alertmanager_notifications_suppressed_total The total number of notifications suppressed for being silenced, inhibited, outside of active time intervals or within muted time intervals.
 # TYPE alertmanager_notifications_suppressed_total counter
 alertmanager_notifications_suppressed_total{reason="active_time_interval"} %d
diff --git a/types/types.go b/types/types.go
index 648f0a7e01..85391ce91f 100644
--- a/types/types.go
+++ b/types/types.go
@@ -116,6 +116,9 @@ type GroupMarker interface {
 	// intervals that mute it. If the list of names is nil or the empty slice
 	// then the muted marker is removed.
 	SetMuted(routeID, groupKey string, timeIntervalNames []string)
+
+	// DeleteByGroupKey removes all markers for the GroupKey.
+	DeleteByGroupKey(routeID, groupKey string)
 }
 
 // NewMarker returns an instance of a AlertMarker implementation.
@@ -158,6 +161,12 @@ func (m *MemMarker) SetMuted(routeID, groupKey string, timeIntervalNames []strin
 	status.mutedBy = timeIntervalNames
 }
 
+func (m *MemMarker) DeleteByGroupKey(routeID, groupKey string) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	delete(m.groups, routeID+groupKey)
+}
+
 func (m *MemMarker) registerMetrics(r prometheus.Registerer) {
 	newMarkedAlertMetricByState := func(st AlertState) prometheus.GaugeFunc {
 		return prometheus.NewGaugeFunc(
diff --git a/types/types_test.go b/types/types_test.go
index 45bc59e2f0..198804862b 100644
--- a/types/types_test.go
+++ b/types/types_test.go
@@ -61,6 +61,31 @@ func TestMemMarker_Muted(t *testing.T) {
 	require.Empty(t, timeIntervalNames)
 }
 
+func TestMemMarker_DeleteByGroupKey(t *testing.T) {
+	r := prometheus.NewRegistry()
+	marker := NewMarker(r)
+
+	// Mark the group and check that it is muted.
+	marker.SetMuted("route1", "group1", []string{"weekends"})
+	timeIntervalNames, isMuted := marker.Muted("route1", "group1")
+	require.True(t, isMuted)
+	require.Equal(t, []string{"weekends"}, timeIntervalNames)
+
+	// Delete the markers for a different group key. The group should
+	// still be muted.
+	marker.DeleteByGroupKey("route1", "group2")
+	timeIntervalNames, isMuted = marker.Muted("route1", "group1")
+	require.True(t, isMuted)
+	require.Equal(t, []string{"weekends"}, timeIntervalNames)
+
+	// Delete the markers for the correct group key. The group should
+	// no longer be muted.
+	marker.DeleteByGroupKey("route1", "group1")
+	timeIntervalNames, isMuted = marker.Muted("route1", "group1")
+	require.False(t, isMuted)
+	require.Empty(t, timeIntervalNames)
+}
+
 func TestMemMarker_Count(t *testing.T) {
 	r := prometheus.NewRegistry()
 	marker := NewMarker(r)