Alertmanager: Replicate state using the Ring (cortexproject#3839)
* Alertmanager: Replicate state using the Ring

Alertmanager typically uses a memberlist-based gossip protocol to
replicate state across replicas. In Cortex, we used the same fundamentals
to provide a basic high-availability mode.

Now that we have support for sharding instances across many machines, we
can leverage the ring to find the corresponding instances and send the
updates via gRPC.

Signed-off-by: gotjosh <josue@grafana.com>
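
For illustration, a minimal sketch of that flow, assuming simplified stand-in interfaces — ReadRing, StateClient, ReplicasFor and UpdateState are names invented for this example, not the actual Cortex APIs:

// Minimal sketch (not the Cortex implementation): replicate a tenant's state
// update to the replicas that the ring assigns to that tenant, over gRPC.
package sketch

import (
	"context"
	"hash/fnv"
)

// ReadRing is a simplified stand-in for the ring used to shard tenants.
type ReadRing interface {
	// ReplicasFor returns the addresses of the instances owning the given key.
	ReplicasFor(key uint32) ([]string, error)
}

// StateClient is a simplified stand-in for the gRPC client of a replica.
type StateClient interface {
	UpdateState(ctx context.Context, userID string, payload []byte) error
}

// shardByUser hashes the tenant ID onto the ring's key space.
func shardByUser(userID string) uint32 {
	h := fnv.New32a()
	_, _ = h.Write([]byte(userID))
	return h.Sum32()
}

// replicateState sends a partial state update for userID to every replica
// that the ring selects for that tenant.
func replicateState(ctx context.Context, r ReadRing, clientFor func(addr string) StateClient, userID string, payload []byte) error {
	replicas, err := r.ReplicasFor(shardByUser(userID))
	if err != nil {
		return err
	}
	for _, addr := range replicas {
		if err := clientFor(addr).UpdateState(ctx, userID, payload); err != nil {
			return err
		}
	}
	return nil
}

In the actual change, this call is wired through Config.ReplicateStateFunc, which ships clusterpb.Part messages to the replicas selected by the ring (see pkg/alertmanager/alertmanager.go below).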

* Appease the linter and wordsmithing

Signed-off-by: gotjosh <josue@grafana.com>

* Always wait for the missing metrics

Signed-off-by: gotjosh <josue@grafana.com>
gotjosh authored and harry671003 committed Mar 11, 2021
1 parent 76acaae commit d27b2d0
Showing 11 changed files with 1,305 additions and 182 deletions.
113 changes: 0 additions & 113 deletions go.sum

Large diffs are not rendered by default.

60 changes: 40 additions & 20 deletions integration/alertmanager_test.go
@@ -11,8 +11,11 @@ import (
"testing"
"time"

amlabels "github.com/prometheus/alertmanager/pkg/labels"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
@@ -290,31 +293,48 @@ func TestAlertmanagerSharding(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(am))
}

for _, am := range alertmanagers.Instances() {
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "alertmanager"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
)))

// We expect every instance to discover every configuration but only own a subset of them.
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(30)), "cortex_alertmanager_tenants_discovered"))
// We know that the ring has settled when every instance has some tenants and the total number of tokens have been assigned.
require.NoError(t, am.WaitSumMetrics(e2e.Greater(float64(0)), "cortex_alertmanager_tenants_owned"))
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(384)), "cortex_ring_tokens_total"))
require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(e2e.Equals(9), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "alertmanager"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
)))

// We expect every instance to discover every configuration but only own a subset of them.
require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(90), "cortex_alertmanager_tenants_discovered"))
// We know that the ring has settled when every instance has some tenants and the total number of tokens have been assigned.
// The total number of tenants across all instances is: total alertmanager configs * replication factor.
// In this case: 30 * 2
require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(60), "cortex_alertmanager_tenants_owned"))
require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(float64(1152)), "cortex_ring_tokens_total"))

// Now, let's make sure state is replicated across instances.
// 1. Let's select a random tenant
userID := "user-5"

// 2. Let's create a silence
silence := types.Silence{
Matchers: amlabels.Matchers{
{Name: "instance", Value: "prometheus-one"},
},
Comment: "Created for a test case.",
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
}

var totalTenants int
for _, am := range alertmanagers.Instances() {
values, err := am.SumMetrics([]string{"cortex_alertmanager_tenants_owned"})
require.NoError(t, err)
// 2b. For each tenant, with a replication factor of 2 and 3 instances there's a chance the user might not be in the first selected replica.
c1, err := e2ecortex.NewClient("", "", alertmanager1.HTTPEndpoint(), "", userID)
require.NoError(t, err)
c2, err := e2ecortex.NewClient("", "", alertmanager2.HTTPEndpoint(), "", userID)
require.NoError(t, err)

tenants := int(e2e.SumValues(values))
totalTenants += tenants
err = c1.CreateSilence(context.Background(), silence)
if err != nil {
err := c2.CreateSilence(context.Background(), silence)
require.NoError(t, err)
} else {
require.NoError(t, err)
}

// The total number of tenants across all instances is: total alertmanager configs * replication factor.
// In this case: 30 * 2
require.Equal(t, 60, totalTenants)
assert.NoError(t, alertmanagers.WaitSumMetricsWithOptions(e2e.Equals(float64(2)), []string{"cortex_alertmanager_silences"}), e2e.WaitMissingMetrics)
})
}
}
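
As a back-of-the-envelope check of the totals asserted in the test above (3 instances, 30 tenant configurations, replication factor 2; the 128 tokens per instance is an assumed value for this sketch, not taken from the diff):

package sketch

import "fmt"

func main() {
	const (
		instances         = 3
		tenantConfigs     = 30
		replicationFactor = 2
		tokensPerInstance = 128 // assumed value for this sketch
	)

	// Each instance reports every ACTIVE ring member, so the summed gauge is instances * instances.
	fmt.Println("cortex_ring_members (sum):", instances*instances) // 9

	// Every instance discovers every tenant configuration.
	fmt.Println("cortex_alertmanager_tenants_discovered (sum):", instances*tenantConfigs) // 90

	// Each tenant is owned by replicationFactor instances.
	fmt.Println("cortex_alertmanager_tenants_owned (sum):", tenantConfigs*replicationFactor) // 60

	// Each instance reports the ring-wide token count, so the sum multiplies again.
	fmt.Println("cortex_ring_tokens_total (sum):", instances*instances*tokensPerInstance) // 1152
}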
25 changes: 25 additions & 0 deletions integration/e2ecortex/client.go
@@ -447,6 +447,31 @@ func (c *Client) SendAlertToAlermanager(ctx context.Context, alert *model.Alert)
return nil
}

func (c *Client) CreateSilence(ctx context.Context, silence types.Silence) error {
u := c.alertmanagerClient.URL("api/prom/api/v1/silences", nil)

data, err := json.Marshal(silence)
if err != nil {
return fmt.Errorf("error marshaling the silence: %s", err)
}

req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(data))
if err != nil {
return fmt.Errorf("error creating request: %v", err)
}

resp, body, err := c.alertmanagerClient.Do(ctx, req)
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("creating the silence failed with status %d and error %v", resp.StatusCode, string(body))
}

return nil
}

func (c *Client) PostRequest(url string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
69 changes: 54 additions & 15 deletions pkg/alertmanager/alertmanager.go
@@ -14,8 +14,10 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/alertmanager/api"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/inhibit"
@@ -53,13 +55,21 @@ type Config struct {
PeerTimeout time.Duration
Retention time.Duration
ExternalURL *url.URL

ShardingEnabled bool
ReplicationFactor int
ReplicateStateFunc func(context.Context, string, *clusterpb.Part) error
// The alertmanager replication protocol relies on a position related to other replicas.
// This position is then used to identify who should notify about the alert first.
GetPositionFunc func(userID string) int
}

// An Alertmanager manages the alerts for one user.
type Alertmanager struct {
cfg *Config
api *api.API
logger log.Logger
state State
nflog *nflog.Log
silences *silence.Silences
marker types.Marker
@@ -96,6 +106,13 @@ func init() {
}()
}

// State helps with replication and synchronization of notifications and silences across several alertmanager replicas.
type State interface {
AddState(string, cluster.State, prometheus.Registerer) cluster.ClusterChannel
Position() int
WaitReady()
}

// New creates a new Alertmanager.
func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
am := &Alertmanager{
@@ -110,6 +127,22 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {

am.registry = reg

// We currently have 3 operational modes:
// 1) Alertmanager clustering with upstream Gossip
// 2) Alertmanager sharding and ring-based replication
// 3) Alertmanager no replication
// These are covered in order.
if cfg.Peer != nil {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with gossip-based replication")
am.state = cfg.Peer
} else if cfg.ShardingEnabled {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.ReplicateStateFunc, cfg.GetPositionFunc, am.stop, am.logger, am.registry)
} else {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
am.state = &NilPeer{}
}

am.wg.Add(1)
nflogID := fmt.Sprintf("nflog:%s", cfg.UserID)
var err error
@@ -123,10 +156,9 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
if err != nil {
return nil, fmt.Errorf("failed to create notification log: %v", err)
}
if cfg.Peer != nil {
c := cfg.Peer.AddState("nfl:"+cfg.UserID, am.nflog, am.registry)
am.nflog.SetBroadcast(c.Broadcast)
}

c := am.state.AddState("nfl:"+cfg.UserID, am.nflog, am.registry)
am.nflog.SetBroadcast(c.Broadcast)

am.marker = types.NewMarker(am.registry)

@@ -140,10 +172,9 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
if err != nil {
return nil, fmt.Errorf("failed to create silences: %v", err)
}
if cfg.Peer != nil {
c := cfg.Peer.AddState("sil:"+cfg.UserID, am.silences, am.registry)
am.silences.SetBroadcast(c.Broadcast)
}

c = am.state.AddState("sil:"+cfg.UserID, am.silences, am.registry)
am.silences.SetBroadcast(c.Broadcast)

am.pipelineBuilder = notify.NewPipelineBuilder(am.registry)

@@ -162,9 +193,10 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
Alerts: am.alerts,
Silences: am.silences,
StatusFunc: am.marker.Status,
Peer: &NilPeer{},
Registry: am.registry,
Logger: log.With(am.logger, "component", "api"),
// Cortex should not expose cluster information back to its tenants.
Peer: &NilPeer{},
Registry: am.registry,
Logger: log.With(am.logger, "component", "api"),
GroupFunc: func(f1 func(*dispatch.Route) bool, f2 func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
return am.dispatcher.Groups(f1, f2)
},
@@ -190,14 +222,16 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
}

am.dispatcherMetrics = dispatch.NewDispatcherMetrics(am.registry)

//TODO: From this point onward, the alertmanager _might_ receive requests - we need to make sure we've settled and are ready.
return am, nil
}

// clusterWait returns a function that inspects the current peer state and returns
// a duration of one base timeout for each peer with a higher ID than ourselves.
func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration {
func clusterWait(position func() int, timeout time.Duration) func() time.Duration {
return func() time.Duration {
return time.Duration(p.Position()) * timeout
return time.Duration(position()) * timeout
}
}

@@ -230,7 +264,8 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s

am.inhibitor = inhibit.NewInhibitor(am.alerts, conf.InhibitRules, am.marker, log.With(am.logger, "component", "inhibitor"))

waitFunc := clusterWait(am.cfg.Peer, am.cfg.PeerTimeout)
waitFunc := clusterWait(am.state.Position, am.cfg.PeerTimeout)

timeoutFunc := func(d time.Duration) time.Duration {
if d < notify.MinTimeout {
d = notify.MinTimeout
@@ -255,7 +290,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
silence.NewSilencer(am.silences, am.marker, am.logger),
muteTimes,
am.nflog,
am.cfg.Peer,
am.state,
)
am.dispatcher = dispatch.NewDispatcher(
am.alerts,
@@ -293,6 +328,10 @@ func (am *Alertmanager) StopAndWait() {
am.wg.Wait()
}

func (am *Alertmanager) mergePartialExternalState(part *clusterpb.Part) error {
return am.state.(*state).MergePartialState(part)
}

// buildIntegrationsMap builds a map of name to the list of integration notifiers off of a
// list of receiver config.
func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, logger log.Logger) (map[string][]notify.Integration, error) {
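
To make the position semantics in this file concrete, a small stand-alone sketch that reuses the clusterWait helper from the diff above (the 15-second timeout is only an example value): the replica at position 0 notifies immediately, position 1 waits one PeerTimeout, position 2 waits two, and replicas that see the notification already recorded in the replicated notification log skip sending.

package sketch

import (
	"fmt"
	"time"
)

// clusterWait mirrors the helper in this diff: the wait before notifying grows
// linearly with the replica's position for the tenant.
func clusterWait(position func() int, timeout time.Duration) func() time.Duration {
	return func() time.Duration {
		return time.Duration(position()) * timeout
	}
}

func main() {
	peerTimeout := 15 * time.Second // example value only

	for pos := 0; pos < 3; pos++ {
		p := pos
		wait := clusterWait(func() int { return p }, peerTimeout)
		fmt.Printf("replica at position %d waits %s before notifying\n", p, wait())
	}
}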
32 changes: 31 additions & 1 deletion pkg/alertmanager/alertmanager_metrics.go
@@ -46,6 +46,11 @@ type alertmanagerMetrics struct {

// The alertmanager config hash.
configHashValue *prometheus.Desc

partialMerges *prometheus.Desc
partialMergesFailed *prometheus.Desc
replicationTotal *prometheus.Desc
replicationFailed *prometheus.Desc
}

func newAlertmanagerMetrics() *alertmanagerMetrics {
@@ -147,6 +152,22 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
"cortex_alertmanager_config_hash",
"Hash of the currently loaded alertmanager configuration.",
[]string{"user"}, nil),
partialMerges: prometheus.NewDesc(
"cortex_alertmanager_partial_state_merges_total",
"Number of times we have received a partial state to merge for a key.",
[]string{"key"}, nil),
partialMergesFailed: prometheus.NewDesc(
"cortex_alertmanager_partial_state_merges_failed_total",
"Number of times we have failed to merge a partial state received for a key.",
[]string{"key"}, nil),
replicationTotal: prometheus.NewDesc(
"cortex_alertmanager_state_replication_total",
"Number of times we have tried to replicate a state to other alertmanagers",
[]string{"key"}, nil),
replicationFailed: prometheus.NewDesc(
"cortex_alertmanager_state_replication_failed_total",
"Number of times we have failed to replicate a state to other alertmanagers",
[]string{"key"}, nil),
}
}

@@ -155,7 +176,7 @@ func (m *alertmanagerMetrics) addUserRegistry(user string, reg *prometheus.Regis
}

func (m *alertmanagerMetrics) removeUserRegistry(user string) {
// We neeed to go for a soft deletion here, as hard deletion requires
// We need to go for a soft deletion here, as hard deletion requires
// that _all_ metrics except gauges are per-user.
m.regs.RemoveUserRegistry(user, false)
}
@@ -185,6 +206,10 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
out <- m.silencesPropagatedMessagesTotal
out <- m.silences
out <- m.configHashValue
out <- m.partialMerges
out <- m.partialMergesFailed
out <- m.replicationTotal
out <- m.replicationFailed
}

func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
@@ -218,4 +243,9 @@ func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfGaugesPerUserWithLabels(out, m.silences, "alertmanager_silences", "state")

data.SendMaxOfGaugesPerUser(out, m.configHashValue, "alertmanager_config_hash")

data.SendSumOfCountersWithLabels(out, m.partialMerges, "alertmanager_partial_state_merges_total", "key")
data.SendSumOfCountersWithLabels(out, m.partialMergesFailed, "alertmanager_partial_state_merges_failed_total", "key")
data.SendSumOfCountersWithLabels(out, m.replicationTotal, "alertmanager_state_replication_total", "key")
data.SendSumOfCountersWithLabels(out, m.replicationFailed, "alertmanager_state_replication_failed_total", "key")
}
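
The new counters follow the same custom-collector pattern as the rest of this file: values from per-tenant registries are summed and re-exported under cortex_* names with a key label. A minimal stand-alone sketch of the Describe/Collect mechanics, with the per-tenant aggregation replaced by a hypothetical constant:

package sketch

import "github.com/prometheus/client_golang/prometheus"

type replicationMetrics struct {
	replicationTotal *prometheus.Desc
}

func newReplicationMetrics() *replicationMetrics {
	return &replicationMetrics{
		replicationTotal: prometheus.NewDesc(
			"cortex_alertmanager_state_replication_total",
			"Number of times we have tried to replicate a state to other alertmanagers.",
			[]string{"key"}, nil),
	}
}

func (m *replicationMetrics) Describe(out chan<- *prometheus.Desc) {
	out <- m.replicationTotal
}

func (m *replicationMetrics) Collect(out chan<- prometheus.Metric) {
	// In the real collector this value is the sum of alertmanager_state_replication_total
	// across every per-tenant registry; a hypothetical constant is used here.
	out <- prometheus.MustNewConstMetric(m.replicationTotal, prometheus.CounterValue, 42, "sil:user-1")
}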