
Commit 1218c9e

1. Fix device share plugin NPE
2. Fix vgpu device handshake patch error
3. Update and add deviceshare UT

Signed-off-by: yangqz <yangqz@tydic.com>
(cherry picked from commit 0237537)
yangqz authored and Monokaix committed May 16, 2024
1 parent 3dbc5af commit 1218c9e
Showing 6 changed files with 176 additions and 49 deletions.
2 changes: 2 additions & 0 deletions docs/user-guide/how_to_use_gpu_number.md
@@ -8,6 +8,8 @@

 Refer to [Install Guide](../../installer/README.md) to install volcano.

+> **Note** The Volcano VGPU feature has been transferred to the HAMi project; click [here](https://github.com/Project-HAMi/volcano-vgpu-device-plugin) to access it.
+
 After installed, update the scheduler configuration:

 ```shell script
11 changes: 5 additions & 6 deletions pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
@@ -59,7 +59,7 @@ type GPUDevices struct {
 	Name string

 	// We cache score in filter step according to schedulePolicy, to avoid recalculating in score
-	ScoreMap map[string]float64
+	Score float64

 	Device map[int]*GPUDevice
 }
@@ -103,7 +103,7 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
 		klog.Infof("node %v device %s leave", node.Name, handshake)

 		tmppat := make(map[string]string)
-		tmppat[handshake] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05")
+		tmppat[VolcanoVGPUHandshake] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05")
 		patchNodeAnnotations(node, tmppat)
 		return nil
 	}
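For context on fix 2 from the commit message: the old code used the handshake *value* as the annotation key, so the `Deleted_` marker was written under a junk key and the real handshake annotation was never reset. A minimal, self-contained sketch of the difference — the constant name `VolcanoVGPUHandshake` comes from this diff, while its string value and the sample handshake value are assumptions:

```go
package main

import (
	"fmt"
	"time"
)

// Annotation key the scheduler and device plugin handshake on
// (assumed value; only the constant name appears in this diff).
const VolcanoVGPUHandshake = "volcano.sh/node-vgpu-handshake"

func main() {
	// handshake is the annotation *value* read from the node,
	// e.g. a "Requesting_<timestamp>" marker (hypothetical sample).
	handshake := "Requesting_2024.05.16 10:00:00"
	stamp := "Deleted_" + time.Now().Format("2006.01.02 15:04:05")

	// Before the fix: the value is used as the key, creating a junk
	// annotation while the real handshake key keeps its stale value.
	buggyPatch := map[string]string{handshake: stamp}

	// After the fix: the well-known key is patched instead.
	fixedPatch := map[string]string{VolcanoVGPUHandshake: stamp}

	fmt.Println("buggy:", buggyPatch)
	fmt.Println("fixed:", fixedPatch)
}
```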
@@ -122,8 +122,7 @@ func (gs *GPUDevices) ScoreNode(pod *v1.Pod, schedulePolicy string) float64 {
 	   a higher score than those needs to evict a task */

 	// Use cached stored in filter state in order to avoid recalculating.
-	klog.V(3).Infof("Scoring pod %s with to node %s with score %f", gs.Name, pod.Name, gs.ScoreMap[pod.Name])
-	return gs.ScoreMap[pod.Name]
+	return gs.Score
 }

 func (gs *GPUDevices) GetIgnoredDevices() []string {
@@ -197,9 +196,9 @@ func (gs *GPUDevices) FilterNode(pod *v1.Pod, schedulePolicy string) (int, string, error) {
 		fit, _, score, err := checkNodeGPUSharingPredicateAndScore(pod, gs, true, schedulePolicy)
 		if err != nil || !fit {
 			klog.Errorln("deviceSharing err=", err.Error())
-			return devices.Unschedulable, fmt.Sprintf("4pdvgpuDeviceSharing %s", err.Error()), err
+			return devices.Unschedulable, fmt.Sprintf("hami-vgpuDeviceSharing %s", err.Error()), err
 		}
-		gs.ScoreMap[pod.Name] = score
+		gs.Score = score
 		klog.V(4).Infoln("hami-vgpu DeviceSharing successfully filters pods")
 	}
 	return devices.Success, "", nil
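The ScoreMap-to-Score change above is what resolves fix 1: in Go, writing to a map field that was never initialized with make() panics with "assignment to entry in nil map", while a plain float64 field is always safe to set. A hypothetical reduction of that failure mode, not the exact scheduler call path:

```go
package main

import "fmt"

type gpuDevices struct {
	scoreMap map[string]float64 // old design: per-pod score cache
	score    float64            // new design: single cached score
}

func main() {
	gs := &gpuDevices{} // constructed without make(), as some code paths did

	gs.score = 13.3 // new design: writing a plain field always works
	fmt.Println("score set to", gs.score)

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered from:", r) // assignment to entry in nil map
		}
	}()
	gs.scoreMap["pod-a"] = 13.3 // old design: panics when the map is nil
}
```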
2 changes: 1 addition & 1 deletion pkg/scheduler/api/devices/nvidia/vgpu/type.go
@@ -53,7 +53,7 @@ const (
 	UnhealthyGPUIDs = "volcano.sh/gpu-unhealthy-ids"

 	// DeviceName used to indicate this device
-	DeviceName = "vgpu4pd"
+	DeviceName = "hamivgpu"

 	// binpack means the lower device memory remained after this allocation, the better
 	binpackPolicy = "binpack"
49 changes: 23 additions & 26 deletions pkg/scheduler/api/devices/nvidia/vgpu/utils.go
@@ -40,7 +40,7 @@ func init() {
 	var err error
 	kubeClient, err = NewClient()
 	if err != nil {
-		klog.Errorf("init kubeclient in 4pdvgpu failed: %s", err.Error())
+		klog.Errorf("init kubeclient in hamivgpu failed: %s", err.Error())
 	} else {
 		klog.V(3).Infoln("init kubeclient success")
 	}
@@ -94,27 +94,25 @@ func decodeNodeDevices(name string, str string) *GPUDevices {
 	}
 	tmp := strings.Split(str, ":")
 	retval := &GPUDevices{
-		Name:     name,
-		Device:   make(map[int]*GPUDevice),
-		ScoreMap: make(map[string]float64),
+		Name:   name,
+		Device: make(map[int]*GPUDevice),
+		Score:  float64(0),
 	}
 	for index, val := range tmp {
-		if len(val) > 0 {
-			if strings.Contains(val, ",") {
-				items := strings.Split(val, ",")
-				count, _ := strconv.Atoi(items[1])
-				devmem, _ := strconv.Atoi(items[2])
-				health, _ := strconv.ParseBool(items[4])
-				i := GPUDevice{
-					ID:     index,
-					UUID:   items[0],
-					Number: uint(count),
-					Memory: uint(devmem),
-					Type:   items[3],
-					Health: health,
-				}
-				retval.Device[index] = &i
-			}
+		if strings.Contains(val, ",") {
+			items := strings.Split(val, ",")
+			count, _ := strconv.Atoi(items[1])
+			devmem, _ := strconv.Atoi(items[2])
+			health, _ := strconv.ParseBool(items[4])
+			i := GPUDevice{
+				ID:     index,
+				UUID:   items[0],
+				Number: uint(count),
+				Memory: uint(devmem),
+				Type:   items[3],
+				Health: health,
+			}
+			retval.Device[index] = &i
 		}
 	}
 	return retval
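decodeNodeDevices above parses the device-register annotation published by the device plugin: ':'-separated device entries, each a ','-separated `uuid,count,memory,type,health` tuple. A standalone sketch of the same parsing under that assumed format — the sample annotation string and the lower-case type names are hypothetical:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// gpuDevice mirrors the fields decoded in the hunk above.
type gpuDevice struct {
	ID     int
	UUID   string
	Number uint
	Memory uint
	Type   string
	Health bool
}

func decode(str string) map[int]*gpuDevice {
	devices := make(map[int]*gpuDevice)
	for index, val := range strings.Split(str, ":") {
		// Each entry looks like "uuid,count,memMiB,type,health".
		if !strings.Contains(val, ",") {
			continue // skip empty trailing segments
		}
		items := strings.Split(val, ",")
		count, _ := strconv.Atoi(items[1])
		devmem, _ := strconv.Atoi(items[2])
		health, _ := strconv.ParseBool(items[4])
		devices[index] = &gpuDevice{
			ID:     index,
			UUID:   items[0],
			Number: uint(count),
			Memory: uint(devmem),
			Type:   items[3],
			Health: health,
		}
	}
	return devices
}

func main() {
	// Hypothetical two-GPU annotation value with a trailing separator.
	sample := "GPU-aaaa,10,30000,NVIDIA,true:GPU-bbbb,10,30000,NVIDIA,true:"
	for i, d := range decode(sample) {
		fmt.Printf("device %d: %+v\n", i, *d)
	}
}
```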
@@ -308,9 +306,9 @@ func checkType(annos map[string]string, d GPUDevice, n ContainerDeviceRequest) bool {

 func getGPUDeviceSnapShot(snap *GPUDevices) *GPUDevices {
 	ret := GPUDevices{
-		Name:     snap.Name,
-		Device:   make(map[int]*GPUDevice),
-		ScoreMap: make(map[string]float64),
+		Name:   snap.Name,
+		Device: make(map[int]*GPUDevice),
+		Score:  float64(0),
 	}
 	for index, val := range snap.Device {
 		if val != nil {
@@ -333,7 +331,6 @@ func getGPUDeviceSnapShot(snap *GPUDevices) *GPUDevices {

 // checkNodeGPUSharingPredicate checks if a pod with gpu requirement can be scheduled on a node.
 func checkNodeGPUSharingPredicateAndScore(pod *v1.Pod, gssnap *GPUDevices, replicate bool, schedulePolicy string) (bool, []ContainerDevices, float64, error) {
-
 	// no gpu sharing request
 	score := float64(0)
 	if !checkVGPUResourcesInPod(pod) {
@@ -398,10 +395,10 @@ func checkNodeGPUSharingPredicateAndScore(pod *v1.Pod, gssnap *GPUDevices, replicate bool, schedulePolicy string) (bool, []ContainerDevices, float64, error) {
 					Usedmem:   val.Memreq,
 					Usedcores: val.Coresreq,
 				})
-				switch {
-				case schedulePolicy == binpackPolicy:
+				switch schedulePolicy {
+				case binpackPolicy:
 					score += binpackMultiplier * (float64(gs.Device[i].UsedMem) / float64(gs.Device[i].Memory))
-				case schedulePolicy == spreadPolicy:
+				case spreadPolicy:
 					if gs.Device[i].UsedNum == 1 {
 						score += spreadMultiplier
 					}
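The rewritten switch above is the whole scoring rule: binpack favors nodes whose devices end up fuller after the allocation, spread favors a device already occupied by exactly one task. A standalone sketch of the formula — the multiplier value of 100 is an assumption consistent with the new unit test below; the real constants live in type.go:

```go
package main

import "fmt"

const (
	binpackPolicy = "binpack"
	spreadPolicy  = "spread"
	// Assumed multipliers, consistent with the expectations in
	// deviceshare_test.go below; the real values live in type.go.
	binpackMultiplier = 100.0
	spreadMultiplier  = 100.0
)

// deviceScore mirrors the per-device scoring in the hunk above.
// usedMem should already include the memory the pending pod would use.
func deviceScore(policy string, usedMem, totalMem uint, usedNum int) float64 {
	score := 0.0
	switch policy {
	case binpackPolicy:
		score += binpackMultiplier * (float64(usedMem) / float64(totalMem))
	case spreadPolicy:
		if usedNum == 1 {
			score += spreadMultiplier
		}
	}
	return score
}

func main() {
	// 3000 MiB already used + a 1000 MiB request on a 30000 MiB device:
	// binpack score = 100 * 4000/30000 ≈ 13.33.
	fmt.Printf("binpack: %.2f\n", deviceScore(binpackPolicy, 4000, 30000, 1))
	// A device occupied by exactly one task gets the full spread bonus.
	fmt.Printf("spread: %.2f\n", deviceScore(spreadPolicy, 0, 30000, 1))
}
```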
31 changes: 15 additions & 16 deletions pkg/scheduler/plugins/deviceshare/deviceshare.go
@@ -24,20 +24,15 @@ import (

 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"

 	k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

 	"volcano.sh/volcano/pkg/scheduler/api"
 	"volcano.sh/volcano/pkg/scheduler/api/devices"
 	"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"
 	"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/vgpu"
 	"volcano.sh/volcano/pkg/scheduler/framework"
 )

-var (
-	SchedulePolicy string = ""
-	scheduleWeight int    = 0
-)
-
 // PluginName indicates name of volcano scheduler plugin.
 const (
 	PluginName = "deviceshare"
@@ -55,29 +50,34 @@ const (
 type deviceSharePlugin struct {
 	// Arguments given for the plugin
 	pluginArguments framework.Arguments
+	schedulePolicy  string
+	scheduleWeight  int
 }

 // New return priority plugin
 func New(arguments framework.Arguments) framework.Plugin {
-	return &deviceSharePlugin{pluginArguments: arguments}
+	dsp := &deviceSharePlugin{pluginArguments: arguments, schedulePolicy: "", scheduleWeight: 0}
+	enablePredicate(dsp)
+	return dsp
 }

 func (dp *deviceSharePlugin) Name() string {
 	return PluginName
 }

-func enablePredicate(args framework.Arguments) {
+func enablePredicate(dsp *deviceSharePlugin) {
 	// Checks whether predicate.GPUSharingEnable is provided or not, if given, modifies the value in predicateEnable struct.
+	args := dsp.pluginArguments
 	args.GetBool(&gpushare.GpuSharingEnable, GPUSharingPredicate)
 	args.GetBool(&gpushare.GpuNumberEnable, GPUNumberPredicate)
 	args.GetBool(&gpushare.NodeLockEnable, NodeLockEnable)
 	args.GetBool(&vgpu.VGPUEnable, VGPUEnable)

 	_, ok := args[SchedulePolicyArgument]
 	if ok {
-		SchedulePolicy = args[SchedulePolicyArgument].(string)
+		dsp.schedulePolicy = args[SchedulePolicyArgument].(string)
 	}
-	args.GetInt(&scheduleWeight, ScheduleWeight)
+	args.GetInt(&dsp.scheduleWeight, ScheduleWeight)

 	if gpushare.GpuSharingEnable && gpushare.GpuNumberEnable {
 		klog.Fatal("can not define true in both gpu sharing and gpu number")
@@ -108,7 +108,6 @@ func getDeviceScore(ctx context.Context, pod *v1.Pod, node *api.NodeInfo, schedulePolicy
 }

 func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) {
-	enablePredicate(dp.pluginArguments)
 	// Register event handlers to update task info in PodLister & nodeMap
 	ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
 		predicateStatus := make([]*api.Status, 0)
@@ -127,7 +126,7 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) {
 				klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name)
 				continue
 			}
-			code, msg, err := dev.FilterNode(task.Pod, SchedulePolicy)
+			code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy)
 			if err != nil {
 				predicateStatus = append(predicateStatus, createStatus(code, msg))
 				return predicateStatus, err
@@ -150,16 +149,16 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) {

 	ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
 		// DeviceScore
-		if len(SchedulePolicy) > 0 {
-			score, status := getDeviceScore(context.TODO(), task.Pod, node, SchedulePolicy)
+		if len(dp.schedulePolicy) > 0 {
+			score, status := getDeviceScore(context.TODO(), task.Pod, node, dp.schedulePolicy)
 			if !status.IsSuccess() {
 				klog.Warningf("Node: %s, Calculate Device Score Failed because of Error: %v", node.Name, status.AsError())
 				return 0, status.AsError()
 			}

 			// TODO: we should use a seperate plugin for devices, and seperate them from predicates and nodeOrder plugin.
-			nodeScore := float64(score) * float64(scheduleWeight)
-			klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, scheduleWeight, nodeScore)
+			nodeScore := float64(score) * float64(dp.scheduleWeight)
+			klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, dp.scheduleWeight, nodeScore)
 		}
 		return 0, nil
 	})
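Moving SchedulePolicy and scheduleWeight from package-level variables into deviceSharePlugin fields, and calling enablePredicate once at construction instead of on every session open, removes shared mutable state: two plugin instances built from different scheduler configurations can no longer overwrite each other's settings. A reduced sketch of the pattern, with a hypothetical stand-in for volcano's framework.Arguments type:

```go
package main

import "fmt"

// arguments is a stand-in for volcano's framework.Arguments (assumed shape).
type arguments map[string]interface{}

// Configuration lives on the plugin instance, not in package globals.
type deviceSharePlugin struct {
	schedulePolicy string
	scheduleWeight int
}

// newPlugin parses its own arguments at construction time, the way the
// patched New() calls enablePredicate(dsp) before returning.
func newPlugin(args arguments) *deviceSharePlugin {
	dsp := &deviceSharePlugin{}
	if v, ok := args["deviceshare.SchedulePolicy"].(string); ok {
		dsp.schedulePolicy = v
	}
	if v, ok := args["deviceshare.ScheduleWeight"].(int); ok {
		dsp.scheduleWeight = v
	}
	return dsp
}

func main() {
	a := newPlugin(arguments{"deviceshare.SchedulePolicy": "binpack", "deviceshare.ScheduleWeight": 10})
	b := newPlugin(arguments{"deviceshare.SchedulePolicy": "spread"})
	// Each instance keeps its own settings; with package globals the
	// second construction would have clobbered the first.
	fmt.Println(a.schedulePolicy, a.scheduleWeight) // binpack 10
	fmt.Println(b.schedulePolicy, b.scheduleWeight) // spread 0
}
```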
130 changes: 130 additions & 0 deletions pkg/scheduler/plugins/deviceshare/deviceshare_test.go
@@ -0,0 +1,130 @@
/*
Copyright 2024 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deviceshare

import (
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/vgpu"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

func TestArguments(t *testing.T) {
framework.RegisterPluginBuilder(PluginName, New)
defer framework.CleanupPluginBuilders()

arguments := framework.Arguments{
"deviceshare.VGPUEnable": true,
"deviceshare.SchedulePolicy": "binpack",
"deviceshare.ScheduleWeight": 10,
}

builder, ok := framework.GetPluginBuilder(PluginName)

if !ok {
t.Fatalf("should have plugin named %s", PluginName)
}

plugin := builder(arguments)
deviceshare, ok := plugin.(*deviceSharePlugin)

if !ok {
t.Fatalf("plugin should be %T, but not %T", deviceshare, plugin)
}

weight := deviceshare.scheduleWeight

if weight != 10 {
t.Errorf("weight should be 10, but not %v", weight)
}

if deviceshare.schedulePolicy != "binpack" {
t.Errorf("policy should be binpack, but not %s", deviceshare.schedulePolicy)
}
}

func addResource(resourceList v1.ResourceList, name v1.ResourceName, need string) {
resourceList[name] = resource.MustParse(need)
}

func TestVgpuScore(t *testing.T) {
gpuNode1 := vgpu.GPUDevices{
Name: "node1",
Score: float64(0),
Device: make(map[int]*vgpu.GPUDevice),
}
gpuNode1.Device[0] = vgpu.NewGPUDevice(0, 30000)
gpuNode1.Device[0].Type = "NVIDIA"
gpuNode1.Device[0].Number = 10
gpuNode1.Device[0].UsedNum = 1
gpuNode1.Device[0].UsedMem = 3000

gpunumber := v1.ResourceName("volcano.sh/vgpu-number")
gpumemory := v1.ResourceName("volcano.sh/vgpu-memory")

vgpu.VGPUEnable = true

p1 := util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "10Gi"), "pg1", make(map[string]string), make(map[string]string))
addResource(p1.Spec.Containers[0].Resources.Requests, gpunumber, "1")
addResource(p1.Spec.Containers[0].Resources.Requests, gpumemory, "1000")
p1.Spec.Containers[0].Resources.Limits = make(v1.ResourceList)
addResource(p1.Spec.Containers[0].Resources.Limits, gpunumber, "1")
addResource(p1.Spec.Containers[0].Resources.Limits, gpumemory, "1000")

canAccess, _, err := gpuNode1.FilterNode(p1, "binpack")
if err != nil || canAccess != 0 {
t.Errorf("binpack filter failed %s", err.Error())
}

score := gpuNode1.ScoreNode(p1, "binpack")
if score-float64(4000*100)/float64(30000) > 0.05 {
t.Errorf("score failed expected %f, get %f", float64(4000*100)/float64(30000), score)
}

gpuNode2 := vgpu.GPUDevices{
Name: "node2",
Score: float64(0),
Device: make(map[int]*vgpu.GPUDevice),
}
gpuNode2.Device[0] = vgpu.NewGPUDevice(0, 30000)
gpuNode2.Device[0].Type = "NVIDIA"
gpuNode2.Device[0].Number = 10
gpuNode2.Device[0].UsedNum = 0
gpuNode2.Device[0].UsedMem = 0
p2 := util.BuildPod("c2", "p4", "", v1.PodPending, api.BuildResourceList("2", "10Gi"), "pg1", make(map[string]string), make(map[string]string))
addResource(p2.Spec.Containers[0].Resources.Requests, gpunumber, "1")
addResource(p2.Spec.Containers[0].Resources.Requests, gpumemory, "1000")
p2.Spec.Containers[0].Resources.Limits = make(v1.ResourceList)
addResource(p2.Spec.Containers[0].Resources.Limits, gpunumber, "1")
addResource(p2.Spec.Containers[0].Resources.Limits, gpumemory, "1000")

canAccess, _, err = gpuNode2.FilterNode(p2, "spread")
if err != nil || canAccess != 0 {
t.Errorf("binpack filter failed %s", err.Error())
}

score = gpuNode2.ScoreNode(p1, "spread")
if score-float64(100) > 0.05 {
t.Errorf("score failed expected %f, get %f", float64(4000*100)/float64(30000), score)
}

}
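For reference, the binpack expectation in TestVgpuScore works out to 100 × 4000 / 30000 ≈ 13.33: the device starts at 3000 of 30000 memory used, and the pod's 1000 request is added during FilterNode, so the cached score reflects post-allocation usage. The spread case expects roughly the full multiplier (100) because exactly one task occupies the device after the filter step. The multiplier value of 100 is inferred from these expectations, not stated in the diff.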
