From 690b414635073486e41b7c836ad032a66caf502f Mon Sep 17 00:00:00 2001 From: Injun Song Date: Wed, 5 Apr 2023 16:57:38 +0900 Subject: [PATCH] feat(admin): LFU replica selector This PR adds a new replica selector based on the Least Frequently Used (LFU) algorithm. `lfuSelector` selects each replica's storage node and storage path, giving preference to those with fewer assigned replicas. All replica selectors, including `lfuSelector`, are stateless: they do not retain the log stream topology between calls. Instead, each time `Select` is invoked, `lfuSelector` recomputes the usage counters for every storage node and path from the cluster metadata. Although this may seem inefficient, it is simple and fault-tolerant. To select the least used storage nodes and paths, `lfuSelector` runs as follows: - Fetch cluster metadata from the `ClusterMetadataView`. - Increment the usage counters of each storage node and path assigned to each log stream. - Sort the counters. - Choose the least used storage nodes and paths. Updates #393 --- go.mod | 2 +- internal/admin/config.go | 2 +- internal/admin/replica_selector.go | 245 +++++++------ internal/admin/replica_selector_test.go | 448 ++++++++++++++---------- 4 files changed, 402 insertions(+), 295 deletions(-) diff --git a/go.mod b/go.mod index 4da289302..be55cd901 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/gogo/status v1.1.1 github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.3 - github.com/google/gofuzz v1.2.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/lib/pq v1.10.7 github.com/pkg/errors v0.9.1 @@ -74,6 +73,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/go-cmp v0.5.9 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/imdario/mergo v0.3.6 // indirect diff --git 
a/internal/admin/config.go b/internal/admin/config.go index 265de8194..4b855fada 100644 --- a/internal/admin/config.go +++ b/internal/admin/config.go @@ -81,7 +81,7 @@ func (cfg config) validate() error { func (cfg *config) ensureDefault() error { if cfg.snSelector == nil { - rs, err := newBalancedReplicaSelector(cfg.mrmgr.ClusterMetadataView(), int(cfg.replicationFactor)) + rs, err := newReplicaSelector(replicaSelectorNameLFU, cfg.mrmgr.ClusterMetadataView(), int(cfg.replicationFactor)) if err != nil { return err } diff --git a/internal/admin/replica_selector.go b/internal/admin/replica_selector.go index 801b87ea3..8baeb4de5 100644 --- a/internal/admin/replica_selector.go +++ b/internal/admin/replica_selector.go @@ -4,11 +4,15 @@ package admin import ( "context" + "fmt" + "math" "math/rand" "sort" + "strings" "time" "github.com/pkg/errors" + "golang.org/x/exp/maps" "github.com/kakao/varlog/internal/admin/mrmanager" "github.com/kakao/varlog/pkg/types" @@ -26,6 +30,30 @@ type ReplicaSelector interface { Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) } +const ( + replicaSelectorNameRandom = "random" + replicaSelectorNameLFU = "lfu" // least frequently used +) + +func newReplicaSelector(selector string, cmview mrmanager.ClusterMetadataView, repfactor int) (ReplicaSelector, error) { + if repfactor < 1 { + return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: negative replication factor") + } + + if cmview == nil { + return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: invalid cluster metadata view") + } + + switch strings.ToLower(selector) { + case replicaSelectorNameRandom: + return newRandomReplicaSelector(cmview, repfactor) + case replicaSelectorNameLFU: + return newLFUSelector(cmview, repfactor) + default: + return nil, fmt.Errorf("unknown selector: %s", selector) + } +} + type randomReplicaSelector struct { rng *rand.Rand cmview mrmanager.ClusterMetadataView @@ -35,12 +63,6 @@ type randomReplicaSelector struct { var _ 
ReplicaSelector = (*randomReplicaSelector)(nil) func newRandomReplicaSelector(cmview mrmanager.ClusterMetadataView, repfactor int) (*randomReplicaSelector, error) { - if repfactor < 1 { - return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: negative replication factor") - } - if cmview == nil { - return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: invalid cluster metadata view") - } s := &randomReplicaSelector{ rng: rand.New(rand.NewSource(time.Now().Unix())), cmview: cmview, @@ -50,7 +72,7 @@ func newRandomReplicaSelector(cmview mrmanager.ClusterMetadataView, repfactor in } func (s *randomReplicaSelector) Name() string { - return "random" + return replicaSelectorNameRandom } func (s *randomReplicaSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) { @@ -59,16 +81,28 @@ func (s *randomReplicaSelector) Select(ctx context.Context) ([]*varlogpb.Replica return nil, errors.WithMessage(err, "replica selector") } + if len(md.StorageNodes) < s.repfactor { + return nil, fmt.Errorf("replica selector: not enough storage nodes: %d < %d", len(md.StorageNodes), s.repfactor) + } + ret := make([]*varlogpb.ReplicaDescriptor, s.repfactor) snds := md.StorageNodes sndIndices := s.rng.Perm(len(snds))[:s.repfactor] for idx, sndIdx := range sndIndices { snd := snds[sndIdx] + snid := snd.StorageNodeID + if snid.Invalid() { + return nil, errors.New("replica selector: invalid cluster metadata: invalid storage id") + } + snpaths := snd.Paths + if len(snpaths) == 0 { + return nil, fmt.Errorf("replica selector: invalid cluster metadata: no storage path in storage node %d", snid) + } pathIdx := s.rng.Intn(len(snpaths)) ret[idx] = &varlogpb.ReplicaDescriptor{ - StorageNodeID: snd.StorageNodeID, + StorageNodeID: snid, StorageNodePath: snpaths[pathIdx], } } @@ -76,132 +110,129 @@ func (s *randomReplicaSelector) Select(ctx context.Context) ([]*varlogpb.Replica return ret, nil } -// balancedReplicaSelector selects storage nodes and volumes for a new log 
stream to be balanced in -// terms of the number of replicas as well as the number of primary replica per storage node. -// Note that it does not consider loads of storage nodes. -type balancedReplicaSelector struct { - rng *rand.Rand - cmView mrmanager.ClusterMetadataView - replicationFactor int +type lfuSelector struct { + cmview mrmanager.ClusterMetadataView + repfactor int } -var _ ReplicaSelector = (*balancedReplicaSelector)(nil) +type lfuCounter struct { + snid types.StorageNodeID + replicas uint + primaries uint + snpaths map[string]uint +} -func newBalancedReplicaSelector(cmView mrmanager.ClusterMetadataView, replicationFactor int) (*balancedReplicaSelector, error) { - if replicationFactor < 1 { - return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: negative replication factor") - } - if cmView == nil { - return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: invalid cluster metadata view") - } - sel := &balancedReplicaSelector{ - rng: rand.New(rand.NewSource(time.Now().Unix())), - cmView: cmView, - replicationFactor: replicationFactor, +func newLFUSelector(cmview mrmanager.ClusterMetadataView, repfactor int) (*lfuSelector, error) { + s := &lfuSelector{ + cmview: cmview, + repfactor: repfactor, } - return sel, nil + return s, nil } -func (sel *balancedReplicaSelector) Name() string { - return "balanced" +func (s *lfuSelector) Name() string { + return replicaSelectorNameLFU } -func (sel *balancedReplicaSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) { - md, err := sel.cmView.ClusterMetadata(ctx) +func (s *lfuSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) { + md, err := s.cmview.ClusterMetadata(ctx) if err != nil { return nil, errors.WithMessage(err, "replica selector") } - snds := md.GetStorageNodes() - stats := make(map[types.StorageNodeID]storageNodeStat, len(snds)) - - for _, snd := range snds { - storageNodeID := snd.StorageNodeID - st := storageNodeStat{ - storageNodeID: 
storageNodeID, - paths: make(map[string]struct{}, len(snd.Paths)), - assignedPaths: make(map[string]struct{}, len(snd.Paths)), - } - for _, path := range snd.Paths { - st.paths[path] = struct{}{} - } - stats[storageNodeID] = st + if len(md.StorageNodes) < s.repfactor { + return nil, fmt.Errorf("replica selector: not enough storage nodes: %d < %d", len(md.StorageNodes), s.repfactor) } - lsds := md.GetLogStreams() - for _, lsd := range lsds { - for i, rd := range lsd.Replicas { - storageNodeID := rd.StorageNodeID - st, ok := stats[storageNodeID] - if !ok { - panic("replica selector: inconsistent cluster metadata") - } - st.replicas++ - if i == 0 { - st.primaryReplicas++ - } - st.assignedPaths[rd.StorageNodePath] = struct{}{} - stats[storageNodeID] = st - } + countersMap, err := s.newCounter(md) + if err != nil { + return nil, err } - statsList := make([]storageNodeStat, 0, len(stats)) - for _, st := range stats { - statsList = append(statsList, st) + err = s.count(md, countersMap) + if err != nil { + return nil, err } + counters := s.sortedCounters(countersMap) + replicas := s.selectLFU(counters) + return replicas, nil +} - sort.Slice(statsList, func(i, j int) bool { - st1, st2 := statsList[i], statsList[j] - ut1, ut2 := st1.utilization(), st2.utilization() - - if ut1 != ut2 { - return ut1 < ut2 +func (s *lfuSelector) selectLFU(counters []lfuCounter) []*varlogpb.ReplicaDescriptor { + replicas := make([]*varlogpb.ReplicaDescriptor, s.repfactor) + for idx, counter := range counters[:s.repfactor] { + selectedPath := "" + min := uint(math.MaxUint) + for snpath, usedCount := range counter.snpaths { + if usedCount < min { + min = usedCount + selectedPath = snpath + } + } + replicas[idx] = &varlogpb.ReplicaDescriptor{ + StorageNodeID: counter.snid, + StorageNodePath: selectedPath, } + } + return replicas +} - if st1.primaryReplicas != st2.primaryReplicas { - return st1.primaryReplicas < st2.primaryReplicas +func (s *lfuSelector) sortedCounters(countersMap 
map[types.StorageNodeID]lfuCounter) []lfuCounter { + counters := maps.Values(countersMap) + sort.Slice(counters, func(i, j int) bool { + if counters[i].replicas == counters[j].replicas { + if counters[i].primaries == counters[j].primaries { + return counters[i].snid < counters[j].snid + } + return counters[i].primaries < counters[j].primaries } - return st1.replicas < st2.replicas + return counters[i].replicas < counters[j].replicas }) + return counters +} - statsList = statsList[:sel.replicationFactor] - sort.Slice(statsList, func(i, j int) bool { - st1, st2 := statsList[i], statsList[j] - return st1.primaryReplicas < st2.primaryReplicas - }) +func (s *lfuSelector) count(md *varlogpb.MetadataDescriptor, countersMap map[types.StorageNodeID]lfuCounter) error { + for _, lsd := range md.LogStreams { + for i, rd := range lsd.Replicas { + snid := rd.StorageNodeID + snCounter, ok := countersMap[rd.StorageNodeID] + if !ok { + return fmt.Errorf("replica selector: inconsistent cluster metadata: no matched storage node %d", snid) + } + snCounter.replicas++ + if i == 0 { + snCounter.primaries++ + } - rds := make([]*varlogpb.ReplicaDescriptor, 0, sel.replicationFactor) - for _, st := range statsList { - snd := md.GetStorageNode(st.storageNodeID) - var path string - if len(st.paths) == len(st.assignedPaths) { - path = snd.Paths[sel.rng.Intn(len(snd.Paths))] - } else { - for p := range st.paths { - if _, ok := st.assignedPaths[path]; ok { - continue - } - path = p - break + snpath := rd.StorageNodePath + snpathUsedCount, ok := snCounter.snpaths[snpath] + if !ok { + return fmt.Errorf("replica selector: inconsistent cluster metadata: no matched storage path %s", snpath) } + snCounter.snpaths[snpath] = snpathUsedCount + 1 + countersMap[snid] = snCounter } - rds = append(rds, &varlogpb.ReplicaDescriptor{ - StorageNodeID: st.storageNodeID, - StorageNodePath: path, - }) } - - return rds, nil -} - -type storageNodeStat struct { - storageNodeID types.StorageNodeID - replicas int - 
primaryReplicas int - paths map[string]struct{} - assignedPaths map[string]struct{} + return nil } -func (s storageNodeStat) utilization() float64 { - return float64(s.replicas) / float64(len(s.paths)) +func (s *lfuSelector) newCounter(md *varlogpb.MetadataDescriptor) (map[types.StorageNodeID]lfuCounter, error) { + countersMap := make(map[types.StorageNodeID]lfuCounter, len(md.StorageNodes)) + for _, snd := range md.StorageNodes { + if snd.StorageNodeID.Invalid() { + return nil, errors.New("replica selector: invalid cluster metadata: invalid storage id") + } + if len(snd.Paths) == 0 { + return nil, fmt.Errorf("replica selector: invalid cluster metadata: no storage path in storage node %d", snd.StorageNodeID) + } + cnt := lfuCounter{ + snid: snd.StorageNodeID, + snpaths: make(map[string]uint, len(snd.Paths)), + } + for _, snpath := range snd.Paths { + cnt.snpaths[snpath] = 0 + } + countersMap[snd.StorageNodeID] = cnt + } + return countersMap, nil } diff --git a/internal/admin/replica_selector_test.go b/internal/admin/replica_selector_test.go index 6d3b2cd27..443b9ab04 100644 --- a/internal/admin/replica_selector_test.go +++ b/internal/admin/replica_selector_test.go @@ -2,88 +2,211 @@ package admin import ( "context" - "fmt" "sort" - "strings" "testing" "github.com/golang/mock/gomock" - fuzz "github.com/google/gofuzz" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" "github.com/kakao/varlog/internal/admin/mrmanager" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/proto/varlogpb" ) -func newTestReplicaSelector(t *testing.T, name string, cmview mrmanager.ClusterMetadataView, repfactor int) (ReplicaSelector, error) { - switch strings.ToLower(name) { - case "random": - return newRandomReplicaSelector(cmview, repfactor) - case "balanced": - return newBalancedReplicaSelector(cmview, repfactor) - default: - return nil, fmt.Errorf("unknown selector: %s", name) - } +func TestReplicaSelector_NewUnknown(t 
*testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cmview := mrmanager.NewMockClusterMetadataView(ctrl) + cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(&varlogpb.MetadataDescriptor{}, nil).AnyTimes() + + _, err := newReplicaSelector("foo", cmview, 1) + require.Error(t, err) + } -func TestReplicaSelector_ReplicationFactorZero(t *testing.T) { - tcs := []string{ - "Random", - "Balanced", +func TestReplicaSelector_New(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cmview := mrmanager.NewMockClusterMetadataView(ctrl) + cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(&varlogpb.MetadataDescriptor{}, nil).AnyTimes() + + selectors := []string{ + replicaSelectorNameRandom, + replicaSelectorNameLFU, + } + tcs := []struct { + name string + cmview mrmanager.ClusterMetadataView + repfactor int + }{ + { + name: "ReplicationFactorZero", + cmview: cmview, + repfactor: 0, + }, + { + name: "ClusterMetadataViewNil", + cmview: nil, + repfactor: 1, + }, } for _, tc := range tcs { - t.Run(tc, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - cmview := mrmanager.NewMockClusterMetadataView(ctrl) - cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(&varlogpb.MetadataDescriptor{}, nil).AnyTimes() - _, err := newTestReplicaSelector(t, tc, cmview, 0) - require.Error(t, err) - }) + for _, selector := range selectors { + t.Run(selector+tc.name, func(t *testing.T) { + _, err := newReplicaSelector(selector, tc.cmview, tc.repfactor) + require.Error(t, err) + }) + } } } -func TestReplicaSelector_ClusterMetadataViewNil(t *testing.T) { - tcs := []string{ - "Random", - "Balanced", +func testCheckInvariant(t *testing.T, md *varlogpb.MetadataDescriptor, repfactor int, selectedReplicas []*varlogpb.ReplicaDescriptor) { + selectedStorageNodePaths := make(map[types.StorageNodeID]string, repfactor) + for _, replica := range selectedReplicas { + require.False(t, replica.StorageNodeID.Invalid()) + 
require.NotEmpty(t, replica.StorageNodePath) + selectedStorageNodePaths[replica.StorageNodeID] = replica.StorageNodePath } + require.Len(t, selectedStorageNodePaths, repfactor) - for _, tc := range tcs { - t.Run(tc, func(t *testing.T) { - _, err := newTestReplicaSelector(t, tc, nil, 1) - require.Error(t, err) - }) + allStorageNodePaths := make(map[types.StorageNodeID]map[string]bool, len(md.StorageNodes)) + for _, snd := range md.StorageNodes { + if _, ok := allStorageNodePaths[snd.StorageNodeID]; !ok { + allStorageNodePaths[snd.StorageNodeID] = make(map[string]bool, len(snd.Paths)) + } + for _, path := range snd.Paths { + allStorageNodePaths[snd.StorageNodeID][path] = true + } + } + for snid, snpath := range selectedStorageNodePaths { + require.Contains(t, allStorageNodePaths, snid) + require.Contains(t, allStorageNodePaths[snid], snpath) } } func TestReplicaSelector(t *testing.T) { + const tpid = types.TopicID(1) + type testCase struct { name string + selectors []string md *varlogpb.MetadataDescriptor repfactor int - } - - getInfo := func(md *varlogpb.MetadataDescriptor) map[types.StorageNodeID]map[string]bool { - snpaths := make(map[types.StorageNodeID]map[string]bool, len(md.StorageNodes)) - for _, snd := range md.StorageNodes { - if _, ok := snpaths[snd.StorageNodeID]; !ok { - snpaths[snd.StorageNodeID] = make(map[string]bool, len(snd.Paths)) - } - for _, path := range snd.Paths { - snpaths[snd.StorageNodeID][path] = true - } - } - return snpaths + testf func(t *testing.T, tc *testCase, s ReplicaSelector) } tcs := []*testCase{ { - name: "Random", + name: "NotEnoughStorageNodes", + selectors: []string{replicaSelectorNameRandom, replicaSelectorNameLFU}, + repfactor: 1, + md: &varlogpb.MetadataDescriptor{}, + testf: func(t *testing.T, _ *testCase, s ReplicaSelector) { + _, err := s.Select(context.Background()) + require.Error(t, err) + }, + }, + { + name: "InvalidClusterMetadataInvalidStorageNodeID", + selectors: []string{replicaSelectorNameRandom, 
replicaSelectorNameLFU}, + repfactor: 1, + md: &varlogpb.MetadataDescriptor{ + StorageNodes: []*varlogpb.StorageNodeDescriptor{ + { + StorageNode: varlogpb.StorageNode{Address: "sn1"}, + Paths: []string{}, + }, + }, + LogStreams: []*varlogpb.LogStreamDescriptor{}, + }, + testf: func(t *testing.T, _ *testCase, s ReplicaSelector) { + _, err := s.Select(context.Background()) + require.Error(t, err) + }, + }, + { + name: "InvalidClusterMetadataNoStoragePath", + selectors: []string{replicaSelectorNameRandom, replicaSelectorNameLFU}, + repfactor: 1, + md: &varlogpb.MetadataDescriptor{ + StorageNodes: []*varlogpb.StorageNodeDescriptor{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: "sn1"}, + Paths: []string{}, + }, + }, + LogStreams: []*varlogpb.LogStreamDescriptor{}, + }, + testf: func(t *testing.T, _ *testCase, s ReplicaSelector) { + _, err := s.Select(context.Background()) + require.Error(t, err) + }, + }, + { + name: "InvalidClusterMetadataNoStorageNode", + selectors: []string{replicaSelectorNameLFU}, + repfactor: 1, + md: &varlogpb.MetadataDescriptor{ + StorageNodes: []*varlogpb.StorageNodeDescriptor{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: "sn1"}, + Paths: []string{"/data1"}, + }, + }, + LogStreams: []*varlogpb.LogStreamDescriptor{ + { + LogStreamID: 1, + Replicas: []*varlogpb.ReplicaDescriptor{ + { + StorageNodeID: 2, + StorageNodePath: "/data1", + DataPath: "/data1/foo", + }, + }, + }, + }, + }, + testf: func(t *testing.T, _ *testCase, s ReplicaSelector) { + _, err := s.Select(context.Background()) + require.Error(t, err) + }, + }, + { + name: "InvalidClusterMetadataNoStorageNodePath", + selectors: []string{replicaSelectorNameLFU}, + repfactor: 1, + md: &varlogpb.MetadataDescriptor{ + StorageNodes: []*varlogpb.StorageNodeDescriptor{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: "sn1"}, + Paths: []string{"/data2"}, + }, + }, + LogStreams: []*varlogpb.LogStreamDescriptor{ + { + LogStreamID: 1, + 
Replicas: []*varlogpb.ReplicaDescriptor{ + { + StorageNodeID: 1, + StorageNodePath: "/data1", + DataPath: "/data1/foo", + }, + }, + }, + }, + }, + testf: func(t *testing.T, _ *testCase, s ReplicaSelector) { + _, err := s.Select(context.Background()) + require.Error(t, err) + }, + }, + { + name: "Select", + selectors: []string{replicaSelectorNameRandom}, repfactor: 2, md: &varlogpb.MetadataDescriptor{ StorageNodes: []*varlogpb.StorageNodeDescriptor{ @@ -105,157 +228,110 @@ func TestReplicaSelector(t *testing.T) { }, }, }, - }, - } - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - cmview := mrmanager.NewMockClusterMetadataView(ctrl) - cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(tc.md, nil).AnyTimes() - - s, err := newTestReplicaSelector(t, tc.name, cmview, tc.repfactor) - require.NoError(t, err) - - require.Equal(t, strings.ToLower(tc.name), s.Name()) - - replicas, err := s.Select(context.Background()) - require.NoError(t, err) + testf: func(t *testing.T, tc *testCase, s ReplicaSelector) { + for i := 0; i < 10; i++ { + replicas, err := s.Select(context.Background()) + require.NoError(t, err) - snpaths := make(map[types.StorageNodeID]string, tc.repfactor) - for _, replica := range replicas { - require.False(t, replica.StorageNodeID.Invalid()) - require.NotEmpty(t, replica.StorageNodePath) - snpaths[replica.StorageNodeID] = replica.StorageNodePath - } - require.Len(t, snpaths, tc.repfactor) + testCheckInvariant(t, tc.md, tc.repfactor, replicas) - mdSnpaths := getInfo(tc.md) - for snid, snpath := range snpaths { - require.Contains(t, mdSnpaths, snid) - require.Contains(t, mdSnpaths[snid], snpath) - } - }) - } -} - -func TestBalancedReplicaSelector(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - const ( - testCount = 100 - replicationFactor = 3 - maxStorageNodes = 10 - minStoragesPerNode = 1 - maxStoragesPerNode = 4 - numLogStreams = maxStorageNodes * 
maxStoragesPerNode / replicationFactor - ) - - for i := 0; i < testCount; i++ { - f := fuzz.New().NilChance(0).Funcs( - func(snds *[]*varlogpb.StorageNodeDescriptor, c fuzz.Continue) { - cnt := c.Intn(maxStorageNodes+1-replicationFactor) + replicationFactor - *snds = make([]*varlogpb.StorageNodeDescriptor, cnt) - for j := 0; j < cnt; j++ { - snd := &varlogpb.StorageNodeDescriptor{} - snd.StorageNodeID = types.StorageNodeID(j + 1) - snd.Paths = make([]string, c.Intn(maxStoragesPerNode+1-minStoragesPerNode)+minStoragesPerNode) - for k := 0; k < len(snd.Paths); k++ { - snd.Paths[k] = fmt.Sprintf("/data%d", k+1) - } - (*snds)[j] = snd + err = tc.md.InsertLogStream(&varlogpb.LogStreamDescriptor{ + TopicID: tpid, + LogStreamID: types.LogStreamID(i + 1), + Replicas: replicas, + }) + require.NoError(t, err) } }, - func(lsds *[]*varlogpb.LogStreamDescriptor, c fuzz.Continue) { - *lsds = nil + }, + { + name: "Select", + selectors: []string{replicaSelectorNameLFU}, + repfactor: 3, + md: &varlogpb.MetadataDescriptor{ + StorageNodes: []*varlogpb.StorageNodeDescriptor{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: "sn1"}, + Paths: []string{"/data1", "/data2", "/data3", "/data4"}, + }, + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 2, Address: "sn2"}, + Paths: []string{"/data1", "/data2", "/data3", "/data4"}, + }, + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 3, Address: "sn3"}, + Paths: []string{"/data1", "/data2", "/data3", "/data4"}, + }, + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 4, Address: "sn4"}, + Paths: []string{"/data1", "/data2", "/data3", "/data4"}, + }, + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 5, Address: "sn5"}, + Paths: []string{"/data1", "/data2", "/data3", "/data4"}, + }, + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 6, Address: "sn6"}, + Paths: []string{"/data1", "/data2", "/data3", "/data4"}, + }, + }, + LogStreams: []*varlogpb.LogStreamDescriptor{}, }, - ) - md := 
&varlogpb.MetadataDescriptor{} - f.Fuzz(md) + testf: func(t *testing.T, tc *testCase, s ReplicaSelector) { + for i := 0; i < 100; i++ { + replicas, err := s.Select(context.Background()) + require.NoError(t, err) - cmView := mrmanager.NewMockClusterMetadataView(ctrl) - cmView.EXPECT().ClusterMetadata(gomock.Any()).Return(md, nil).AnyTimes() + testCheckInvariant(t, tc.md, tc.repfactor, replicas) - sel, err := newBalancedReplicaSelector(cmView, replicationFactor) - require.NoError(t, err) + err = tc.md.InsertLogStream(&varlogpb.LogStreamDescriptor{ + TopicID: tpid, + LogStreamID: types.LogStreamID(i + 1), + Replicas: replicas, + }) + require.NoError(t, err) - for j := 0; j < numLogStreams; j++ { - rds, err := sel.Select(context.Background()) - require.NoError(t, err) - require.Len(t, rds, replicationFactor) + replicasCounts := make(map[types.StorageNodeID]int) + primariesCounts := make(map[types.StorageNodeID]int) - err = md.InsertLogStream(&varlogpb.LogStreamDescriptor{ - LogStreamID: types.LogStreamID(j + 1), - Replicas: rds, - }) - require.NoError(t, err) - } + for _, lsd := range tc.md.LogStreams { + for i, rd := range lsd.Replicas { + snid := rd.StorageNodeID + replicasCounts[snid]++ + if i == 0 { + primariesCounts[snid]++ + } + } + } - // Very generous tolerance - tolerance := float64(numLogStreams * replicationFactor / len(md.StorageNodes)) - equalNumStorages := true - numPaths := len(md.StorageNodes[0].Paths) - for _, snd := range md.StorageNodes[1:] { - if numPaths != len(snd.Paths) { - equalNumStorages = false - break - } - } - if equalNumStorages { - tolerance = 1.3 - } - testVerifyLogStreamDescriptors(t, md, tolerance) - } -} + counts := maps.Values(replicasCounts) + sort.Ints(counts) + diff := counts[len(counts)-1] - counts[0] + require.LessOrEqual(t, diff, 1) -func testVerifyLogStreamDescriptors(t *testing.T, md *varlogpb.MetadataDescriptor, tolerance float64) { - paths := make(map[types.StorageNodeID]int) - replicas := make(map[types.StorageNodeID]int) 
- primaries := make(map[types.StorageNodeID]int) - for _, lsd := range md.LogStreams { - for i, rd := range lsd.Replicas { - storageNodeID := rd.StorageNodeID - replicas[storageNodeID]++ - if i == 0 { - primaries[storageNodeID]++ - } - } - } - for _, snd := range md.StorageNodes { - storageNodeID := snd.StorageNodeID - paths[storageNodeID] = len(snd.Paths) + counts = maps.Values(primariesCounts) + sort.Ints(counts) + diff = counts[len(counts)-1] - counts[0] + require.LessOrEqual(t, diff, 2) + } + }, + }, } - testUtilizationBalance(t, paths, replicas, tolerance) - testPrimariesBalance(t, primaries, tolerance) -} + for _, tc := range tcs { + for _, selector := range tc.selectors { + t.Run(selector+tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() -func testUtilizationBalance(t *testing.T, paths, replicas map[types.StorageNodeID]int, tolerance float64) { - us := make([]float64, 0, len(paths)) - for snid, p := range paths { - r, ok := replicas[snid] - if !ok { - us = append(us, 0) - continue - } - us = append(us, float64(r)/float64(p)) - } - sort.Float64s(us) - min, max := us[0], us[len(us)-1] - require.LessOrEqual(t, max/min, tolerance) -} + cmview := mrmanager.NewMockClusterMetadataView(ctrl) + cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(tc.md, nil).AnyTimes() -func testPrimariesBalance(t *testing.T, primaries map[types.StorageNodeID]int, tolerance float64) { - ps := make([]int, 0, len(primaries)) - for _, p := range primaries { - ps = append(ps, p) - } - sort.Ints(ps) - min, max := ps[0], ps[len(ps)-1] - if !assert.LessOrEqual(t, float64(max-min), tolerance) { - t.Logf("primaries: %+v", primaries) + s, err := newReplicaSelector(selector, cmview, tc.repfactor) + require.NoError(t, err) + require.Equal(t, selector, s.Name()) + tc.testf(t, tc, s) + }) + } } }