Skip to content

Commit

Permalink
Move ring operations to packages where they are used. (#3675)
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany authored and aknuds1 committed Sep 7, 2021
1 parent f2bf8e6 commit a64504d
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 157 deletions.
4 changes: 2 additions & 2 deletions pkg/ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type itemTracker struct {
// to send to that ingester.
//
// Not implemented as a method on Ring so we can test separately.
func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
if r.IngesterCount() <= 0 {
return fmt.Errorf("DoBatch: IngesterCount <= 0")
}
Expand All @@ -52,7 +52,7 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges
bufZones [GetBufferSize]string
)
for i, key := range keys {
replicationSet, err := r.Get(key, Write, bufDescs[:0], bufHosts[:0], bufZones[:0])
replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
if err != nil {
return err
}
Expand Down
26 changes: 1 addition & 25 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,32 +133,8 @@ func (i *IngesterDesc) GetRegisteredAt() time.Time {
return time.Unix(i.RegisteredTimestamp, 0)
}

// IsHealthy checks whether the ingester appears to be alive and heartbeating
func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
healthy := false

switch op {
case Write:
healthy = i.State == ACTIVE

case Read:
healthy = (i.State == ACTIVE) || (i.State == LEAVING) || (i.State == PENDING)

case Reporting:
healthy = true

case BlocksSync:
healthy = (i.State == JOINING) || (i.State == ACTIVE) || (i.State == LEAVING)

case BlocksRead:
healthy = i.State == ACTIVE

case Ruler:
healthy = i.State == ACTIVE

case Compactor:
healthy = i.State == ACTIVE
}
healthy := op.IsInstanceInStateHealthy(i.State)

return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000
}
Expand Down
54 changes: 0 additions & 54 deletions pkg/ring/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,60 +63,6 @@ func TestIngesterDesc_IsHealthy_ForIngesterOperations(t *testing.T) {
}
}

func TestIngesterDesc_IsHealthy_ForStoreGatewayOperations(t *testing.T) {
t.Parallel()

tests := map[string]struct {
instance *IngesterDesc
timeout time.Duration
syncExpected bool
queryExpected bool
}{
"ACTIVE instance with last keepalive newer than timeout": {
instance: &IngesterDesc{State: ACTIVE, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: true,
queryExpected: true,
},
"ACTIVE instance with last keepalive older than timeout": {
instance: &IngesterDesc{State: ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: false,
queryExpected: false,
},
"JOINING instance with last keepalive newer than timeout": {
instance: &IngesterDesc{State: JOINING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: true,
queryExpected: false,
},
"LEAVING instance with last keepalive newer than timeout": {
instance: &IngesterDesc{State: LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: true,
queryExpected: false,
},
"PENDING instance with last keepalive newer than timeout": {
instance: &IngesterDesc{State: PENDING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: false,
queryExpected: false,
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
actual := testData.instance.IsHealthy(BlocksSync, testData.timeout, time.Now())
assert.Equal(t, testData.syncExpected, actual)

actual = testData.instance.IsHealthy(BlocksRead, testData.timeout, time.Now())
assert.Equal(t, testData.queryExpected, actual)
})
}
}

func TestIngesterDesc_GetRegisteredAt(t *testing.T) {
tests := map[string]struct {
desc *IngesterDesc
Expand Down
35 changes: 3 additions & 32 deletions pkg/ring/replication_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,12 @@ type ReplicationStrategy interface {
// for an operation to succeed. Returns an error if there are not enough
// instances.
Filter(instances []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []IngesterDesc, maxFailures int, err error)

// ShouldExtendReplicaSet returns true if given an instance that's going to be
// added to the replica set, the replica set size should be extended by 1
// more instance for the given operation.
ShouldExtendReplicaSet(instance IngesterDesc, op Operation) bool
}

type defaultReplicationStrategy struct {
ExtendWrites bool
}
type defaultReplicationStrategy struct{}

func NewDefaultReplicationStrategy(extendWrites bool) ReplicationStrategy {
return &defaultReplicationStrategy{
ExtendWrites: extendWrites,
}
func NewDefaultReplicationStrategy() ReplicationStrategy {
return &defaultReplicationStrategy{}
}

// Filter decides, given the set of ingesters eligible for a key,
Expand Down Expand Up @@ -72,26 +63,6 @@ func (s *defaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati
return ingesters, len(ingesters) - minSuccess, nil
}

func (s *defaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDesc, op Operation) bool {
// We do not want to Write to Ingesters that are not ACTIVE, but we do want
// to write the extra replica somewhere. So we increase the size of the set
// of replicas for the key. This means we have to also increase the
// size of the replica set for read, but we can read from Leaving ingesters,
// so don't skip it in this case.
// NB dead ingester will be filtered later by defaultReplicationStrategy.Filter().
if op == Write {
if s.ExtendWrites {
return ingester.State != ACTIVE
}
return false
} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
return true
}

return false
}

// IsHealthy checks whether an ingester appears to be alive and heartbeating
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation, now time.Time) bool {
return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout, now)
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/ring/replication_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
func TestRingReplicationStrategy(t *testing.T) {
for i, tc := range []struct {
RF, LiveIngesters, DeadIngesters int
op Operation // Will default to READ
ExpectedMaxFailure int
ExpectedError string
}{
Expand Down Expand Up @@ -90,8 +89,8 @@ func TestRingReplicationStrategy(t *testing.T) {
}

t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
strategy := NewDefaultReplicationStrategy(true)
liveIngesters, maxFailure, err := strategy.Filter(ingesters, tc.op, tc.RF, 100*time.Second, false)
strategy := NewDefaultReplicationStrategy()
liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.RF, 100*time.Second, false)
if tc.ExpectedError == "" {
assert.NoError(t, err)
assert.Equal(t, tc.LiveIngesters, len(liveIngesters))
Expand Down
78 changes: 59 additions & 19 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,27 @@ type ReadRing interface {
HasInstance(instanceID string) bool
}

// Operation can be Read or Write
type Operation int

// Values for Operation
const (
Read Operation = iota
Write
Reporting // Special value for inquiring about health

// BlocksSync is the operation run by the store-gateway to sync blocks.
BlocksSync
var (
// Write operation that also extends replica set, if ingester state is not ACTIVE.
Write = NewOp([]IngesterState{ACTIVE}, func(s IngesterState) bool {
// We do not want to Write to Ingesters that are not ACTIVE, but we do want
// to write the extra replica somewhere. So we increase the size of the set
// of replicas for the key.
// NB dead ingester will be filtered later by defaultReplicationStrategy.Filter().
return s != ACTIVE
})

// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
BlocksRead
// WriteNoExtend is like Write, but with no replicaset extension.
WriteNoExtend = NewOp([]IngesterState{ACTIVE}, nil)

// Ruler is the operation used for distributing rule groups between rulers.
Ruler
Read = NewOp([]IngesterState{ACTIVE, PENDING, LEAVING}, func(s IngesterState) bool {
// To match Write with extended replica set we have to also increase the
// size of the replica set for Read, but we can read from LEAVING ingesters.
return s != ACTIVE && s != LEAVING
})

// Compactor is the operation used for distributing tenants/blocks across compactors.
Compactor
// Reporting is a special value for inquiring about health.
Reporting = allStatesRingOperation
)

var (
Expand Down Expand Up @@ -202,7 +203,7 @@ func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error)
return nil, err
}

return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(cfg.ExtendWrites))
return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy())
}

func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy) (*Ring, error) {
Expand Down Expand Up @@ -349,7 +350,7 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts,

// Check whether the replica set should be extended given we're including
// this instance.
if r.strategy.ShouldExtendReplicaSet(ingester, op) {
if op.ShouldExtendReplicaSetOnState(ingester.State) {
n++
}

Expand Down Expand Up @@ -803,3 +804,42 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ri
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}] = subring
}
}

// Operation describes which instances can be included in the replica set, based on their state.
//
// Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.
type Operation uint32

// NewOp constructs new Operation with given "healthy" states for operation, and optional function to extend replica set.
// Result of calling shouldExtendReplicaSet is cached.
func NewOp(healthyStates []IngesterState, shouldExtendReplicaSet func(s IngesterState) bool) Operation {
op := Operation(0)
for _, s := range healthyStates {
op |= (1 << s)
}

if shouldExtendReplicaSet != nil {
for _, s := range []IngesterState{ACTIVE, LEAVING, PENDING, JOINING, LEAVING, LEFT} {
if shouldExtendReplicaSet(s) {
op |= (0x10000 << s)
}
}
}

return op
}

// IsInstanceInStateHealthy is used during "filtering" phase to remove undesired instances based on their state.
func (op Operation) IsInstanceInStateHealthy(s IngesterState) bool {
return op&(1<<s) > 0
}

// ShouldExtendReplicaSetOnState returns true if given a state of instance that's going to be
// added to the replica set, the replica set size should be extended by 1
// more instance for the given operation.
func (op Operation) ShouldExtendReplicaSetOnState(s IngesterState) bool {
return op&(0x10000<<s) > 0
}

// All states are healthy, no states extend replica set.
var allStatesRingOperation = Operation(0x0000ffff)
Loading

0 comments on commit a64504d

Please sign in to comment.