Skip to content

Commit

Permalink
Introduce a random selection of filtered drives when scheduling
Browse files Browse the repository at this point in the history
- Run a random index generator over the filter drives which have the same (max)total capacity.
- This change to reduce the conflicts while scheduling volumes to drives

Signed-off-by: Praveenrajmani <praveen@minio.io>
  • Loading branch information
Praveenrajmani authored and wlan0 committed Apr 26, 2021
1 parent c66dabe commit 101c12d
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 45 deletions.
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
finalizers = append(finalizers, finalizer)
drive.SetFinalizers(finalizers)

glog.Infof("Reserving DirectCSI drive: (Name: %s, NodeName: %s)", drive.Name, drive.Status.NodeName)
if _, err := dclient.Update(ctx, drive, metav1.UpdateOptions{
TypeMeta: utils.DirectCSIDriveTypeMeta(strings.Join([]string{directcsi.Group, directcsi.Version}, "/")),
}); err != nil {
Expand Down
178 changes: 146 additions & 32 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,15 @@ const (
mb30 = 30 * MB
)

func TestSelectDriveByTopology(t1 *testing.T) {
func TestSelectDrivesByTopology(t1 *testing.T) {

getDriveNameSet := func(drives []directcsi.DirectCSIDrive) []string {
driveNames := []string{}
for _, drive := range drives {
driveNames = append(driveNames, drive.Name)
}
return driveNames
}

testDriveSet := []directcsi.DirectCSIDrive{
{
Expand All @@ -53,6 +61,14 @@ func TestSelectDriveByTopology(t1 *testing.T) {
Topology: map[string]string{"node": "N1", "rack": "RK1", "zone": "Z1", "region": "R1"},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive11",
},
Status: directcsi.DirectCSIDriveStatus{
Topology: map[string]string{"node": "N1", "rack": "RK1", "zone": "Z1", "region": "R1"},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive2",
Expand All @@ -61,6 +77,14 @@ func TestSelectDriveByTopology(t1 *testing.T) {
Topology: map[string]string{"node": "N2", "rack": "RK2", "zone": "Z2", "region": "R2"},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive22",
},
Status: directcsi.DirectCSIDriveStatus{
Topology: map[string]string{"node": "N2", "rack": "RK2", "zone": "Z2", "region": "R2"},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive3",
Expand All @@ -72,56 +96,56 @@ func TestSelectDriveByTopology(t1 *testing.T) {
}

testCases := []struct {
name string
topologyRequest *csi.Topology
errExpected bool
selectedDriveName string
name string
topologyRequest *csi.Topology
errExpected bool
selectedDriveNames []string
}{
{
name: "test1",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N2", "rack": "RK2", "zone": "Z2", "region": "R2"}},
errExpected: false,
selectedDriveName: "drive2",
name: "test1",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N2", "rack": "RK2", "zone": "Z2", "region": "R2"}},
errExpected: false,
selectedDriveNames: []string{"drive2", "drive22"},
},
{
name: "test2",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N3", "rack": "RK3", "zone": "Z3", "region": "R3"}},
errExpected: false,
selectedDriveName: "drive3",
name: "test2",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N3", "rack": "RK3", "zone": "Z3", "region": "R3"}},
errExpected: false,
selectedDriveNames: []string{"drive3"},
},
{
name: "test3",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N4", "rack": "RK2", "zone": "Z4", "region": "R2"}},
errExpected: true,
selectedDriveName: "",
name: "test3",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N4", "rack": "RK2", "zone": "Z4", "region": "R2"}},
errExpected: true,
selectedDriveNames: []string{},
},
{
name: "test4",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N3", "rack": "RK3"}},
errExpected: false,
selectedDriveName: "drive3",
name: "test4",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N3", "rack": "RK3"}},
errExpected: false,
selectedDriveNames: []string{"drive3"},
},
{
name: "test5",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N1", "rack": "RK5"}},
errExpected: true,
selectedDriveName: "",
name: "test5",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N1", "rack": "RK5"}},
errExpected: true,
selectedDriveNames: []string{},
},
{
name: "test5",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N1"}},
errExpected: false,
selectedDriveName: "drive1",
name: "test5",
topologyRequest: &csi.Topology{Segments: map[string]string{"node": "N1"}},
errExpected: false,
selectedDriveNames: []string{"drive1", "drive11"},
},
}

for _, tt := range testCases {
t1.Run(tt.name, func(t1 *testing.T) {
selectedDrive, err := selectDriveByTopology(tt.topologyRequest, testDriveSet)
selectedDrives, err := selectDrivesByTopology(tt.topologyRequest, testDriveSet)
if tt.errExpected && err == nil {
t1.Fatalf("Test case name %s: Expected error but succeeded", tt.name)
} else if selectedDrive.Name != tt.selectedDriveName {
t1.Errorf("Test case name %s: Expected drive name = %s, got %v", tt.name, tt.selectedDriveName, selectedDrive.Name)
} else if !reflect.DeepEqual(getDriveNameSet(selectedDrives), tt.selectedDriveNames) {
t1.Errorf("Test case name %s: Expected drive names = %s, got %v", tt.name, tt.selectedDriveNames, getDriveNameSet(selectedDrives))
}
})
}
Expand Down Expand Up @@ -916,3 +940,93 @@ func TestCreateVolume(t *testing.T) {
}
}
}

func TestSelectDriveByFreeCapacity(t1 *testing.T) {
testCases := []struct {
name string
driveList []directcsi.DirectCSIDrive
expectedDriveNames []string
}{
{
name: "test1",
driveList: []directcsi.DirectCSIDrive{
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive1",
},
Status: directcsi.DirectCSIDriveStatus{
FreeCapacity: 1000,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive2",
},
Status: directcsi.DirectCSIDriveStatus{
FreeCapacity: 2000,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive3",
},
Status: directcsi.DirectCSIDriveStatus{
FreeCapacity: 3000,
},
},
},
expectedDriveNames: []string{"drive3"},
},
{
name: "test2",
driveList: []directcsi.DirectCSIDrive{
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive1",
},
Status: directcsi.DirectCSIDriveStatus{
FreeCapacity: 4000,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive2",
},
Status: directcsi.DirectCSIDriveStatus{
FreeCapacity: 4000,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "drive3",
},
Status: directcsi.DirectCSIDriveStatus{
FreeCapacity: 3000,
},
},
},
expectedDriveNames: []string{"drive1", "drive2"},
},
}

checkDriveName := func(expectedDriveNames []string, driveName string) bool {
for _, edName := range expectedDriveNames {
if edName == driveName {
return true
}
}
return false
}

for _, tt := range testCases {
t1.Run(tt.name, func(t1 *testing.T) {
selectedDrive, err := selectDriveByFreeCapacity(tt.driveList)
if err != nil {
t1.Fatalf("Text case name: %s: Error: %v", tt.name, err)
}
if !checkDriveName(tt.expectedDriveNames, selectedDrive.Name) {
t1.Errorf("Test case name %s: Unexpected drive selected. Expected one among %v but got %s", tt.name, tt.expectedDriveNames, selectedDrive.Name)
}
})
}
}
62 changes: 49 additions & 13 deletions pkg/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package controller

import (
"crypto/rand"
"math/big"
"sort"

directcsi "github.com/minio/direct-csi/pkg/apis/direct.csi.min.io/v1beta1"
Expand Down Expand Up @@ -136,41 +138,75 @@ func FilterDrivesByTopologyRequirements(volReq *csi.CreateVolumeRequest, csiDriv
preferredXs := tReq.GetPreferred()
requisiteXs := tReq.GetRequisite()

// Sort the drives by free capacity [Descending]
sort.SliceStable(csiDrives, func(i, j int) bool {
return csiDrives[i].Status.FreeCapacity > csiDrives[j].Status.FreeCapacity
})

// Try to fullfill the preferred topology request, If not, fallback to requisite list.
// Ref: https://godoc.org/github.com/container-storage-interface/spec/lib/go/csi#TopologyRequirement
for _, preferredTop := range preferredXs {
if selectedDrive, err := selectDriveByTopology(preferredTop, csiDrives); err == nil {
return selectedDrive, nil
if selectedDrives, err := selectDrivesByTopology(preferredTop, csiDrives); err == nil {
return selectDriveByFreeCapacity(selectedDrives)
}
}

for _, requisiteTop := range requisiteXs {
if selectedDrive, err := selectDriveByTopology(requisiteTop, csiDrives); err == nil {
return selectedDrive, nil
if selectedDrives, err := selectDrivesByTopology(requisiteTop, csiDrives); err == nil {
return selectDriveByFreeCapacity(selectedDrives)
}
}

if len(preferredXs) == 0 && len(requisiteXs) == 0 {
return csiDrives[0], nil
return selectDriveByFreeCapacity(csiDrives)
}

return directcsi.DirectCSIDrive{}, status.Error(codes.ResourceExhausted, "Cannot satisfy the topology constraint")
}

func selectDriveByTopology(top *csi.Topology, csiDrives []directcsi.DirectCSIDrive) (directcsi.DirectCSIDrive, error) {
func selectDriveByFreeCapacity(csiDrives []directcsi.DirectCSIDrive) (directcsi.DirectCSIDrive, error) {
// Sort the drives by free capacity [Descending]
sort.SliceStable(csiDrives, func(i, j int) bool {
return csiDrives[i].Status.FreeCapacity > csiDrives[j].Status.FreeCapacity
})

groupByFreeCapacity := func() []directcsi.DirectCSIDrive {
maxFreeCapacity := csiDrives[0].Status.FreeCapacity
groupedDrives := []directcsi.DirectCSIDrive{}
for _, csiDrive := range csiDrives {
if csiDrive.Status.FreeCapacity == maxFreeCapacity {
groupedDrives = append(groupedDrives, csiDrive)
}
}
return groupedDrives
}

pickRandomIndex := func(max int) (int, error) {
rInt, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
if err != nil {
return int(0), err
}
return int(rInt.Int64()), nil
}

selectedDrives := groupByFreeCapacity()
rIndex, err := pickRandomIndex(len(selectedDrives))
if err != nil {
return selectedDrives[rIndex], status.Errorf(codes.Internal, "Error while selecting (random) drive: %v", err)
}
return selectedDrives[rIndex], nil
}

func selectDrivesByTopology(top *csi.Topology, csiDrives []directcsi.DirectCSIDrive) ([]directcsi.DirectCSIDrive, error) {
matchingDriveList := []directcsi.DirectCSIDrive{}
topSegments := top.GetSegments()
for _, csiDrive := range csiDrives {
driveSegments := csiDrive.Status.Topology
if matchSegments(topSegments, driveSegments) {
return csiDrive, nil
matchingDriveList = append(matchingDriveList, csiDrive)
}
}
return directcsi.DirectCSIDrive{}, status.Error(codes.ResourceExhausted, "Cannot satisfy the topology constraint")
return matchingDriveList, func() error {
if len(matchingDriveList) == 0 {
return status.Error(codes.ResourceExhausted, "Cannot satisfy the topology constraint")
}
return nil
}()
}

func matchSegments(topSegments, driveSegments map[string]string) bool {
Expand Down

0 comments on commit 101c12d

Please sign in to comment.