Compute a cluster-wide baseline CPU definition for VMs #981

Merged · 4 commits · Jul 10, 2024
50 changes: 50 additions & 0 deletions cmd/incusd/daemon.go
@@ -5,6 +5,7 @@ import (
"context"
"crypto/x509"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
@@ -2311,6 +2312,55 @@ func (d *Daemon) heartbeatHandler(w http.ResponseWriter, r *http.Request, isLead

logger.Info("Partial heartbeat received", logger.Ctx{"local": localClusterAddress})
}

// Refresh cluster member resource info cache.
var muRefresh sync.Mutex

for _, member := range hbData.Members {
// Ignore offline servers.
if !member.Online {
continue
}

if member.Name == s.ServerName {
continue
}

go func(name string, address string) {
muRefresh.Lock()
defer muRefresh.Unlock()

// Check if we have a recent local cache entry already.
resourcesPath := internalUtil.CachePath("resources", fmt.Sprintf("%s.yaml", name))
fi, err := os.Stat(resourcesPath)
if err == nil && fi.ModTime().After(time.Now().Add(-time.Hour)) {
return
}

// Connect to the server.
client, err := cluster.Connect(address, s.Endpoints.NetworkCert(), s.ServerCert(), nil, true)
if err != nil {
return
}

// Get the server resources.
resources, err := client.GetServerResources()
if err != nil {
return
}

// Write to cache.
data, err := json.Marshal(resources)
if err != nil {
return
}

err = os.WriteFile(resourcesPath, data, 0600)
if err != nil {
return
}
}(member.Name, member.Address)
}
}

// nodeRefreshTask is run when a full state heartbeat is sent (on the leader) or received (by a non-leader member).
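The heartbeat handler above caches each online member's resource information under the local cache directory. Two things worth noting: the entries are JSON-encoded even though the file name ends in `.yaml`, and `muRefresh` serializes the per-member goroutines, so at most one remote query runs at a time while the handler itself returns immediately. For reference, a minimal sketch of reading such an entry back, mirroring the consumer added in driver_qemu.go below (the helper name is illustrative, not part of the PR):

```go
// Sketch: load a member's cached resources as written by the heartbeat handler.
// Assumes the same encoding as above: JSON content under a ".yaml" file name.
func loadCachedResources(name string) (*api.Resources, error) {
	resourcesPath := internalUtil.CachePath("resources", fmt.Sprintf("%s.yaml", name))

	data, err := os.ReadFile(resourcesPath)
	if err != nil {
		// Includes the not-exist case when no heartbeat has cached this member yet.
		return nil, err
	}

	res := api.Resources{}
	err = json.Unmarshal(data, &res)
	if err != nil {
		return nil, err
	}

	return &res, nil
}
```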
115 changes: 115 additions & 0 deletions internal/server/instance/drivers/driver_qemu.go
@@ -383,6 +383,90 @@ func (d *qemu) getAgentClient() (*http.Client, error) {
return agent, nil
}

func (d *qemu) getClusterCPUFlags() ([]string, error) {
// Get the list of cluster members.
var nodes []db.RaftNode
err := d.state.DB.Node.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
var err error
nodes, err = tx.GetRaftNodes(ctx)
return err
})
if err != nil {
return nil, err
}

// Get architecture name.
arch, err := osarch.ArchitectureName(d.architecture)
if err != nil {
return nil, err
}

// Get all the CPU flags for the architecture.
flagMembers := map[string]int{}
coreCount := 0

for _, node := range nodes {
// Attempt to load the cached resources.
resourcesPath := internalUtil.CachePath("resources", fmt.Sprintf("%s.yaml", node.Name))

data, err := os.ReadFile(resourcesPath)
if err != nil {
if os.IsNotExist(err) {
continue
}

return nil, err
}

res := api.Resources{}
err = json.Unmarshal(data, &res)
if err != nil {
return nil, err
}

// Skip if not the correct architecture.
if res.CPU.Architecture != arch {
continue
}

// Add the CPU flags to the map.
for _, socket := range res.CPU.Sockets {
for _, core := range socket.Cores {
coreCount += 1
for _, flag := range core.Flags {
flagMembers[flag] += 1
}
}
}
}

// Get the host flags.
info := DriverStatuses()[instancetype.VM].Info
hostFlags, ok := info.Features["flags"].(map[string]bool)
if !ok {
// No CPU flags found.
return nil, nil
}

// Build a set of flags common to all cores.
flags := []string{}

for k, v := range flagMembers {
if v != coreCount {
continue
}

hostVal, ok := hostFlags[k]
if !ok || hostVal {
continue
}

flags = append(flags, k)
}

return flags, nil
}

func (d *qemu) getMonitorEventHandler() func(event string, data map[string]any) {
// Create local variables from instance properties we need so as not to keep references to instance around
// after we have returned the callback function.
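getClusterCPUFlags() above intersects the flags of every core reported by every cached member of the same architecture, then drops anything QEMU does not recognise or that kvm64 already enables. A standalone sketch of that filter with invented sample data:

```go
package main

import "fmt"

func main() {
	// Invented inputs: flag counts across all cores of all members, the total
	// core count, and the kvm64 property map from checkFeatures() (true means
	// the flag is already part of kvm64).
	coreCount := 4
	flagMembers := map[string]int{"vmx": 4, "sse4_2": 4, "avx512f": 2}
	hostFlags := map[string]bool{"vmx": false, "sse4_2": true, "avx512f": false}

	flags := []string{}
	for k, v := range flagMembers {
		if v != coreCount {
			continue // Not present on every core of every member.
		}

		hostVal, ok := hostFlags[k]
		if !ok || hostVal {
			continue // Unknown to QEMU or already enabled by kvm64.
		}

		flags = append(flags, k)
	}

	fmt.Println(flags) // [vmx]: common everywhere and not yet part of kvm64.
}
```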
@@ -1413,6 +1497,19 @@ func (d *qemu) start(stateful bool, op *operationlock.InstanceOperation) error {
}

cpuType := "host"

// Get CPU flags if clustered and migration is enabled.
if d.state.ServerClustered && util.IsTrue(d.expandedConfig["migration.stateful"]) {
cpuFlags, err := d.getClusterCPUFlags()
if err != nil {
op.Done(err)
return err
}

cpuType = "kvm64"
cpuExtensions = append(cpuExtensions, cpuFlags...)
}

if len(cpuExtensions) > 0 {
cpuType += "," + strings.Join(cpuExtensions, ",")
}
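For illustration, on a clustered server with migration.stateful enabled and a hypothetical baseline flag result of vmx and ssse3, the CPU type string is assembled exactly as in the hunk above:

```go
cpuType := "kvm64"
cpuExtensions := []string{"vmx", "ssse3"} // Hypothetical output of getClusterCPUFlags().

if len(cpuExtensions) > 0 {
	cpuType += "," + strings.Join(cpuExtensions, ",")
}

// cpuType is now "kvm64,vmx,ssse3".
```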
@@ -8584,6 +8681,24 @@ func (d *qemu) checkFeatures(hostArch int, qemuPath string) (map[string]any, err
features["vhost_net"] = struct{}{}
}

// Query the kvm64 CPU model for its feature flags (the live-migration baseline).
model, err := monitor.QueryCPUModel("kvm64")
if err != nil {
return nil, err
}

cpuFlags := map[string]bool{}
for k, v := range model.Flags {
value, ok := v.(bool)
if !ok {
continue
}

cpuFlags[k] = value
}

features["flags"] = cpuFlags

return features, nil
}

28 changes: 28 additions & 0 deletions internal/server/instance/drivers/qmp/commands.go
@@ -61,6 +61,12 @@ type HotpluggableCPU struct {
Props CPUInstanceProperties `json:"props"`
}

// CPUModel contains information about a CPU model.
type CPUModel struct {
Name string `json:"name"`
Flags map[string]any `json:"props"`
}

// QueryCPUs returns a list of CPUs.
func (m *Monitor) QueryCPUs() ([]CPU, error) {
// Prepare the response.
@@ -91,6 +97,28 @@ func (m *Monitor) QueryHotpluggableCPUs() ([]HotpluggableCPU, error) {
return resp.Return, nil
}

// QueryCPUModel returns a CPUModel for the specified model name.
func (m *Monitor) QueryCPUModel(model string) (*CPUModel, error) {
// Prepare the response.
var resp struct {
Return struct {
Model CPUModel `json:"model"`
} `json:"return"`
}

args := map[string]any{
"model": map[string]string{"name": model},
"type": "full",
}

err := m.run("query-cpu-model-expansion", args, &resp)
if err != nil {
return nil, fmt.Errorf("Failed to query CPU model: %w", err)
}

return &resp.Return.Model, nil
}

// Status returns the current VM status.
func (m *Monitor) Status() (string, error) {
// Prepare the response.
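QueryCPUModel() wraps QEMU's query-cpu-model-expansion QMP command with type "full", so the reply enumerates every feature property of the requested model along with its value; checkFeatures() above keeps only the boolean ones. A usage sketch, assuming an already connected monitor handle:

```go
// Sketch: expand the kvm64 model and print its boolean feature properties.
// "monitor" is assumed to be an established *qmp.Monitor for the VM.
model, err := monitor.QueryCPUModel("kvm64")
if err != nil {
	return err
}

for name, value := range model.Flags {
	enabled, ok := value.(bool)
	if !ok {
		continue // Non-boolean properties are ignored, as in checkFeatures().
	}

	fmt.Printf("%s=%v\n", name, enabled)
}
```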
1 change: 1 addition & 0 deletions internal/server/sys/fs.go
@@ -42,6 +42,7 @@ func (s *OS) initDirs() error {

{filepath.Join(s.VarDir, "backups"), 0700},
{s.CacheDir, 0700},
{filepath.Join(s.CacheDir, "resources"), 0700},
{filepath.Join(s.VarDir, "database"), 0700},
{filepath.Join(s.VarDir, "devices"), 0711},
{filepath.Join(s.VarDir, "disks"), 0700},