Skip to content

Commit

Permalink
core: plumbing to support numa aware scheduling (hashicorp#18681)
Browse files Browse the repository at this point in the history
* core: plumbing to support numa aware scheduling

* core: apply node resources compatibility upon fsm restore

Handle the case where an upgraded server dequeues an evaluation before
a client triggers a new fingerprint - which would be needed to cause
the compatibility fix to run. By running the compat fix on restore, the
server will immediately have the compatible pseudo topology to use.

* lint: learn how to spell pseudo
  • Loading branch information
shoenig authored and nvanthao committed Mar 1, 2024
1 parent 9517a00 commit ac4ab7a
Show file tree
Hide file tree
Showing 56 changed files with 1,299 additions and 1,487 deletions.
32 changes: 32 additions & 0 deletions api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Resources struct {
DiskMB *int `mapstructure:"disk" hcl:"disk,optional"`
Networks []*NetworkResource `hcl:"network,block"`
Devices []*RequestedDevice `hcl:"device,block"`
NUMA *NUMAResource `hcl:"numa,block"`

// COMPAT(0.10)
// XXX Deprecated. Please do not use. The field will be removed in Nomad
Expand Down Expand Up @@ -50,6 +51,8 @@ func (r *Resources) Canonicalize() {
for _, d := range r.Devices {
d.Canonicalize()
}

r.NUMA.Canonicalize()
}

// DefaultResources is a small resources object that contains the
Expand Down Expand Up @@ -97,6 +100,35 @@ func (r *Resources) Merge(other *Resources) {
if len(other.Devices) != 0 {
r.Devices = other.Devices
}
if other.NUMA != nil {
r.NUMA = other.NUMA.Copy()
}
}

// NUMAResource contains the NUMA affinity request for scheduling purposes.
// It is attached to a Resources value through the "numa" HCL block.
//
// Applies only to Nomad Enterprise.
type NUMAResource struct {
// Affinity must be one of "none", "prefer", "require". It is decoded from
// the optional "affinity" HCL attribute; when left empty, Canonicalize
// sets it to "none".
Affinity string `hcl:"affinity,optional"`
}

// Copy returns a newly allocated NUMAResource holding the same Affinity as n.
// A nil receiver yields nil, so callers may copy an unset NUMA block without
// checking for nil first.
func (n *NUMAResource) Copy() *NUMAResource {
	if n != nil {
		dup := new(NUMAResource)
		dup.Affinity = n.Affinity
		return dup
	}
	return nil
}

// Canonicalize fills in the default Affinity of "none" when the field was
// left empty. It is a no-op on a nil receiver and never overwrites an
// Affinity that is already set.
func (n *NUMAResource) Canonicalize() {
	if n != nil && n.Affinity == "" {
		n.Affinity = "none"
	}
}

type Port struct {
Expand Down
54 changes: 54 additions & 0 deletions api/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,57 @@ func TestResources_Canonicalize(t *testing.T) {
})
}
}

// TestResources_Merge checks how Resources.Merge treats the NUMA field: a nil
// NUMA on the other side leaves the receiver's value in place, while a non-nil
// NUMA replaces it.
func TestResources_Merge(t *testing.T) {
	testutil.Parallel(t)

	none := &NUMAResource{Affinity: "none"}
	prefer := &NUMAResource{Affinity: "prefer"}

	cases := []struct {
		name     string
		resource *Resources
		other    *Resources
		exp      *Resources
	}{
		{
			name:     "merge nil numa",
			resource: &Resources{NUMA: none},
			other:    &Resources{NUMA: nil},
			exp:      &Resources{NUMA: none},
		},
		{
			name:     "merge non-nil numa",
			resource: &Resources{NUMA: none},
			other:    &Resources{NUMA: prefer},
			exp:      &Resources{NUMA: prefer},
		},
	}

	for _, tc := range cases {
		// Run every case as a named subtest so that a failure reports which
		// case broke; previously the name field was declared but never used.
		t.Run(tc.name, func(t *testing.T) {
			tc.resource.Merge(tc.other)
			must.Eq(t, tc.exp, tc.resource)
		})
	}
}

// TestNUMAResource_Copy ensures Copy produces an independent value: changing
// the original afterwards must not be visible through the copy.
func TestNUMAResource_Copy(t *testing.T) {
	testutil.Parallel(t)

	original := &NUMAResource{Affinity: "none"}
	duplicate := original.Copy()

	// mutate the source only after the copy was taken
	original.Affinity = "require"

	must.Eq(t, "require", original.Affinity)
	must.Eq(t, "none", duplicate.Affinity)
}

// TestNUMAResource_Canonicalize covers both Canonicalize paths: a nil receiver
// is left untouched, and an empty Affinity is defaulted to "none".
func TestNUMAResource_Canonicalize(t *testing.T) {
	testutil.Parallel(t)

	// nil receiver: the call must be safe and the pointer stays nil
	var absent *NUMAResource
	absent.Canonicalize()
	must.Nil(t, absent)

	// zero-value Affinity is replaced by the default
	unset := &NUMAResource{}
	unset.Canonicalize()
	must.Eq(t, &NUMAResource{Affinity: "none"}, unset)
}
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
MemoryLimitBytes: memoryLimit * 1024 * 1024,
CPUShares: taskResources.Cpu.CpuShares,
CpusetCpus: strings.Join(cpusetCpus, ","),
PercentTicks: float64(taskResources.Cpu.CpuShares) / float64(tr.clientConfig.Node.NodeResources.Cpu.CpuShares),
PercentTicks: float64(taskResources.Cpu.CpuShares) / float64(tr.clientConfig.Node.NodeResources.Processors.Topology.UsableCompute()),
},
Ports: &ports,
},
Expand Down
22 changes: 4 additions & 18 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ func (c *Client) setupNode() error {
node.NodeResources = &structs.NodeResources{}
node.NodeResources.MinDynamicPort = newConfig.MinDynamicPort
node.NodeResources.MaxDynamicPort = newConfig.MaxDynamicPort
node.NodeResources.Cpu = newConfig.Node.NodeResources.Cpu
node.NodeResources.Processors = newConfig.Node.NodeResources.Processors
}
if node.ReservedResources == nil {
node.ReservedResources = &structs.NodeReservedResources{}
Expand Down Expand Up @@ -1643,19 +1643,6 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
}
}

// COMPAT(0.10): Remove in 0.10
// update the response networks with the config
// if we still have node changes, merge them
if response.Resources != nil {
response.Resources.Networks = updateNetworks(
response.Resources.Networks,
newConfig)
if !newConfig.Node.Resources.Equal(response.Resources) {
newConfig.Node.Resources.Merge(response.Resources)
nodeHasChanged = true
}
}

// update the response networks with the config
// if we still have node changes, merge them
if response.NodeResources != nil {
Expand All @@ -1673,7 +1660,7 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
}

// update config with total cpu compute if it was detected
if cpu := int(response.NodeResources.Cpu.CpuShares); cpu > 0 {
if cpu := response.NodeResources.Processors.TotalCompute(); cpu > 0 {
newConfig.CpuCompute = cpu
}
}
Expand Down Expand Up @@ -3250,7 +3237,7 @@ func (c *Client) setGaugeForAllocationStats(nodeID string, baseLabels []metrics.
// Emit unallocated
unallocatedMem := total.Memory.MemoryMB - res.Memory.MemoryMB - allocated.Flattened.Memory.MemoryMB
unallocatedDisk := total.Disk.DiskMB - res.Disk.DiskMB - allocated.Shared.DiskMB
unallocatedCpu := total.Cpu.CpuShares - res.Cpu.CpuShares - allocated.Flattened.Cpu.CpuShares
unallocatedCpu := int64(total.Processors.Topology.UsableCompute()) - res.Cpu.CpuShares - allocated.Flattened.Cpu.CpuShares

metrics.SetGaugeWithLabels([]string{"client", "unallocated", "memory"}, float32(unallocatedMem), baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "disk"}, float32(unallocatedDisk), baseLabels)
Expand Down Expand Up @@ -3357,8 +3344,7 @@ func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Comparab
}

// Add the resources
// COMPAT(0.11): Just use the allocated resources
allocated.Add(alloc.ComparableResources())
allocated.Add(alloc.AllocatedResources.Comparable())

// Add the used network
if alloc.AllocatedResources != nil {
Expand Down
74 changes: 34 additions & 40 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ import (
"time"

memdb "github.com/hashicorp/go-memdb"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
Expand All @@ -28,6 +22,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
cstate "github.com/hashicorp/nomad/client/state"
ctestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
Expand All @@ -40,6 +35,11 @@ import (
"github.com/hashicorp/nomad/plugins/device"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func testACLServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, *structs.ACLToken, func()) {
Expand Down Expand Up @@ -1358,8 +1358,11 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
defer cleanup()

client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
// overrides the detected hardware in TestClient
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 123},
Processors: structs.NodeProcessorResources{
Topology: structs.MockBasicTopology(),
},
},
})

Expand All @@ -1385,8 +1388,11 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
Disk: conf.Node.NodeResources.Disk,

// injected
Cpu: structs.NodeCpuResources{
CpuShares: 123,
Processors: structs.NodeProcessorResources{
Topology: structs.MockBasicTopology(),
},
Cpu: structs.LegacyNodeCpuResources{
CpuShares: 14_000, // mock has 4 cores * 3500 MHz
ReservableCpuCores: conf.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: conf.Node.NodeResources.Cpu.TotalCpuCores,
},
Expand All @@ -1399,7 +1405,7 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
},
}

assert.EqualValues(t, expectedResources, conf.Node.NodeResources)
must.Eq(t, expectedResources, conf.Node.NodeResources)

// overrides of values

Expand Down Expand Up @@ -1429,8 +1435,11 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
Disk: conf.Node.NodeResources.Disk,

// injected
Cpu: structs.NodeCpuResources{
CpuShares: 123,
Processors: structs.NodeProcessorResources{
Topology: structs.MockBasicTopology(),
},
Cpu: structs.LegacyNodeCpuResources{
CpuShares: 14_000, // mock has 4 cores * 3500 MHz
ReservableCpuCores: conf.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: conf.Node.NodeResources.Cpu.TotalCpuCores,
},
Expand All @@ -1454,35 +1463,27 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
// TestClient_UpdateNodeFromFingerprintKeepsConfig asserts manually configured
// network interfaces take precedence over fingerprinted ones.
func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) {
ctestutil.RequireLinux(t)
ci.Parallel(t)
if runtime.GOOS != "linux" {
t.Skip("assertions assume linux platform")
}

// Client without network configured updates to match fingerprint
client, cleanup := TestClient(t, nil)
defer cleanup()

client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 123},
Networks: []*structs.NetworkResource{{Mode: "host", Device: "any-interface"}},
},
Resources: &structs.Resources{
CPU: 80,
},
})
idx := len(client.config.Node.NodeResources.Networks) - 1
require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares)
require.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[idx].Device)
require.Equal(t, 80, client.config.Node.Resources.CPU)
must.Eq(t, "any-interface", client.config.Node.NodeResources.Networks[idx].Device)

// lookup an interface. client.Node starts with a hardcoded value, eth0,
// and is only updated async through fingerprinter.
// Let's just lookup network device; anyone will do for this test
interfaces, err := net.Interfaces()
require.NoError(t, err)
require.NotEmpty(t, interfaces)
must.NoError(t, err)
must.NotNil(t, interfaces)
dev := interfaces[0].Name

// Client with network interface configured keeps the config
Expand All @@ -1498,36 +1499,31 @@ func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) {
defer cleanup()
client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 123},
Networks: []*structs.NetworkResource{
{Mode: "host", Device: "any-interface", MBits: 20},
},
},
})
require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares)

// only the configured device is kept
require.Equal(t, 2, len(client.config.Node.NodeResources.Networks))
require.Equal(t, dev, client.config.Node.NodeResources.Networks[0].Device)
require.Equal(t, "bridge", client.config.Node.NodeResources.Networks[1].Mode)
must.Eq(t, 2, len(client.config.Node.NodeResources.Networks))
must.Eq(t, dev, client.config.Node.NodeResources.Networks[0].Device)
must.Eq(t, "bridge", client.config.Node.NodeResources.Networks[1].Mode)

// Network speed is applied to all NetworkResources
client.config.NetworkInterface = ""
client.config.NetworkSpeed = 100
client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 123},
Networks: []*structs.NetworkResource{
{Mode: "host", Device: "any-interface", MBits: 20},
},
},
Resources: &structs.Resources{
CPU: 80,
},
})
assert.Equal(t, 3, len(client.config.Node.NodeResources.Networks))
assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[2].Device)
assert.Equal(t, 100, client.config.Node.NodeResources.Networks[2].MBits)
assert.Equal(t, 0, client.config.Node.NodeResources.Networks[1].MBits)
must.Eq(t, 3, len(client.config.Node.NodeResources.Networks))
must.Eq(t, "any-interface", client.config.Node.NodeResources.Networks[2].Device)
must.Eq(t, 100, client.config.Node.NodeResources.Networks[2].MBits)
must.Eq(t, 0, client.config.Node.NodeResources.Networks[1].MBits)
}

// Support multiple IP addresses (ipv4 vs. 6, e.g.) on the configured network interface
Expand All @@ -1546,13 +1542,11 @@ func Test_UpdateNodeFromFingerprintMultiIP(t *testing.T) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.NetworkInterface = dev
c.Options["fingerprint.denylist"] = "network,cni,bridge"
c.Node.Resources.Networks = c.Node.NodeResources.Networks
})
defer cleanup()

client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 123},
Networks: []*structs.NetworkResource{
{Device: dev, IP: "127.0.0.1"},
{Device: dev, IP: "::1"},
Expand All @@ -1566,7 +1560,7 @@ func Test_UpdateNodeFromFingerprintMultiIP(t *testing.T) {
{Device: dev, IP: "::1"},
}

require.Equal(t, nets, client.config.Node.NodeResources.Networks)
must.Eq(t, nets, client.config.Node.NodeResources.Networks)
}

func TestClient_computeAllocatedDeviceStats(t *testing.T) {
Expand Down
Loading

0 comments on commit ac4ab7a

Please sign in to comment.