Skip to content

Commit

Permalink
feat(admin): random replica selector
Browse files Browse the repository at this point in the history
It adds a new replica selector, `randomReplicaSelector`. It chooses storage nodes and paths
randomly.

Updates #393
  • Loading branch information
ijsong committed Apr 6, 2023
1 parent 6e7e74f commit ad1226e
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 0 deletions.
50 changes: 50 additions & 0 deletions internal/admin/replica_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,56 @@ type ReplicaSelector interface {
Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error)
}

type randomReplicaSelector struct {
rng *rand.Rand
cmview mrmanager.ClusterMetadataView
repfactor int
}

var _ ReplicaSelector = (*randomReplicaSelector)(nil)

func newRandomReplicaSelector(cmview mrmanager.ClusterMetadataView, repfactor int) (*randomReplicaSelector, error) {
if repfactor < 1 {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: negative replication factor")
}
if cmview == nil {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: invalid cluster metadata view")
}
s := &randomReplicaSelector{
rng: rand.New(rand.NewSource(time.Now().Unix())),
cmview: cmview,
repfactor: repfactor,
}
return s, nil
}

func (s *randomReplicaSelector) Name() string {
return "random"
}

func (s *randomReplicaSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) {
md, err := s.cmview.ClusterMetadata(ctx)
if err != nil {
return nil, errors.WithMessage(err, "replica selector")
}

ret := make([]*varlogpb.ReplicaDescriptor, s.repfactor)

snds := md.StorageNodes
sndIndices := s.rng.Perm(len(snds))[:s.repfactor]
for idx, sndIdx := range sndIndices {
snd := snds[sndIdx]
snpaths := snd.Paths
pathIdx := s.rng.Intn(len(snpaths))
ret[idx] = &varlogpb.ReplicaDescriptor{
StorageNodeID: snd.StorageNodeID,
StorageNodePath: snpaths[pathIdx],
}
}

return ret, nil
}

// balancedReplicaSelector selects storage nodes and volumes for a new log stream to be balanced in
// terms of the number of replicas as well as the number of primary replica per storage node.
// Note that it does not consider loads of storage nodes.
Expand Down
125 changes: 125 additions & 0 deletions internal/admin/replica_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"testing"

"github.com/golang/mock/gomock"
Expand All @@ -16,6 +17,130 @@ import (
"github.com/kakao/varlog/proto/varlogpb"
)

func newTestReplicaSelector(t *testing.T, name string, cmview mrmanager.ClusterMetadataView, repfactor int) (ReplicaSelector, error) {
switch strings.ToLower(name) {
case "random":
return newRandomReplicaSelector(cmview, repfactor)
case "balanced":
return newBalancedReplicaSelector(cmview, repfactor)
default:
return nil, fmt.Errorf("unknown selector: %s", name)
}
}

func TestReplicaSelector_ReplicationFactorZero(t *testing.T) {
tcs := []string{
"Random",
"Balanced",
}

for _, tc := range tcs {
t.Run(tc, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cmview := mrmanager.NewMockClusterMetadataView(ctrl)
cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(&varlogpb.MetadataDescriptor{}, nil).AnyTimes()
_, err := newTestReplicaSelector(t, tc, cmview, 0)
require.Error(t, err)
})
}
}

func TestReplicaSelector_ClusterMetadataViewNil(t *testing.T) {
tcs := []string{
"Random",
"Balanced",
}

for _, tc := range tcs {
t.Run(tc, func(t *testing.T) {
_, err := newTestReplicaSelector(t, tc, nil, 1)
require.Error(t, err)
})
}
}

func TestReplicaSelector(t *testing.T) {
type testCase struct {
name string
md *varlogpb.MetadataDescriptor
repfactor int
}

getInfo := func(md *varlogpb.MetadataDescriptor) map[types.StorageNodeID]map[string]bool {
snpaths := make(map[types.StorageNodeID]map[string]bool, len(md.StorageNodes))
for _, snd := range md.StorageNodes {
if _, ok := snpaths[snd.StorageNodeID]; !ok {
snpaths[snd.StorageNodeID] = make(map[string]bool, len(snd.Paths))
}
for _, path := range snd.Paths {
snpaths[snd.StorageNodeID][path] = true
}
}
return snpaths
}

tcs := []*testCase{
{
name: "Random",
repfactor: 2,
md: &varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
{
StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: "sn1"},
Paths: []string{"/data1", "/data2"},
},
{
StorageNode: varlogpb.StorageNode{StorageNodeID: 2, Address: "sn2"},
Paths: []string{"/data1", "/data2"},
},
{
StorageNode: varlogpb.StorageNode{StorageNodeID: 3, Address: "sn3"},
Paths: []string{"/data1", "/data2"},
},
{
StorageNode: varlogpb.StorageNode{StorageNodeID: 4, Address: "sn4"},
Paths: []string{"/data1", "/data2"},
},
},
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cmview := mrmanager.NewMockClusterMetadataView(ctrl)
cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(tc.md, nil).AnyTimes()

s, err := newTestReplicaSelector(t, tc.name, cmview, tc.repfactor)
require.NoError(t, err)

require.Equal(t, strings.ToLower(tc.name), s.Name())

replicas, err := s.Select(context.Background())
require.NoError(t, err)

snpaths := make(map[types.StorageNodeID]string, tc.repfactor)
for _, replica := range replicas {
require.False(t, replica.StorageNodeID.Invalid())
require.NotEmpty(t, replica.StorageNodePath)
snpaths[replica.StorageNodeID] = replica.StorageNodePath
}
require.Len(t, snpaths, tc.repfactor)

mdSnpaths := getInfo(tc.md)
for snid, snpath := range snpaths {
require.Contains(t, mdSnpaths, snid)
require.Contains(t, mdSnpaths[snid], snpath)
}
})
}
}

func TestBalancedReplicaSelector(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down

0 comments on commit ad1226e

Please sign in to comment.