#3513: Mark muted alerts #3793
@@ -438,6 +438,7 @@ func run() int {
 		inhibitor,
 		silencer,
 		intervener,
+		marker,
 		notificationLog,
 		pipelinePeer,
 	)
@@ -78,6 +78,7 @@ type Dispatcher struct {
 	route   *Route
 	alerts  provider.Alerts
 	stage   notify.Stage
+	marker  types.GroupMarker
 	metrics *DispatcherMetrics
 	limits  Limits
@@ -107,7 +108,7 @@ func NewDispatcher(
 	ap provider.Alerts,
 	r *Route,
 	s notify.Stage,
-	mk types.AlertMarker,
+	mk types.GroupMarker,
 	to func(time.Duration) time.Duration,
 	lim Limits,
 	l log.Logger,
@@ -121,6 +122,7 @@ func NewDispatcher(
 		alerts:  ap,
 		stage:   s,
 		route:   r,
+		marker:  mk,
 		timeout: to,
 		logger:  log.With(l, "component", "dispatcher"),
 		metrics: m,
@@ -145,8 +147,8 @@ func (d *Dispatcher) Run() {
 }

 func (d *Dispatcher) run(it provider.AlertIterator) {
-	cleanup := time.NewTicker(30 * time.Second)
-	defer cleanup.Stop()
+	maintenance := time.NewTicker(30 * time.Second)
+	defer maintenance.Stop()

 	defer it.Close()

I renamed this as all other "cleanup" functions in Alertmanager are called maintenance.
@@ -175,28 +177,30 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 			}
 			d.metrics.processingDuration.Observe(time.Since(now).Seconds())

-		case <-cleanup.C:
-			d.mtx.Lock()
-
-			for _, groups := range d.aggrGroupsPerRoute {
-				for _, ag := range groups {
-					if ag.empty() {
-						ag.stop()
-						delete(groups, ag.fingerprint())
-						d.aggrGroupsNum--
-						d.metrics.aggrGroups.Dec()
-					}
-				}
-			}
-
-			d.mtx.Unlock()
-
+		case <-maintenance.C:
+			d.doMaintenance()
 		case <-d.ctx.Done():
 			return
 		}
 	}
 }

+func (d *Dispatcher) doMaintenance() {
+	d.mtx.Lock()
+	for _, groups := range d.aggrGroupsPerRoute {
+		for _, ag := range groups {
+			if ag.empty() {
+				ag.stop()
+				d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
+				delete(groups, ag.fingerprint())
+				d.aggrGroupsNum--
+				d.metrics.aggrGroups.Dec()
+			}
+		}
+	}
+	d.mtx.Unlock()
+}
+
 // AlertGroup represents how alerts exist within an aggrGroup.
 type AlertGroup struct {
 	Alerts types.AlertSlice

I moved this logic to a method called doMaintenance.

We delete the marker for the aggregation group when the aggregation group is deleted itself.
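The diff only shows call sites of types.GroupMarker (SetMuted, Muted, DeleteByGroupKey), not its definition. Below is a minimal sketch of what the interface presumably looks like, inferred from those calls; the actual definition in types/types.go may differ.

// Sketch only: inferred from the call sites in this PR, not copied from types/types.go.
// A GroupMarker records, per route ID and group key, which time intervals (if any)
// currently mute an alert group.
type GroupMarker interface {
	// SetMuted marks the group as muted by the given time interval names.
	// An empty or nil slice clears the muted marker.
	SetMuted(routeID, groupKey string, timeIntervalNames []string)

	// Muted returns the time interval names muting the group and whether the
	// group is muted at all.
	Muted(routeID, groupKey string) ([]string, bool)

	// DeleteByGroupKey removes all markers for the group, used here when the
	// dispatcher garbage-collects an empty aggregation group.
	DeleteByGroupKey(routeID, groupKey string)
}

Under that assumption, calling DeleteByGroupKey in doMaintenance keeps markers from leaking after their aggregation group has been garbage collected.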
@@ -374,6 +378,7 @@ type aggrGroup struct {
 	labels   model.LabelSet
 	opts     *RouteOpts
 	logger   log.Logger
+	routeID  string
 	routeKey string

 	alerts  *store.Alerts

We needed to add routeID so the dispatcher can delete the group's marker in doMaintenance and pass the route ID on to the notification pipeline.
@@ -394,6 +399,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
 	}
 	ag := &aggrGroup{
 		labels:   labels,
+		routeID:  r.ID(),
 		routeKey: r.Key(),
 		opts:     &r.RouteOpts,
 		timeout:  to,
@@ -447,6 +453,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
 			ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
 			ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
 			ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
+			ctx = notify.WithRouteID(ctx, ag.routeID)

 			// Wait the configured interval before calling flush again.
 			ag.mtx.Lock()

We also need to add it to the context so it can be extracted in TimeMuteStage.Exec and TimeActiveStage.Exec.
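As a quick illustration of the context plumbing, here is a hypothetical round trip through the two helpers this PR adds to the notify package; the route ID value is invented, in the dispatcher it is ag.routeID.

package main

import (
	"context"
	"fmt"

	"github.com/prometheus/alertmanager/notify"
)

func main() {
	// "example-route" is an invented value; the dispatcher passes ag.routeID.
	ctx := notify.WithRouteID(context.Background(), "example-route")

	// Later, inside a pipeline stage, the route ID is read back from the context.
	if routeID, ok := notify.RouteID(ctx); ok {
		fmt.Println("route ID from context:", routeID)
	}
}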
@@ -691,3 +691,48 @@ type limits struct {
 func (l limits) MaxNumberOfAggregationGroups() int {
 	return l.groups
 }
+
+func TestDispatcher_DoMaintenance(t *testing.T) {
+	r := prometheus.NewRegistry()
+	marker := types.NewMarker(r)
+
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, log.NewNopLogger(), nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	route := &Route{
+		RouteOpts: RouteOpts{
+			GroupBy:       map[model.LabelName]struct{}{"alertname": {}},
+			GroupWait:     0,
+			GroupInterval: 5 * time.Minute, // Should never hit in this test.
+		},
+	}
+	timeout := func(d time.Duration) time.Duration { return d }
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+
+	ctx := context.Background()
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, log.NewNopLogger(), NewDispatcherMetrics(false, r))
+	aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup)
+	aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup)
+
+	// Insert an aggregation group with no alerts.
+	labels := model.LabelSet{"alertname": "1"}
+	aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, log.NewNopLogger())
+	aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
+	dispatcher.aggrGroupsPerRoute = aggrGroups
+	// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
+	go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })
+
+	// Insert a marker for the aggregation group's group key.
+	marker.SetMuted(route.ID(), aggrGroup1.GroupKey(), []string{"weekends"})
+	mutedBy, isMuted := marker.Muted(route.ID(), aggrGroup1.GroupKey())
+	require.True(t, isMuted)
+	require.Equal(t, []string{"weekends"}, mutedBy)
+
+	// Run the maintenance and the marker should be removed.
+	dispatcher.doMaintenance()
+	mutedBy, isMuted = marker.Muted(route.ID(), aggrGroup1.GroupKey())
+	require.False(t, isMuted)
+	require.Empty(t, mutedBy)
+}

Here I am testing that the marker is deleted when the aggregation group is garbage collected.
@@ -119,6 +119,7 @@ const (
 	keyNow
 	keyMuteTimeIntervals
 	keyActiveTimeIntervals
+	keyRouteID
 )

 // WithReceiverName populates a context with a receiver name.
@@ -165,6 +166,10 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
 	return context.WithValue(ctx, keyActiveTimeIntervals, at)
 }

+func WithRouteID(ctx context.Context, routeID string) context.Context {
+	return context.WithValue(ctx, keyRouteID, routeID)
+}
+
 // RepeatInterval extracts a repeat interval from the context. Iff none exists, the
 // second argument is false.
 func RepeatInterval(ctx context.Context) (time.Duration, bool) {
@@ -228,6 +233,13 @@ func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) {
 	return v, ok
 }

+// RouteID extracts a RouteID from the context. Iff none exists, the
+// // second argument is false.
+func RouteID(ctx context.Context) (string, bool) {
+	v, ok := ctx.Value(keyRouteID).(string)
+	return v, ok
+}
+
 // A Stage processes alerts under the constraints of the given context.
 type Stage interface {
 	Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)

Comment on lines +236 to +237 (a change was suggested here): The other comments have the same typo of "Iff".

Oh iff is not a spelling mistake, it refers to if and only if.

Today I learned 🤯
@@ -384,15 +396,16 @@ func (pb *PipelineBuilder) New(
 	inhibitor *inhibit.Inhibitor,
 	silencer *silence.Silencer,
 	intervener *timeinterval.Intervener,
+	marker types.GroupMarker,
 	notificationLog NotificationLog,
 	peer Peer,
 ) RoutingStage {
 	rs := make(RoutingStage, len(receivers))

 	ms := NewGossipSettleStage(peer)
 	is := NewMuteStage(inhibitor, pb.metrics)
-	tas := NewTimeActiveStage(intervener, pb.metrics)
-	tms := NewTimeMuteStage(intervener, pb.metrics)
+	tas := NewTimeActiveStage(intervener, marker, pb.metrics)
+	tms := NewTimeMuteStage(intervener, marker, pb.metrics)
 	ss := NewMuteStage(silencer, pb.metrics)

 	for name := range receivers {
@@ -923,18 +936,29 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ

 type timeStage struct {
 	muter   types.TimeMuter
+	marker  types.GroupMarker
 	metrics *Metrics
 }

 type TimeMuteStage timeStage

-func NewTimeMuteStage(m types.TimeMuter, metrics *Metrics) *TimeMuteStage {
-	return &TimeMuteStage{m, metrics}
+func NewTimeMuteStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeMuteStage {
+	return &TimeMuteStage{muter, marker, metrics}
 }

 // Exec implements the stage interface for TimeMuteStage.
 // TimeMuteStage is responsible for muting alerts whose route is not in an active time.
 func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	routeID, ok := RouteID(ctx)
+	if !ok {
+		return ctx, nil, errors.New("route ID missing")
+	}
+
+	gkey, ok := GroupKey(ctx)
+	if !ok {
+		return ctx, nil, errors.New("group key missing")
+	}
+
 	muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx)
 	if !ok {
 		return ctx, alerts, nil
@@ -949,29 +973,42 @@ func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*type
 		return ctx, alerts, nil
 	}

-	muted, _, err := tms.muter.Mutes(muteTimeIntervalNames, now)
+	muted, mutedBy, err := tms.muter.Mutes(muteTimeIntervalNames, now)
 	if err != nil {
 		return ctx, alerts, err
 	}
+	// If muted is false then mutedBy is nil and the muted marker is removed.
+	tms.marker.SetMuted(routeID, gkey, mutedBy)
+
 	// If the current time is inside a mute time, all alerts are removed from the pipeline.
 	if muted {
 		tms.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonMuteTimeInterval).Add(float64(len(alerts)))
 		level.Debug(l).Log("msg", "Notifications not sent, route is within mute time", "alerts", len(alerts))
 		return ctx, nil, nil
 	}

 	return ctx, alerts, nil
 }

 type TimeActiveStage timeStage

-func NewTimeActiveStage(m types.TimeMuter, metrics *Metrics) *TimeActiveStage {
-	return &TimeActiveStage{m, metrics}
+func NewTimeActiveStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeActiveStage {
+	return &TimeActiveStage{muter, marker, metrics}
 }

 // Exec implements the stage interface for TimeActiveStage.
 // TimeActiveStage is responsible for muting alerts whose route is not in an active time.
 func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	routeID, ok := RouteID(ctx)
+	if !ok {
+		return ctx, nil, errors.New("route ID missing")
+	}
+
+	gkey, ok := GroupKey(ctx)
+	if !ok {
+		return ctx, nil, errors.New("group key missing")
+	}
+
 	activeTimeIntervalNames, ok := ActiveTimeIntervalNames(ctx)
 	if !ok {
 		return ctx, alerts, nil
@@ -987,13 +1024,22 @@ func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*ty
 		return ctx, alerts, errors.New("missing now timestamp")
 	}

-	muted, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
+	active, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
 	if err != nil {
 		return ctx, alerts, err
 	}

+	var mutedBy []string
+	if !active {
+		// If the group is muted, then it must be muted by all active time intervals.
+		// Otherwise, the group must be in at least one active time interval for it
+		// to be active.
+		mutedBy = activeTimeIntervalNames
+	}
+	tas.marker.SetMuted(routeID, gkey, mutedBy)
+
 	// If the current time is not inside an active time, all alerts are removed from the pipeline
-	if !muted {
+	if !active {
 		tas.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonActiveTimeInterval).Add(float64(len(alerts)))
 		level.Debug(l).Log("msg", "Notifications not sent, route is not within active time", "alerts", len(alerts))
 		return ctx, nil, nil
mk was never used in NewDispatcher; here I am replacing it with types.GroupMarker.
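Nothing in this diff reads the markers back; the stages only write them and the dispatcher cleans them up. Purely as an illustration, a consumer could query the recorded state the same way the new test does. The helper below is hypothetical and not part of the PR.

package example

import "github.com/prometheus/alertmanager/types"

// groupMutedBy is a hypothetical helper, not part of this PR. Muted is the same
// method the new TestDispatcher_DoMaintenance exercises.
func groupMutedBy(marker types.GroupMarker, routeID, groupKey string) ([]string, bool) {
	mutedBy, isMuted := marker.Muted(routeID, groupKey)
	if !isMuted {
		// Either the group is not muted, or its marker was removed when the
		// dispatcher garbage-collected the aggregation group.
		return nil, false
	}
	return mutedBy, true
}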