From b084387185d1642565816c16df69cdaaff06326e Mon Sep 17 00:00:00 2001 From: gotjosh Date: Tue, 19 Jan 2021 14:13:52 -0400 Subject: [PATCH] Alertmanager: Allow sharding of alertmanager tenants (#3664) * Alertmanager: Allow sharding of alertmanager tenants The first part of the work proposed in https://github.com/cortexproject/cortex/pull/3574 introduces sharding via the ring for the Alertmanager component. Signed-off-by: gotjosh * Appease the linter Signed-off-by: gotjosh * Update CHANGELOG to warn about Alertmanager sharding Signed-off-by: gotjosh * Fix, last typo. Signed-off-by: gotjosh --- pkg/ring/replication_strategy.go | 27 +++++ pkg/ring/replication_strategy_test.go | 144 ++++++++++++++++++-------- 2 files changed, 130 insertions(+), 41 deletions(-) diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 1e372276a..5156ba22d 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -3,6 +3,8 @@ package ring import ( "fmt" "time" + + "github.com/pkg/errors" ) type ReplicationStrategy interface { @@ -63,6 +65,31 @@ func (s *defaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati return ingesters, len(ingesters) - minSuccess, nil } +type ignoreUnhealthyInstancesReplicationStrategy struct{} + +func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy { + return &ignoreUnhealthyInstancesReplicationStrategy{} +} + +func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []IngesterDesc, op Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []IngesterDesc, maxFailures int, err error) { + now := time.Now() + // Filter out unhealthy instances. + for i := 0; i < len(instances); { + if instances[i].IsHealthy(op, heartbeatTimeout, now) { + i++ + } else { + instances = append(instances[:i], instances[i+1:]...) + } + } + + // We need at least 1 healthy instance no matter what the replication factor is set to.
+ if len(instances) == 0 { + return nil, 0, errors.New("at least 1 healthy replica required, could only find 0") + } + + return instances, len(instances) - 1, nil +} + func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation, now time.Time) bool { return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout, now) } diff --git a/pkg/ring/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go index 3d47d1cec..b1e1d49aa 100644 --- a/pkg/ring/replication_strategy_test.go +++ b/pkg/ring/replication_strategy_test.go @@ -10,93 +10,155 @@ import ( func TestRingReplicationStrategy(t *testing.T) { for i, tc := range []struct { - RF, LiveIngesters, DeadIngesters int - ExpectedMaxFailure int - ExpectedError string + replicationFactor, liveIngesters, deadIngesters int + expectedMaxFailure int + expectedError string }{ // Ensure it works for a single ingester, for local testing. { - RF: 1, - LiveIngesters: 1, - ExpectedMaxFailure: 0, + replicationFactor: 1, + liveIngesters: 1, + expectedMaxFailure: 0, }, { - RF: 1, - DeadIngesters: 1, - ExpectedError: "at least 1 live replicas required, could only find 0", + replicationFactor: 1, + deadIngesters: 1, + expectedError: "at least 1 live replicas required, could only find 0", }, // Ensure it works for RF=3 and 2 ingesters. { - RF: 3, - LiveIngesters: 2, - ExpectedMaxFailure: 0, + replicationFactor: 3, + liveIngesters: 2, + expectedMaxFailure: 0, }, // Ensure it works for the default production config. 
{ - RF: 3, - LiveIngesters: 3, - ExpectedMaxFailure: 1, + replicationFactor: 3, + liveIngesters: 3, + expectedMaxFailure: 1, }, { - RF: 3, - LiveIngesters: 2, - DeadIngesters: 1, - ExpectedMaxFailure: 0, + replicationFactor: 3, + liveIngesters: 2, + deadIngesters: 1, + expectedMaxFailure: 0, }, { - RF: 3, - LiveIngesters: 1, - DeadIngesters: 2, - ExpectedError: "at least 2 live replicas required, could only find 1", + replicationFactor: 3, + liveIngesters: 1, + deadIngesters: 2, + expectedError: "at least 2 live replicas required, could only find 1", }, // Ensure it works when adding / removing nodes. // A node is joining or leaving, replica set expands. { - RF: 3, - LiveIngesters: 4, - ExpectedMaxFailure: 1, + replicationFactor: 3, + liveIngesters: 4, + expectedMaxFailure: 1, }, { - RF: 3, - LiveIngesters: 3, - DeadIngesters: 1, - ExpectedMaxFailure: 0, + replicationFactor: 3, + liveIngesters: 3, + deadIngesters: 1, + expectedMaxFailure: 0, }, { - RF: 3, - LiveIngesters: 2, - DeadIngesters: 2, - ExpectedError: "at least 3 live replicas required, could only find 2", + replicationFactor: 3, + liveIngesters: 2, + deadIngesters: 2, + expectedError: "at least 3 live replicas required, could only find 2", }, } { ingesters := []IngesterDesc{} - for i := 0; i < tc.LiveIngesters; i++ { + for i := 0; i < tc.liveIngesters; i++ { ingesters = append(ingesters, IngesterDesc{ Timestamp: time.Now().Unix(), }) } - for i := 0; i < tc.DeadIngesters; i++ { + for i := 0; i < tc.deadIngesters; i++ { ingesters = append(ingesters, IngesterDesc{}) } t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { strategy := NewDefaultReplicationStrategy() - liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.RF, 100*time.Second, false) - if tc.ExpectedError == "" { + liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.replicationFactor, 100*time.Second, false) + if tc.expectedError == "" { assert.NoError(t, err) - assert.Equal(t, tc.LiveIngesters, 
len(liveIngesters)) - assert.Equal(t, tc.ExpectedMaxFailure, maxFailure) + assert.Equal(t, tc.liveIngesters, len(liveIngesters)) + assert.Equal(t, tc.expectedMaxFailure, maxFailure) } else { - assert.EqualError(t, err, tc.ExpectedError) + assert.EqualError(t, err, tc.expectedError) + } + }) + } +} + +func TestIgnoreUnhealthyInstancesReplicationStrategy(t *testing.T) { + for _, tc := range []struct { + name string + liveIngesters, deadIngesters int + expectedMaxFailure int + expectedError string + }{ + { + name: "with at least 1 healthy instance", + liveIngesters: 1, + expectedMaxFailure: 0, + }, + { + name: "with more healthy instances than unhealthy", + deadIngesters: 1, + liveIngesters: 2, + expectedMaxFailure: 1, + }, + { + name: "with more unhealthy instances than healthy", + deadIngesters: 2, + liveIngesters: 1, + expectedMaxFailure: 0, + }, + { + name: "with equal number of healthy and unhealthy instances", + deadIngesters: 2, + liveIngesters: 2, + expectedMaxFailure: 1, + }, + { + name: "with no healthy instances", + liveIngesters: 0, + deadIngesters: 3, + expectedMaxFailure: 0, + expectedError: "at least 1 healthy replica required, could only find 0", + }, + } { + ingesters := []IngesterDesc{} + for i := 0; i < tc.liveIngesters; i++ { + ingesters = append(ingesters, IngesterDesc{ + Timestamp: time.Now().Unix(), + }) + } + for i := 0; i < tc.deadIngesters; i++ { + ingesters = append(ingesters, IngesterDesc{}) + } + + t.Run(tc.name, func(t *testing.T) { + strategy := NewIgnoreUnhealthyInstancesReplicationStrategy() + liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, 3, 100*time.Second, false) + if tc.expectedError == "" { + assert.NoError(t, err) + assert.Equal(t, tc.liveIngesters, len(liveIngesters)) + assert.Equal(t, tc.expectedMaxFailure, maxFailure) + } else { + assert.EqualError(t, err, tc.expectedError) } }) }