diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go
index 1050fb9d5..69672269f 100644
--- a/pkg/ring/batch.go
+++ b/pkg/ring/batch.go
@@ -38,7 +38,7 @@ type itemTracker struct {
 // to send to that ingester.
 //
 // Not implemented as a method on Ring so we can test separately.
-func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
+func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
 	if r.IngesterCount() <= 0 {
 		return fmt.Errorf("DoBatch: IngesterCount <= 0")
 	}
@@ -52,7 +52,7 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges
 		bufZones [GetBufferSize]string
 	)
 	for i, key := range keys {
-		replicationSet, err := r.Get(key, Write, bufDescs[:0], bufHosts[:0], bufZones[:0])
+		replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
 		if err != nil {
 			return err
 		}
diff --git a/pkg/ring/model.go b/pkg/ring/model.go
index a981918fa..cd41039e5 100644
--- a/pkg/ring/model.go
+++ b/pkg/ring/model.go
@@ -133,32 +133,8 @@ func (i *IngesterDesc) GetRegisteredAt() time.Time {
 	return time.Unix(i.RegisteredTimestamp, 0)
 }
 
-// IsHealthy checks whether the ingester appears to be alive and heartbeating
 func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
-	healthy := false
-
-	switch op {
-	case Write:
-		healthy = i.State == ACTIVE
-
-	case Read:
-		healthy = (i.State == ACTIVE) || (i.State == LEAVING) || (i.State == PENDING)
-
-	case Reporting:
-		healthy = true
-
-	case BlocksSync:
-		healthy = (i.State == JOINING) || (i.State == ACTIVE) || (i.State == LEAVING)
-
-	case BlocksRead:
-		healthy = i.State == ACTIVE
-
-	case Ruler:
-		healthy = i.State == ACTIVE
-
-	case Compactor:
-		healthy = i.State == ACTIVE
-	}
+	healthy := op.IsInstanceInStateHealthy(i.State)
 
 	return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000
 }
diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go
index ed7206f06..aaaf4da7a 100644
--- a/pkg/ring/model_test.go
+++ b/pkg/ring/model_test.go
@@ -63,60 +63,6 @@ func TestIngesterDesc_IsHealthy_ForIngesterOperations(t *testing.T) {
 	}
 }
 
-func TestIngesterDesc_IsHealthy_ForStoreGatewayOperations(t *testing.T) {
-	t.Parallel()
-
-	tests := map[string]struct {
-		instance      *IngesterDesc
-		timeout       time.Duration
-		syncExpected  bool
-		queryExpected bool
-	}{
-		"ACTIVE instance with last keepalive newer than timeout": {
-			instance:      &IngesterDesc{State: ACTIVE, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
-			timeout:       time.Minute,
-			syncExpected:  true,
-			queryExpected: true,
-		},
-		"ACTIVE instance with last keepalive older than timeout": {
-			instance:      &IngesterDesc{State: ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
-			timeout:       time.Minute,
-			syncExpected:  false,
-			queryExpected: false,
-		},
-		"JOINING instance with last keepalive newer than timeout": {
-			instance:      &IngesterDesc{State: JOINING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
-			timeout:       time.Minute,
-			syncExpected:  true,
-			queryExpected: false,
-		},
-		"LEAVING instance with last keepalive newer than timeout": {
-			instance:      &IngesterDesc{State: LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
-			timeout:       time.Minute,
-			syncExpected:  true,
-			queryExpected: false,
-		},
-		"PENDING instance with last keepalive newer than timeout": {
-			instance:      &IngesterDesc{State: PENDING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
-			timeout:       time.Minute,
-			syncExpected:  false,
-			queryExpected: false,
-		},
-	}
-
-	for testName, testData := range tests {
-		testData := testData
-
-		t.Run(testName, func(t *testing.T) {
-			actual := testData.instance.IsHealthy(BlocksSync, testData.timeout, time.Now())
-			assert.Equal(t, testData.syncExpected, actual)
-
-			actual = testData.instance.IsHealthy(BlocksRead, testData.timeout, time.Now())
-			assert.Equal(t, testData.queryExpected, actual)
-		})
-	}
-}
-
 func TestIngesterDesc_GetRegisteredAt(t *testing.T) {
 	tests := map[string]struct {
 		desc *IngesterDesc
diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go
index 5b3e1d414..1e372276a 100644
--- a/pkg/ring/replication_strategy.go
+++ b/pkg/ring/replication_strategy.go
@@ -10,21 +10,12 @@ type ReplicationStrategy interface {
 	// for an operation to succeed. Returns an error if there are not enough
 	// instances.
 	Filter(instances []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []IngesterDesc, maxFailures int, err error)
-
-	// ShouldExtendReplicaSet returns true if given an instance that's going to be
-	// added to the replica set, the replica set size should be extended by 1
-	// more instance for the given operation.
-	ShouldExtendReplicaSet(instance IngesterDesc, op Operation) bool
 }
 
-type defaultReplicationStrategy struct {
-	ExtendWrites bool
-}
+type defaultReplicationStrategy struct{}
 
-func NewDefaultReplicationStrategy(extendWrites bool) ReplicationStrategy {
-	return &defaultReplicationStrategy{
-		ExtendWrites: extendWrites,
-	}
+func NewDefaultReplicationStrategy() ReplicationStrategy {
+	return &defaultReplicationStrategy{}
 }
 
 // Filter decides, given the set of ingesters eligible for a key,
@@ -72,26 +63,6 @@ func (s *defaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati
 	return ingesters, len(ingesters) - minSuccess, nil
 }
 
-func (s *defaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDesc, op Operation) bool {
-	// We do not want to Write to Ingesters that are not ACTIVE, but we do want
-	// to write the extra replica somewhere. So we increase the size of the set
-	// of replicas for the key. This means we have to also increase the
-	// size of the replica set for read, but we can read from Leaving ingesters,
-	// so don't skip it in this case.
-	// NB dead ingester will be filtered later by defaultReplicationStrategy.Filter().
-	if op == Write {
-		if s.ExtendWrites {
-			return ingester.State != ACTIVE
-		}
-		return false
-	} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
-		return true
-	}
-
-	return false
-}
-
-// IsHealthy checks whether an ingester appears to be alive and heartbeating
 func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation, now time.Time) bool {
 	return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout, now)
 }
diff --git a/pkg/ring/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go
index 3c9a31611..3d47d1cec 100644
--- a/pkg/ring/replication_strategy_test.go
+++ b/pkg/ring/replication_strategy_test.go
@@ -11,7 +11,6 @@ import (
 func TestRingReplicationStrategy(t *testing.T) {
 	for i, tc := range []struct {
 		RF, LiveIngesters, DeadIngesters int
-		op                               Operation // Will default to READ
 		ExpectedMaxFailure               int
 		ExpectedError                    string
 	}{
@@ -90,8 +89,8 @@ func TestRingReplicationStrategy(t *testing.T) {
 		}
 
 		t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
-			strategy := NewDefaultReplicationStrategy(true)
-			liveIngesters, maxFailure, err := strategy.Filter(ingesters, tc.op, tc.RF, 100*time.Second, false)
+			strategy := NewDefaultReplicationStrategy()
+			liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.RF, 100*time.Second, false)
 			if tc.ExpectedError == "" {
 				assert.NoError(t, err)
 				assert.Equal(t, tc.LiveIngesters, len(liveIngesters))
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index 7b58ec419..59f7ac3df 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -75,26 +75,27 @@ type ReadRing interface {
 	HasInstance(instanceID string) bool
 }
 
-// Operation can be Read or Write
-type Operation int
-
-// Values for Operation
-const (
-	Read Operation = iota
-	Write
-	Reporting // Special value for inquiring about health
-
-	// BlocksSync is the operation run by the store-gateway to sync blocks.
-	BlocksSync
+var (
+	// Write operation that also extends replica set, if ingester state is not ACTIVE.
+	Write = NewOp([]IngesterState{ACTIVE}, func(s IngesterState) bool {
+		// We do not want to Write to Ingesters that are not ACTIVE, but we do want
+		// to write the extra replica somewhere. So we increase the size of the set
+		// of replicas for the key.
+		// NB dead ingester will be filtered later by defaultReplicationStrategy.Filter().
+		return s != ACTIVE
+	})
 
-	// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
-	BlocksRead
+	// WriteNoExtend is like Write, but with no replicaset extension.
+	WriteNoExtend = NewOp([]IngesterState{ACTIVE}, nil)
 
-	// Ruler is the operation used for distributing rule groups between rulers.
-	Ruler
+	Read = NewOp([]IngesterState{ACTIVE, PENDING, LEAVING}, func(s IngesterState) bool {
+		// To match Write with extended replica set we have to also increase the
+		// size of the replica set for Read, but we can read from LEAVING ingesters.
+		return s != ACTIVE && s != LEAVING
+	})
 
-	// Compactor is the operation used for distributing tenants/blocks across compactors.
-	Compactor
+	// Reporting is a special value for inquiring about health.
+	Reporting = allStatesRingOperation
 )
 
 var (
@@ -202,7 +203,7 @@ func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error)
 		return nil, err
 	}
 
-	return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(cfg.ExtendWrites))
+	return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy())
 }
 
 func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy) (*Ring, error) {
@@ -349,7 +350,7 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts,
 
 		// Check whether the replica set should be extended given we're including
 		// this instance.
-		if r.strategy.ShouldExtendReplicaSet(ingester, op) {
+		if op.ShouldExtendReplicaSetOnState(ingester.State) {
 			n++
 		}
 
@@ -803,3 +804,42 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ri
 		r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}] = subring
 	}
 }
+
+// Operation describes which instances can be included in the replica set, based on their state.
+//
+// Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.
+type Operation uint32
+
+// NewOp constructs new Operation with given "healthy" states for operation, and optional function to extend replica set.
+// Result of calling shouldExtendReplicaSet is cached.
+func NewOp(healthyStates []IngesterState, shouldExtendReplicaSet func(s IngesterState) bool) Operation {
+	op := Operation(0)
+	for _, s := range healthyStates {
+		op |= (1 << s)
+	}
+
+	if shouldExtendReplicaSet != nil {
+		for _, s := range []IngesterState{ACTIVE, LEAVING, PENDING, JOINING, LEAVING, LEFT} {
+			if shouldExtendReplicaSet(s) {
+				op |= (0x10000 << s)
+			}
+		}
+	}
+
+	return op
+}
+
+// IsInstanceInStateHealthy is used during "filtering" phase to remove undesired instances based on their state.
+func (op Operation) IsInstanceInStateHealthy(s IngesterState) bool {
+	return op&(1<<s) > 0
+}
+
+// ShouldExtendReplicaSetOnState returns true if given a state of instance that's going to be
+// added to the replica set, the replica set size should be extended by 1
+// more instance for the given operation.
+func (op Operation) ShouldExtendReplicaSetOnState(s IngesterState) bool {
+	return op&(0x10000<<s) > 0
+}
+
+// All states are healthy, no states extend replica set.
+var allStatesRingOperation = Operation(0x0000ffff)
diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go
index eafcaf54c..bd8f8f323 100644
--- a/pkg/ring/ring_test.go
+++ b/pkg/ring/ring_test.go
@@ -53,7 +53,7 @@ func benchmarkBatch(b *testing.B, numIngester, numKeys int) {
 	r := Ring{
 		cfg:      cfg,
 		ringDesc: desc,
-		strategy: NewDefaultReplicationStrategy(true),
+		strategy: NewDefaultReplicationStrategy(),
 	}
 
 	ctx := context.Background()
@@ -68,7 +68,7 @@ func benchmarkBatch(b *testing.B, numIngester, numKeys int) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		generateKeys(rnd, numKeys, keys)
-		err := DoBatch(ctx, &r, keys, callback, cleanup)
+		err := DoBatch(ctx, Write, &r, keys, callback, cleanup)
 		require.NoError(b, err)
 	}
 }
@@ -94,9 +94,9 @@ func TestDoBatchZeroIngesters(t *testing.T) {
 	r := Ring{
 		cfg:      Config{},
 		ringDesc: desc,
-		strategy: NewDefaultReplicationStrategy(true),
+		strategy: NewDefaultReplicationStrategy(),
 	}
-	require.Error(t, DoBatch(ctx, &r, keys, callback, cleanup))
+	require.Error(t, DoBatch(ctx, Write, &r, keys, callback, cleanup))
 }
 
 func TestAddIngester(t *testing.T) {
@@ -200,7 +200,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) {
 		ringTokensByZone:    r.getTokensByZone(),
 		ringInstanceByToken: r.getTokensInfo(),
 		ringZones:           getZones(r.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	ingesters := make([]IngesterDesc, 0, len(r.GetIngesters()))
@@ -294,7 +294,7 @@ func TestRing_GetAllHealthy(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	set, err := ring.GetAllHealthy(Read)
@@ -405,7 +405,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	set, err := ring.GetReplicationSetForOperation(Read)
@@ -723,7 +723,7 @@ func TestRing_GetReplicationSetForOperation_WithZoneAwarenessEnabled(t *testing.
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Check the replication set has the correct settings
@@ -859,7 +859,7 @@ func TestRing_ShuffleShard(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	shardRing := ring.ShuffleShard("tenant-id", testData.shardSize)
@@ -911,7 +911,7 @@ func TestRing_ShuffleShard_Stability(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	for i := 1; i <= numTenants; i++ {
@@ -979,7 +979,7 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Compute the shard for each tenant.
@@ -1078,7 +1078,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Compute the initial shard for each tenant.
@@ -1142,7 +1142,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Get the replication set with shard size = 3.
@@ -1219,7 +1219,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Get the replication set with shard size = 2.
@@ -1478,7 +1478,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Replay the events on the timeline.
@@ -1543,7 +1543,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// The simulation starts with the minimum shard size. Random events can later increase it.
@@ -1696,7 +1696,7 @@ func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, s
 		ringInstanceByToken:  ringDesc.getTokensInfo(),
 		ringZones:            getZones(ringDesc.getTokensByZone()),
 		shuffledSubringCache: map[subringCacheKey]*Ring{},
-		strategy:             NewDefaultReplicationStrategy(true),
+		strategy:             NewDefaultReplicationStrategy(),
 		lastTopologyChange:   time.Now(),
 	}
@@ -1724,7 +1724,7 @@ func BenchmarkRing_Get(b *testing.B) {
 		ringInstanceByToken:  ringDesc.getTokensInfo(),
 		ringZones:            getZones(ringDesc.getTokensByZone()),
 		shuffledSubringCache: map[subringCacheKey]*Ring{},
-		strategy:             NewDefaultReplicationStrategy(true),
+		strategy:             NewDefaultReplicationStrategy(),
 		lastTopologyChange:   time.Now(),
 	}
@@ -1752,7 +1752,7 @@ func TestRing_Get_NoMemoryAllocations(t *testing.T) {
 		ringInstanceByToken:  ringDesc.getTokensInfo(),
 		ringZones:            getZones(ringDesc.getTokensByZone()),
 		shuffledSubringCache: map[subringCacheKey]*Ring{},
-		strategy:             NewDefaultReplicationStrategy(true),
+		strategy:             NewDefaultReplicationStrategy(),
 		lastTopologyChange:   time.Now(),
 	}
diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go
index 717235ebb..4c3f51a41 100644
--- a/pkg/ring/util_test.go
+++ b/pkg/ring/util_test.go
@@ -65,7 +65,7 @@ func TestWaitRingStabilityShouldReturnAsSoonAsMinStabilityIsReachedOnNoChanges(t
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	startTime := time.Now()
@@ -100,7 +100,7 @@ func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing.
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Add 1 new instance after some time.
@@ -151,7 +151,7 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(true),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Keep changing the ring.
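
Note (not part of the patch): the new Operation type packs two bitmasks into a single uint32 — the low 16 bits mark which instance states count as healthy for the operation, and the high 16 bits mark which states cause the replica set to be extended. The standalone Go sketch below illustrates that encoding with minimal stand-in types; the IngesterState values are assumed to be small consecutive integers (as in the ring protobuf enum), and the names here are illustrative only.

package main

import "fmt"

// Minimal stand-ins for the ring types, for illustration only.
type IngesterState uint32

const (
	ACTIVE IngesterState = iota
	LEAVING
	PENDING
	JOINING
	LEFT
)

type Operation uint32

// NewOp mirrors the bitmap construction from the patch: one "healthy" bit and
// one "extend replica set" bit per state.
func NewOp(healthy []IngesterState, extend func(IngesterState) bool) Operation {
	var op Operation
	for _, s := range healthy {
		op |= 1 << s
	}
	if extend != nil {
		for _, s := range []IngesterState{ACTIVE, LEAVING, PENDING, JOINING, LEFT} {
			if extend(s) {
				op |= 0x10000 << s
			}
		}
	}
	return op
}

func (op Operation) IsInstanceInStateHealthy(s IngesterState) bool {
	return op&(1<<s) > 0
}

func (op Operation) ShouldExtendReplicaSetOnState(s IngesterState) bool {
	return op&(0x10000<<s) > 0
}

func main() {
	// Equivalent of the Write operation defined in the patch: only ACTIVE is
	// healthy, and any non-ACTIVE state extends the replica set.
	write := NewOp([]IngesterState{ACTIVE}, func(s IngesterState) bool { return s != ACTIVE })

	fmt.Println(write.IsInstanceInStateHealthy(ACTIVE))       // true
	fmt.Println(write.IsInstanceInStateHealthy(LEAVING))      // false
	fmt.Println(write.ShouldExtendReplicaSetOnState(LEAVING)) // true
}

Shifting the second bitmap by 0x10000 keeps the two halves from overlapping as long as there are at most 16 states, which is why a single comparison against the combined mask is enough in both accessor methods.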