
Alertmanager: Replicate state using the Ring #3839

Merged 3 commits on Mar 8, 2021
113 changes: 0 additions & 113 deletions go.sum

Large diffs are not rendered by default.

60 changes: 40 additions & 20 deletions integration/alertmanager_test.go
@@ -11,8 +11,11 @@ import (
"testing"
"time"

amlabels "github.com/prometheus/alertmanager/pkg/labels"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
@@ -290,31 +293,48 @@ func TestAlertmanagerSharding(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(am))
}

for _, am := range alertmanagers.Instances() {
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "alertmanager"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
)))

// We expect every instance to discover every configuration but only own a subset of them.
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(30)), "cortex_alertmanager_tenants_discovered"))
// We know that the ring has settled when every instance has some tenants and the total number of tokens have been assigned.
require.NoError(t, am.WaitSumMetrics(e2e.Greater(float64(0)), "cortex_alertmanager_tenants_owned"))
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(384)), "cortex_ring_tokens_total"))
require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(e2e.Equals(9), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "alertmanager"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
)))

// We expect every instance to discover every configuration but only own a subset of them.
require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(90), "cortex_alertmanager_tenants_discovered"))
// We know that the ring has settled when every instance has some tenants and the total number of tokens have been assigned.
// The total number of tenants across all instances is: total alertmanager configs * replication factor.
// In this case: 30 * 2
require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(60), "cortex_alertmanager_tenants_owned"))
require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(float64(1152)), "cortex_ring_tokens_total"))

// Now, let's make sure state is replicated across instances.
// 1. Let's select a random tenant
userID := "user-5"

// 2. Let's create a silence
silence := types.Silence{
Matchers: amlabels.Matchers{
{Name: "instance", Value: "prometheus-one"},
},
Comment: "Created for a test case.",
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
}

var totalTenants int
for _, am := range alertmanagers.Instances() {
values, err := am.SumMetrics([]string{"cortex_alertmanager_tenants_owned"})
require.NoError(t, err)
// 2b. With a replication factor of 2 and 3 instances, the tenant might not be assigned to the first replica we pick, so we fall back to the second one.
c1, err := e2ecortex.NewClient("", "", alertmanager1.HTTPEndpoint(), "", userID)
require.NoError(t, err)
c2, err := e2ecortex.NewClient("", "", alertmanager2.HTTPEndpoint(), "", userID)
require.NoError(t, err)

tenants := int(e2e.SumValues(values))
totalTenants += tenants
err = c1.CreateSilence(context.Background(), silence)
if err != nil {
err := c2.CreateSilence(context.Background(), silence)
require.NoError(t, err)
} else {
require.NoError(t, err)
}

// The total number of tenants across all instances is: total alertmanager configs * replication factor.
// In this case: 30 * 2
require.Equal(t, 60, totalTenants)
assert.NoError(t, alertmanagers.WaitSumMetricsWithOptions(e2e.Equals(float64(2)), []string{"cortex_alertmanager_silences"}, e2e.WaitMissingMetrics))
})
}
}
25 changes: 25 additions & 0 deletions integration/e2ecortex/client.go
@@ -447,6 +447,31 @@ func (c *Client) SendAlertToAlermanager(ctx context.Context, alert *model.Alert)
return nil
}

func (c *Client) CreateSilence(ctx context.Context, silence types.Silence) error {
u := c.alertmanagerClient.URL("api/prom/api/v1/silences", nil)

data, err := json.Marshal(silence)
if err != nil {
return fmt.Errorf("error marshaling the silence: %s", err)
}

req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(data))
if err != nil {
return fmt.Errorf("error creating request: %v", err)
}

resp, body, err := c.alertmanagerClient.Do(ctx, req)
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("creating the silence failed with status %d and error %v", resp.StatusCode, string(body))
}

return nil
}

func (c *Client) PostRequest(url string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
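For completeness, a short usage sketch of the new client helper, mirroring the sharding integration test earlier in this diff; `alertmanager1`, `t`, and the tenant ID are illustrative values taken from that test context, not new API:

```go
// Illustrative only: create a tenant-scoped client against one replica and post a
// silence through the Alertmanager API, as the sharding test above does.
c, err := e2ecortex.NewClient("", "", alertmanager1.HTTPEndpoint(), "", "user-5")
require.NoError(t, err)

silence := types.Silence{
	Matchers: amlabels.Matchers{
		{Name: "instance", Value: "prometheus-one"},
	},
	Comment:  "Created for a test case.",
	StartsAt: time.Now(),
	EndsAt:   time.Now().Add(time.Hour),
}
require.NoError(t, c.CreateSilence(context.Background(), silence))
```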
69 changes: 54 additions & 15 deletions pkg/alertmanager/alertmanager.go
@@ -14,8 +14,10 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/alertmanager/api"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/inhibit"
@@ -53,13 +55,21 @@ type Config struct {
PeerTimeout time.Duration
Retention time.Duration
ExternalURL *url.URL

ShardingEnabled bool
ReplicationFactor int
ReplicateStateFunc func(context.Context, string, *clusterpb.Part) error
// The alertmanager replication protocol relies on a position related to other replicas.
// This position is then used to identify who should notify about the alert first.
GetPositionFunc func(userID string) int
}

// An Alertmanager manages the alerts for one user.
type Alertmanager struct {
cfg *Config
api *api.API
logger log.Logger
state State
nflog *nflog.Log
silences *silence.Silences
marker types.Marker
@@ -96,6 +106,13 @@ func init() {
}()
}

// State helps with replication and synchronization of notifications and silences across several alertmanager replicas.
type State interface {
AddState(string, cluster.State, prometheus.Registerer) cluster.ClusterChannel
Position() int
WaitReady()
Contributor comment: WaitReady() is a blocking call, and should take a context as an argument so that the caller can cancel or time out the wait if needed. That also implies returning an error to communicate success.

Author reply: The implementation for WaitReady and Settle is part of the next PR; if you don't mind, I'd like to leave it out of this PR for now.
}
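For reference, a minimal sketch of the interface shape the review comment above suggests; this is not what the PR implements (the PR keeps the context-free WaitReady() and defers the implementation to a follow-up):

```go
import (
	"context"

	"github.com/prometheus/alertmanager/cluster"
	"github.com/prometheus/client_golang/prometheus"
)

// Hypothetical State variant per the review discussion above: WaitReady takes a
// context so callers can cancel or time out the wait, and returns an error to
// report failure instead of blocking indefinitely.
type State interface {
	AddState(key string, s cluster.State, reg prometheus.Registerer) cluster.ClusterChannel
	Position() int
	WaitReady(ctx context.Context) error
}
```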

// New creates a new Alertmanager.
func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
am := &Alertmanager{
@@ -110,6 +127,22 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {

am.registry = reg

// We currently have 3 operational modes:
// 1) Alertmanager clustering with upstream Gossip
// 2) Alertmanager sharding and ring-based replication
// 3) Alertmanager no replication
// These are covered in order.
if cfg.Peer != nil {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with gossip-based replication")
am.state = cfg.Peer
} else if cfg.ShardingEnabled {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.ReplicateStateFunc, cfg.GetPositionFunc, am.stop, am.logger, am.registry)
} else {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
am.state = &NilPeer{}
}

am.wg.Add(1)
nflogID := fmt.Sprintf("nflog:%s", cfg.UserID)
var err error
@@ -123,10 +156,9 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
if err != nil {
return nil, fmt.Errorf("failed to create notification log: %v", err)
}
if cfg.Peer != nil {
c := cfg.Peer.AddState("nfl:"+cfg.UserID, am.nflog, am.registry)
am.nflog.SetBroadcast(c.Broadcast)
}

c := am.state.AddState("nfl:"+cfg.UserID, am.nflog, am.registry)
am.nflog.SetBroadcast(c.Broadcast)

am.marker = types.NewMarker(am.registry)

@@ -140,10 +172,9 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
if err != nil {
return nil, fmt.Errorf("failed to create silences: %v", err)
}
if cfg.Peer != nil {
c := cfg.Peer.AddState("sil:"+cfg.UserID, am.silences, am.registry)
am.silences.SetBroadcast(c.Broadcast)
}

c = am.state.AddState("sil:"+cfg.UserID, am.silences, am.registry)
am.silences.SetBroadcast(c.Broadcast)

am.pipelineBuilder = notify.NewPipelineBuilder(am.registry)

@@ -162,9 +193,10 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
Alerts: am.alerts,
Silences: am.silences,
StatusFunc: am.marker.Status,
Peer: &NilPeer{},
Registry: am.registry,
Logger: log.With(am.logger, "component", "api"),
// Cortex should not expose cluster information back to its tenants.
Peer: &NilPeer{},
Registry: am.registry,
Logger: log.With(am.logger, "component", "api"),
GroupFunc: func(f1 func(*dispatch.Route) bool, f2 func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
return am.dispatcher.Groups(f1, f2)
},
@@ -190,14 +222,16 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
}

am.dispatcherMetrics = dispatch.NewDispatcherMetrics(am.registry)

//TODO: From this point onward, the alertmanager _might_ receive requests - we need to make sure we've settled and are ready.
return am, nil
}

// clusterWait returns a function that inspects the current peer state and returns
// a duration of one base timeout for each peer with a higher ID than ourselves.
func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration {
func clusterWait(position func() int, timeout time.Duration) func() time.Duration {
return func() time.Duration {
return time.Duration(p.Position()) * timeout
return time.Duration(position()) * timeout
}
}

@@ -230,7 +264,8 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s

am.inhibitor = inhibit.NewInhibitor(am.alerts, conf.InhibitRules, am.marker, log.With(am.logger, "component", "inhibitor"))

waitFunc := clusterWait(am.cfg.Peer, am.cfg.PeerTimeout)
waitFunc := clusterWait(am.state.Position, am.cfg.PeerTimeout)

timeoutFunc := func(d time.Duration) time.Duration {
if d < notify.MinTimeout {
d = notify.MinTimeout
@@ -255,7 +290,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
silence.NewSilencer(am.silences, am.marker, am.logger),
muteTimes,
am.nflog,
am.cfg.Peer,
am.state,
)
am.dispatcher = dispatch.NewDispatcher(
am.alerts,
@@ -293,6 +328,10 @@ func (am *Alertmanager) StopAndWait() {
am.wg.Wait()
}

func (am *Alertmanager) mergePartialExternalState(part *clusterpb.Part) error {
return am.state.(*state).MergePartialState(part)
}

// buildIntegrationsMap builds a map of name to the list of integration notifiers off of a
// list of receiver config.
func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, logger log.Logger) (map[string][]notify.Integration, error) {
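To make the new sharding-related Config fields concrete, here is a hedged sketch of how a caller inside the alertmanager package might wire them up. Only the field names and types (ShardingEnabled, ReplicationFactor, ReplicateStateFunc, GetPositionFunc) come from this diff; the `replicator` interface, `newTenantAlertmanager`, and the omitted fields are illustrative placeholders, not part of the PR:

```go
// replicator is an illustrative stand-in for the ring-aware component that the
// multitenant alertmanager would provide; it is not part of this PR.
type replicator interface {
	replicateStateForUser(ctx context.Context, userID string, part *clusterpb.Part) error
	positionForUser(userID string) int
}

// newTenantAlertmanager shows how the new Config fields could be filled in when
// sharding is enabled. Other required fields (logger, data dir, retention,
// external URL, peer timeout) are omitted for brevity.
func newTenantAlertmanager(userID string, r replicator, reg *prometheus.Registry) (*Alertmanager, error) {
	cfg := &Config{
		UserID:            userID,
		ShardingEnabled:   true,
		ReplicationFactor: 2,
		// Push partial state (e.g. a new silence) to the tenant's other replicas.
		ReplicateStateFunc: r.replicateStateForUser,
		// Position within the tenant's replica set; clusterWait uses it to stagger
		// notification sending by one PeerTimeout per higher-positioned replica.
		GetPositionFunc: r.positionForUser,
	}
	return New(cfg, reg)
}
```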
32 changes: 31 additions & 1 deletion pkg/alertmanager/alertmanager_metrics.go
@@ -46,6 +46,11 @@ type alertmanagerMetrics struct {

// The alertmanager config hash.
configHashValue *prometheus.Desc

partialMerges *prometheus.Desc
partialMergesFailed *prometheus.Desc
replicationTotal *prometheus.Desc
replicationFailed *prometheus.Desc
}

func newAlertmanagerMetrics() *alertmanagerMetrics {
@@ -147,6 +152,22 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
"cortex_alertmanager_config_hash",
"Hash of the currently loaded alertmanager configuration.",
[]string{"user"}, nil),
partialMerges: prometheus.NewDesc(
"cortex_alertmanager_partial_state_merges_total",
"Number of times we have received a partial state to merge for a key.",
[]string{"key"}, nil),
Contributor comment: By using key as a label, we will have at least 2 labels per user, right? (one for notifications, one for silences). Do we need so many new metrics? Would it make sense to use "user" only?

Author reply: We don't have user as part of these labels because key already includes the user. It's a combination of prefix + userID, so by using key we "technically get both", even though it breaks the nomenclature (of always using user).

Contributor: We have per-user registries, so we could do per-user output, aggregating over all keys. WDYT?

Author: Is that different from the current approach? An aggregation across all keys is not possible if the keys are per user, e.g. sil:user-3 or nfl:user-2.

Contributor: If I understand it correctly, we have per-user registries, which only use the "key" label. During alertmanagerMetrics.Collect, we then call SendSumOfCountersWithLabels with the key label. We could instead call SendSumOfCountersPerUser (e.g. data.SendSumOfCountersPerUser(out, m.partialMerges, "alertmanager_partial_state_merges_total")), which would return sum(alertmanager_partial_state_merges_total) per user registry, and then add the user label to the output. I think that would be enough. WDYT?

(A sketch of this per-user alternative appears after the Collect function at the end of this file's diff.)
partialMergesFailed: prometheus.NewDesc(
"cortex_alertmanager_partial_state_merges_failed_total",
"Number of times we have failed to merge a partial state received for a key.",
[]string{"key"}, nil),
replicationTotal: prometheus.NewDesc(
"cortex_alertmanager_state_replication_total",
"Number of times we have tried to replicate a state to other alertmanagers",
[]string{"key"}, nil),
replicationFailed: prometheus.NewDesc(
"cortex_alertmanager_state_replication_failed_total",
"Number of times we have failed to replicate a state to other alertmanagers",
[]string{"key"}, nil),
}
}

@@ -155,7 +176,7 @@ func (m *alertmanagerMetrics) addUserRegistry(user string, reg *prometheus.Regis
}

func (m *alertmanagerMetrics) removeUserRegistry(user string) {
// We neeed to go for a soft deletion here, as hard deletion requires
// We need to go for a soft deletion here, as hard deletion requires
// that _all_ metrics except gauges are per-user.
m.regs.RemoveUserRegistry(user, false)
}
@@ -185,6 +206,10 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
out <- m.silencesPropagatedMessagesTotal
out <- m.silences
out <- m.configHashValue
out <- m.partialMerges
out <- m.partialMergesFailed
out <- m.replicationTotal
out <- m.replicationFailed
}

func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
@@ -218,4 +243,9 @@ func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfGaugesPerUserWithLabels(out, m.silences, "alertmanager_silences", "state")

data.SendMaxOfGaugesPerUser(out, m.configHashValue, "alertmanager_config_hash")

data.SendSumOfCountersWithLabels(out, m.partialMerges, "alertmanager_partial_state_merges_total", "key")
data.SendSumOfCountersWithLabels(out, m.partialMergesFailed, "alertmanager_partial_state_merges_failed_total", "key")
data.SendSumOfCountersWithLabels(out, m.replicationTotal, "alertmanager_state_replication_total", "key")
data.SendSumOfCountersWithLabels(out, m.replicationFailed, "alertmanager_state_replication_failed_total", "key")
}
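As referenced in the review thread above, a sketch of the per-user aggregation that was proposed there; it assumes the four Descs are declared with a "user" label instead of "key" and uses the SendSumOfCountersPerUser helper named in the discussion, so it is an alternative shape rather than what this PR merges:

```go
// Sketch of the per-user alternative discussed in the review thread: sum each counter
// across all keys within a tenant's registry and label the result with "user" rather
// than "key". Assumes the Descs above are declared with []string{"user"}.
data.SendSumOfCountersPerUser(out, m.partialMerges, "alertmanager_partial_state_merges_total")
data.SendSumOfCountersPerUser(out, m.partialMergesFailed, "alertmanager_partial_state_merges_failed_total")
data.SendSumOfCountersPerUser(out, m.replicationTotal, "alertmanager_state_replication_total")
data.SendSumOfCountersPerUser(out, m.replicationFailed, "alertmanager_state_replication_failed_total")
```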