Skip to content

Commit

Permalink
feat(admin): LFU replica selector
Browse files Browse the repository at this point in the history
This PR adds a new replica selector based on the Least Frequently Used (LFU) algorithm.
`lfuReplicaSelector` selects each replica's storage node and data path, giving preference to those
with fewer assigned replicas.

All replica selectors, including `lfuReplicaSelector`, are stateless, meaning they don't keep
existing log stream topology. When `Select` is invoked, `lfuReplicaSelector` collects used counters
for each storage node and path. Although it seems to be inefficient, it is simple as well as
fault-tolerant.

To select the least used storage nodes and paths, `lfuReplicaSelector` runs as follows:

- Fetch cluster metadata from the `ClusterMetadataView`.
- Increase the used counters for each storage node and path assigned to each log stream.
- Sort counters.
- Choose the least used storage nodes and paths.

Updates #393
  • Loading branch information
ijsong committed Apr 6, 2023
1 parent 93b86ff commit 690b414
Show file tree
Hide file tree
Showing 4 changed files with 402 additions and 295 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/gogo/status v1.1.1
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.3
github.com/google/gofuzz v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/lib/pq v1.10.7
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -74,6 +73,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (cfg config) validate() error {

func (cfg *config) ensureDefault() error {
if cfg.snSelector == nil {
rs, err := newBalancedReplicaSelector(cfg.mrmgr.ClusterMetadataView(), int(cfg.replicationFactor))
rs, err := newReplicaSelector(replicaSelectorNameLFU, cfg.mrmgr.ClusterMetadataView(), int(cfg.replicationFactor))
if err != nil {
return err
}
Expand Down
245 changes: 138 additions & 107 deletions internal/admin/replica_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ package admin

import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"strings"
"time"

"github.com/pkg/errors"
"golang.org/x/exp/maps"

"github.com/kakao/varlog/internal/admin/mrmanager"
"github.com/kakao/varlog/pkg/types"
Expand All @@ -26,6 +30,30 @@ type ReplicaSelector interface {
Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error)
}

const (
replicaSelectorNameRandom = "random"
replicaSelectorNameLFU = "lfu" // least frequently used
)

func newReplicaSelector(selector string, cmview mrmanager.ClusterMetadataView, repfactor int) (ReplicaSelector, error) {
if repfactor < 1 {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: negative replication factor")
}

if cmview == nil {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: invalid cluster metadata view")
}

switch strings.ToLower(selector) {
case replicaSelectorNameRandom:
return newRandomReplicaSelector(cmview, repfactor)
case replicaSelectorNameLFU:
return newLFUSelector(cmview, repfactor)
default:
return nil, fmt.Errorf("unknown selector: %s", selector)
}
}

type randomReplicaSelector struct {
rng *rand.Rand
cmview mrmanager.ClusterMetadataView
Expand All @@ -35,12 +63,6 @@ type randomReplicaSelector struct {
var _ ReplicaSelector = (*randomReplicaSelector)(nil)

func newRandomReplicaSelector(cmview mrmanager.ClusterMetadataView, repfactor int) (*randomReplicaSelector, error) {
if repfactor < 1 {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: negative replication factor")
}
if cmview == nil {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: invalid cluster metadata view")
}
s := &randomReplicaSelector{
rng: rand.New(rand.NewSource(time.Now().Unix())),
cmview: cmview,
Expand All @@ -50,7 +72,7 @@ func newRandomReplicaSelector(cmview mrmanager.ClusterMetadataView, repfactor in
}

func (s *randomReplicaSelector) Name() string {
return "random"
return replicaSelectorNameRandom
}

func (s *randomReplicaSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) {
Expand All @@ -59,149 +81,158 @@ func (s *randomReplicaSelector) Select(ctx context.Context) ([]*varlogpb.Replica
return nil, errors.WithMessage(err, "replica selector")
}

if len(md.StorageNodes) < s.repfactor {
return nil, fmt.Errorf("replica selector: not enough storage nodes: %d < %d", len(md.StorageNodes), s.repfactor)
}

ret := make([]*varlogpb.ReplicaDescriptor, s.repfactor)

snds := md.StorageNodes
sndIndices := s.rng.Perm(len(snds))[:s.repfactor]
for idx, sndIdx := range sndIndices {
snd := snds[sndIdx]
snid := snd.StorageNodeID
if snid.Invalid() {
return nil, errors.New("replica selector: invalid cluster metadata: invalid storage id")
}

snpaths := snd.Paths
if len(snpaths) == 0 {
return nil, fmt.Errorf("replica selector: invalid cluster metadata: no storage path in storage node %d", snid)
}
pathIdx := s.rng.Intn(len(snpaths))
ret[idx] = &varlogpb.ReplicaDescriptor{
StorageNodeID: snd.StorageNodeID,
StorageNodeID: snid,
StorageNodePath: snpaths[pathIdx],
}
}

return ret, nil
}

// balancedReplicaSelector selects storage nodes and volumes for a new log stream to be balanced in
// terms of the number of replicas as well as the number of primary replica per storage node.
// Note that it does not consider loads of storage nodes.
type balancedReplicaSelector struct {
rng *rand.Rand
cmView mrmanager.ClusterMetadataView
replicationFactor int
type lfuSelector struct {
cmview mrmanager.ClusterMetadataView
repfactor int
}

var _ ReplicaSelector = (*balancedReplicaSelector)(nil)
type lfuCounter struct {
snid types.StorageNodeID
replicas uint
primaries uint
snpaths map[string]uint
}

func newBalancedReplicaSelector(cmView mrmanager.ClusterMetadataView, replicationFactor int) (*balancedReplicaSelector, error) {
if replicationFactor < 1 {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: negative replication factor")
}
if cmView == nil {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: invalid cluster metadata view")
}
sel := &balancedReplicaSelector{
rng: rand.New(rand.NewSource(time.Now().Unix())),
cmView: cmView,
replicationFactor: replicationFactor,
func newLFUSelector(cmview mrmanager.ClusterMetadataView, repfactor int) (*lfuSelector, error) {
s := &lfuSelector{
cmview: cmview,
repfactor: repfactor,
}
return sel, nil
return s, nil
}

func (sel *balancedReplicaSelector) Name() string {
return "balanced"
func (s *lfuSelector) Name() string {
return replicaSelectorNameLFU
}

func (sel *balancedReplicaSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) {
md, err := sel.cmView.ClusterMetadata(ctx)
func (s *lfuSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) {
md, err := s.cmview.ClusterMetadata(ctx)
if err != nil {
return nil, errors.WithMessage(err, "replica selector")
}

snds := md.GetStorageNodes()
stats := make(map[types.StorageNodeID]storageNodeStat, len(snds))

for _, snd := range snds {
storageNodeID := snd.StorageNodeID
st := storageNodeStat{
storageNodeID: storageNodeID,
paths: make(map[string]struct{}, len(snd.Paths)),
assignedPaths: make(map[string]struct{}, len(snd.Paths)),
}
for _, path := range snd.Paths {
st.paths[path] = struct{}{}
}
stats[storageNodeID] = st
if len(md.StorageNodes) < s.repfactor {
return nil, fmt.Errorf("replica selector: not enough storage nodes: %d < %d", len(md.StorageNodes), s.repfactor)
}

lsds := md.GetLogStreams()
for _, lsd := range lsds {
for i, rd := range lsd.Replicas {
storageNodeID := rd.StorageNodeID
st, ok := stats[storageNodeID]
if !ok {
panic("replica selector: inconsistent cluster metadata")
}
st.replicas++
if i == 0 {
st.primaryReplicas++
}
st.assignedPaths[rd.StorageNodePath] = struct{}{}
stats[storageNodeID] = st
}
countersMap, err := s.newCounter(md)
if err != nil {
return nil, err
}

statsList := make([]storageNodeStat, 0, len(stats))
for _, st := range stats {
statsList = append(statsList, st)
err = s.count(md, countersMap)
if err != nil {
return nil, err
}
counters := s.sortedCounters(countersMap)
replicas := s.selectLFU(counters)
return replicas, nil
}

sort.Slice(statsList, func(i, j int) bool {
st1, st2 := statsList[i], statsList[j]
ut1, ut2 := st1.utilization(), st2.utilization()

if ut1 != ut2 {
return ut1 < ut2
func (s *lfuSelector) selectLFU(counters []lfuCounter) []*varlogpb.ReplicaDescriptor {
replicas := make([]*varlogpb.ReplicaDescriptor, s.repfactor)
for idx, counter := range counters[:s.repfactor] {
selectedPath := ""
min := uint(math.MaxUint)
for snpath, usedCount := range counter.snpaths {
if usedCount < min {
min = usedCount
selectedPath = snpath
}
}
replicas[idx] = &varlogpb.ReplicaDescriptor{
StorageNodeID: counter.snid,
StorageNodePath: selectedPath,
}
}
return replicas
}

if st1.primaryReplicas != st2.primaryReplicas {
return st1.primaryReplicas < st2.primaryReplicas
func (s *lfuSelector) sortedCounters(countersMap map[types.StorageNodeID]lfuCounter) []lfuCounter {
counters := maps.Values(countersMap)
sort.Slice(counters, func(i, j int) bool {
if counters[i].replicas == counters[j].replicas {
if counters[i].primaries == counters[j].primaries {
return counters[i].snid < counters[j].snid
}
return counters[i].primaries < counters[j].primaries
}
return st1.replicas < st2.replicas
return counters[i].replicas < counters[j].replicas
})
return counters
}

statsList = statsList[:sel.replicationFactor]
sort.Slice(statsList, func(i, j int) bool {
st1, st2 := statsList[i], statsList[j]
return st1.primaryReplicas < st2.primaryReplicas
})
func (s *lfuSelector) count(md *varlogpb.MetadataDescriptor, countersMap map[types.StorageNodeID]lfuCounter) error {
for _, lsd := range md.LogStreams {
for i, rd := range lsd.Replicas {
snid := rd.StorageNodeID
snCounter, ok := countersMap[rd.StorageNodeID]
if !ok {
return fmt.Errorf("replica selector: inconsistent cluster metadata: no matched storage node %d", snid)
}
snCounter.replicas++
if i == 0 {
snCounter.primaries++
}

rds := make([]*varlogpb.ReplicaDescriptor, 0, sel.replicationFactor)
for _, st := range statsList {
snd := md.GetStorageNode(st.storageNodeID)
var path string
if len(st.paths) == len(st.assignedPaths) {
path = snd.Paths[sel.rng.Intn(len(snd.Paths))]
} else {
for p := range st.paths {
if _, ok := st.assignedPaths[path]; ok {
continue
}
path = p
break
snpath := rd.StorageNodePath
snpathUsedCount, ok := snCounter.snpaths[snpath]
if !ok {
return fmt.Errorf("replica selector: inconsistent cluster metadata: no matched storage path %s", snpath)
}
snCounter.snpaths[snpath] = snpathUsedCount + 1
countersMap[snid] = snCounter
}
rds = append(rds, &varlogpb.ReplicaDescriptor{
StorageNodeID: st.storageNodeID,
StorageNodePath: path,
})
}

return rds, nil
}

type storageNodeStat struct {
storageNodeID types.StorageNodeID
replicas int
primaryReplicas int
paths map[string]struct{}
assignedPaths map[string]struct{}
return nil
}

func (s storageNodeStat) utilization() float64 {
return float64(s.replicas) / float64(len(s.paths))
func (s *lfuSelector) newCounter(md *varlogpb.MetadataDescriptor) (map[types.StorageNodeID]lfuCounter, error) {
countersMap := make(map[types.StorageNodeID]lfuCounter, len(md.StorageNodes))
for _, snd := range md.StorageNodes {
if snd.StorageNodeID.Invalid() {
return nil, errors.New("replica selector: invalid cluster metadata: invalid storage id")
}
if len(snd.Paths) == 0 {
return nil, fmt.Errorf("replica selector: invalid cluster metadata: no storage path in storage node %d", snd.StorageNodeID)
}
cnt := lfuCounter{
snid: snd.StorageNodeID,
snpaths: make(map[string]uint, len(snd.Paths)),
}
for _, snpath := range snd.Paths {
cnt.snpaths[snpath] = 0
}
countersMap[snd.StorageNodeID] = cnt
}
return countersMap, nil
}
Loading

0 comments on commit 690b414

Please sign in to comment.