Skip to content

Commit

Permalink
Optimised Ring.ShuffleShard() and disabled subring cache in store-gat…
Browse files Browse the repository at this point in the history
…eway, ruler and compactor (#3601)

* Disabled subring cache in store-gateway, ruler and compactor

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Optimise Ring.ShuffleShard() and Ring.Get()

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Code cleanup

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Code cleanup

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Simplified Ring.TokensFor()

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added CHANGELOG entry

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added test to ensure Ring.Get() doesn't do any memory allocation

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored and aknuds1 committed Sep 7, 2021
1 parent 7c0ad72 commit f2bf8e6
Show file tree
Hide file tree
Showing 10 changed files with 593 additions and 243 deletions.
9 changes: 6 additions & 3 deletions pkg/ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges
itemTrackers := make([]itemTracker, len(keys))
ingesters := make(map[string]ingester, r.IngesterCount())

const maxExpectedReplicationSet = 5 // Typical replication factor 3, plus one for inactive plus one for luck.
var descs [maxExpectedReplicationSet]IngesterDesc
var (
bufDescs [GetBufferSize]IngesterDesc
bufHosts [GetBufferSize]string
bufZones [GetBufferSize]string
)
for i, key := range keys {
replicationSet, err := r.Get(key, Write, descs[:0])
replicationSet, err := r.Get(key, Write, bufDescs[:0], bufHosts[:0], bufZones[:0])
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/ring/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,14 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
sort.Strings(ingesterIDs)

now := time.Now()
ingesters := []interface{}{}
_, owned := countTokens(r.ringDesc, r.ringTokens)
_, owned := r.countTokens()
for _, id := range ingesterIDs {
ing := r.ringDesc.Ingesters[id]
heartbeatTimestamp := time.Unix(ing.Timestamp, 0)
state := ing.State.String()
if !r.IsHealthy(&ing, Reporting) {
if !r.IsHealthy(&ing, Reporting, now) {
state = unhealthy
}

Expand Down Expand Up @@ -178,7 +179,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ShowTokens bool `json:"-"`
}{
Ingesters: ingesters,
Now: time.Now(),
Now: now,
ShowTokens: tokensParam == "true",
}, pageTemplate, req)
}
4 changes: 3 additions & 1 deletion pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,11 +753,13 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
zones := map[string]struct{}{}

if ringDesc != nil {
now := time.Now()

for _, ingester := range ringDesc.Ingesters {
zones[ingester.Zone] = struct{}{}

// Count the number of healthy instances for Write operation.
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout) {
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) {
healthyInstancesCount++
}
}
Expand Down
161 changes: 112 additions & 49 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ring

import (
"container/heap"
"fmt"
"sort"
"time"
Expand All @@ -11,13 +12,6 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
)

// ByToken is a sortable list of TokenDescs
type ByToken []TokenDesc

func (ts ByToken) Len() int { return len(ts) }
func (ts ByToken) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
func (ts ByToken) Less(i, j int) bool { return ts[i].Token < ts[j].Token }

// ByAddr is a sortable list of IngesterDesc.
type ByAddr []IngesterDesc

Expand Down Expand Up @@ -121,16 +115,12 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
return nil
}

// TokensFor partitions the tokens into those for the given ID, and those for others.
func (d *Desc) TokensFor(id string) (tokens, other Tokens) {
takenTokens, myTokens := Tokens{}, Tokens{}
for _, token := range d.getTokens() {
takenTokens = append(takenTokens, token.Token)
if token.Ingester == id {
myTokens = append(myTokens, token.Token)
}
}
return myTokens, takenTokens
// TokensFor return all ring tokens and tokens for the input provided ID.
// Returned tokens are guaranteed to be sorted.
func (d *Desc) TokensFor(id string) (myTokens, allTokens Tokens) {
allTokens = d.GetTokens()
myTokens = d.Ingesters[id].Tokens
return
}

// GetRegisteredAt returns the timestamp when the instance has been registered to the ring
Expand All @@ -144,7 +134,7 @@ func (i *IngesterDesc) GetRegisteredAt() time.Time {
}

// IsHealthy checks whether the ingester appears to be alive and heartbeating
func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool {
func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
healthy := false

switch op {
Expand All @@ -170,7 +160,7 @@ func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) b
healthy = i.State == ACTIVE
}

return healthy && time.Since(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout
return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000
}

// Merge merges other ring into this one. Returns sub-ring that represents the change,
Expand Down Expand Up @@ -419,46 +409,43 @@ func (d *Desc) RemoveTombstones(limit time.Time) {
}
}

type TokenDesc struct {
Token uint32
Ingester string
Zone string
}
func (d *Desc) getTokensInfo() map[uint32]instanceInfo {
out := map[uint32]instanceInfo{}

// getTokens returns sorted list of tokens with ingester IDs, owned by each ingester in the ring.
func (d *Desc) getTokens() []TokenDesc {
numTokens := 0
for _, ing := range d.Ingesters {
numTokens += len(ing.Tokens)
}
tokens := make([]TokenDesc, 0, numTokens)
for key, ing := range d.Ingesters {
for _, token := range ing.Tokens {
tokens = append(tokens, TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
for instanceID, instance := range d.Ingesters {
info := instanceInfo{
InstanceID: instanceID,
Zone: instance.Zone,
}

for _, token := range instance.Tokens {
out[token] = info
}
}

sort.Sort(ByToken(tokens))
return tokens
return out
}

// getTokensByZone returns instances tokens grouped by zone. Tokens within each zone
// are guaranteed to be sorted.
func (d *Desc) getTokensByZone() map[string][]TokenDesc {
zones := map[string][]TokenDesc{}

for key, ing := range d.Ingesters {
for _, token := range ing.Tokens {
zones[ing.Zone] = append(zones[ing.Zone], TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
}
// GetTokens returns sorted list of tokens owned by all instances within the ring.
func (d *Desc) GetTokens() []uint32 {
instances := make([][]uint32, 0, len(d.Ingesters))
for _, instance := range d.Ingesters {
instances = append(instances, instance.Tokens)
}

// Ensure tokens are sorted within each zone.
for zone := range zones {
sort.Sort(ByToken(zones[zone]))
return MergeTokens(instances)
}

// getTokensByZone returns instances tokens grouped by zone. Tokens within each zone
// are guaranteed to be sorted.
func (d *Desc) getTokensByZone() map[string][]uint32 {
zones := map[string][][]uint32{}
for _, instance := range d.Ingesters {
zones[instance.Zone] = append(zones[instance.Zone], instance.Tokens)
}

return zones
// Merge tokens per zone.
return MergeTokensByZone(zones)
}

type CompareResult int
Expand Down Expand Up @@ -539,3 +526,79 @@ func GetOrCreateRingDesc(d interface{}) *Desc {
}
return d.(*Desc)
}

// TokensHeap is an heap data structure used to merge multiple lists
// of sorted tokens into a single one.
type TokensHeap [][]uint32

func (h TokensHeap) Len() int {
return len(h)
}

func (h TokensHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h TokensHeap) Less(i, j int) bool {
return h[i][0] < h[j][0]
}

func (h *TokensHeap) Push(x interface{}) {
*h = append(*h, x.([]uint32))
}

func (h *TokensHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// MergeTokens takes in input multiple lists of tokens and returns a single list
// containing all tokens merged and sorted. Each input single list is required
// to have tokens already sorted.
func MergeTokens(instances [][]uint32) []uint32 {
numTokens := 0

// Build the heap.
h := make(TokensHeap, 0, len(instances))
for _, tokens := range instances {
if len(tokens) == 0 {
continue
}

// We can safely append the input slice because elements inside are never shuffled.
h = append(h, tokens)
numTokens += len(tokens)
}
heap.Init(&h)

out := make([]uint32, 0, numTokens)

for h.Len() > 0 {
// The minimum element in the tree is the root, at index 0.
lowest := h[0]
out = append(out, lowest[0])

if len(lowest) > 1 {
// Remove the first token from the lowest because we popped it
// and then fix the heap to keep it sorted.
h[0] = h[0][1:]
heap.Fix(&h, 0)
} else {
heap.Remove(&h, 0)
}
}

return out
}

// MergeTokensByZone is like MergeTokens but does it for each input zone.
func MergeTokensByZone(zones map[string][][]uint32) map[string][]uint32 {
out := make(map[string][]uint32, len(zones))
for zone, tokens := range zones {
out[zone] = MergeTokens(tokens)
}
return out
}
Loading

0 comments on commit f2bf8e6

Please sign in to comment.