diff --git a/cmd/incusd/daemon.go b/cmd/incusd/daemon.go
index 1494a4f874d..d7575b22ce1 100644
--- a/cmd/incusd/daemon.go
+++ b/cmd/incusd/daemon.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"crypto/x509"
 	"database/sql"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -2311,6 +2312,55 @@ func (d *Daemon) heartbeatHandler(w http.ResponseWriter, r *http.Request, isLead
 		logger.Info("Partial heartbeat received", logger.Ctx{"local": localClusterAddress})
 	}
+
+	// Refresh cluster member resource info cache.
+	var muRefresh sync.Mutex
+
+	for _, member := range hbData.Members {
+		// Ignore offline servers.
+		if !member.Online {
+			continue
+		}
+
+		if member.Name == s.ServerName {
+			continue
+		}
+
+		go func(name string, address string) {
+			muRefresh.Lock()
+			defer muRefresh.Unlock()
+
+			// Check if we have a recent local cache entry already.
+			resourcesPath := internalUtil.CachePath("resources", fmt.Sprintf("%s.yaml", name))
+			fi, err := os.Stat(resourcesPath)
+			if err == nil && fi.ModTime().After(time.Now().Add(-time.Hour)) {
+				return
+			}
+
+			// Connect to the server.
+			client, err := cluster.Connect(address, s.Endpoints.NetworkCert(), s.ServerCert(), nil, true)
+			if err != nil {
+				return
+			}
+
+			// Get the server resources.
+			resources, err := client.GetServerResources()
+			if err != nil {
+				return
+			}
+
+			// Write to cache.
+			data, err := json.Marshal(resources)
+			if err != nil {
+				return
+			}
+
+			err = os.WriteFile(resourcesPath, data, 0600)
+			if err != nil {
+				return
+			}
+		}(member.Name, member.Address)
+	}
 }
 
 // nodeRefreshTask is run when a full state heartbeat is sent (on the leader) or received (by a non-leader member).
diff --git a/internal/server/instance/drivers/driver_qemu.go b/internal/server/instance/drivers/driver_qemu.go
index a1aca1ae365..c2eaf24ef1c 100644
--- a/internal/server/instance/drivers/driver_qemu.go
+++ b/internal/server/instance/drivers/driver_qemu.go
@@ -383,6 +383,90 @@ func (d *qemu) getAgentClient() (*http.Client, error) {
 	return agent, nil
 }
 
+func (d *qemu) getClusterCPUFlags() ([]string, error) {
+	// Get the list of cluster members.
+	var nodes []db.RaftNode
+	err := d.state.DB.Node.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
+		var err error
+		nodes, err = tx.GetRaftNodes(ctx)
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// Get architecture name.
+	arch, err := osarch.ArchitectureName(d.architecture)
+	if err != nil {
+		return nil, err
+	}
+
+	// Get all the CPU flags for the architecture.
+	flagMembers := map[string]int{}
+	coreCount := 0
+
+	for _, node := range nodes {
+		// Attempt to load the cached resources.
+		resourcesPath := internalUtil.CachePath("resources", fmt.Sprintf("%s.yaml", node.Name))
+
+		data, err := os.ReadFile(resourcesPath)
+		if err != nil {
+			if os.IsNotExist(err) {
+				continue
+			}
+
+			return nil, err
+		}
+
+		res := api.Resources{}
+		err = json.Unmarshal(data, &res)
+		if err != nil {
+			return nil, err
+		}
+
+		// Skip if not the correct architecture.
+		if res.CPU.Architecture != arch {
+			continue
+		}
+
+		// Add the CPU flags to the map.
+		for _, socket := range res.CPU.Sockets {
+			for _, core := range socket.Cores {
+				coreCount += 1
+				for _, flag := range core.Flags {
+					flagMembers[flag] += 1
+				}
+			}
+		}
+	}
+
+	// Get the host flags.
+	info := DriverStatuses()[instancetype.VM].Info
+	hostFlags, ok := info.Features["flags"].(map[string]bool)
+	if !ok {
+		// No CPU flags found.
+		return nil, nil
+	}
+
+	// Build a set of flags common to all cores.
+	flags := []string{}
+
+	for k, v := range flagMembers {
+		if v != coreCount {
+			continue
+		}
+
+		hostVal, ok := hostFlags[k]
+		if !ok || hostVal {
+			continue
+		}
+
+		flags = append(flags, k)
+	}
+
+	return flags, nil
+}
+
 func (d *qemu) getMonitorEventHandler() func(event string, data map[string]any) {
 	// Create local variables from instance properties we need so as not to keep references to instance around
 	// after we have returned the callback function.
@@ -1413,6 +1497,19 @@ func (d *qemu) start(stateful bool, op *operationlock.InstanceOperation) error {
 	}
 
 	cpuType := "host"
+
+	// Get CPU flags if clustered and migration is enabled.
+	if d.state.ServerClustered && util.IsTrue(d.expandedConfig["migration.stateful"]) {
+		cpuFlags, err := d.getClusterCPUFlags()
+		if err != nil {
+			op.Done(err)
+			return err
+		}
+
+		cpuType = "kvm64"
+		cpuExtensions = append(cpuExtensions, cpuFlags...)
+	}
+
 	if len(cpuExtensions) > 0 {
 		cpuType += "," + strings.Join(cpuExtensions, ",")
 	}
@@ -8584,6 +8681,24 @@ func (d *qemu) checkFeatures(hostArch int, qemuPath string) (map[string]any, err
 		features["vhost_net"] = struct{}{}
 	}
 
+	// Get the host CPU model.
+	model, err := monitor.QueryCPUModel("kvm64")
+	if err != nil {
+		return nil, err
+	}
+
+	cpuFlags := map[string]bool{}
+	for k, v := range model.Flags {
+		value, ok := v.(bool)
+		if !ok {
+			continue
+		}
+
+		cpuFlags[k] = value
+	}
+
+	features["flags"] = cpuFlags
+
 	return features, nil
 }
diff --git a/internal/server/instance/drivers/qmp/commands.go b/internal/server/instance/drivers/qmp/commands.go
index 51fd03aa2cb..2ca6ef0b488 100644
--- a/internal/server/instance/drivers/qmp/commands.go
+++ b/internal/server/instance/drivers/qmp/commands.go
@@ -61,6 +61,12 @@ type HotpluggableCPU struct {
 	Props CPUInstanceProperties `json:"props"`
 }
 
+// CPUModel contains information about a CPU model.
+type CPUModel struct {
+	Name  string         `json:"name"`
+	Flags map[string]any `json:"props"`
+}
+
 // QueryCPUs returns a list of CPUs.
 func (m *Monitor) QueryCPUs() ([]CPU, error) {
 	// Prepare the response.
@@ -91,6 +97,28 @@ func (m *Monitor) QueryHotpluggableCPUs() ([]HotpluggableCPU, error) {
 	return resp.Return, nil
 }
 
+// QueryCPUModel returns a CPUModel for the specified model name.
+func (m *Monitor) QueryCPUModel(model string) (*CPUModel, error) {
+	// Prepare the response.
+	var resp struct {
+		Return struct {
+			Model CPUModel `json:"model"`
+		} `json:"return"`
+	}
+
+	args := map[string]any{
+		"model": map[string]string{"name": model},
+		"type":  "full",
+	}
+
+	err := m.run("query-cpu-model-expansion", args, &resp)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to query CPU model: %w", err)
+	}
+
+	return &resp.Return.Model, nil
+}
+
 // Status returns the current VM status.
 func (m *Monitor) Status() (string, error) {
 	// Prepare the response.
diff --git a/internal/server/sys/fs.go b/internal/server/sys/fs.go
index ccf31852c65..a179828d0b8 100644
--- a/internal/server/sys/fs.go
+++ b/internal/server/sys/fs.go
@@ -42,6 +42,7 @@ func (s *OS) initDirs() error {
 		{filepath.Join(s.VarDir, "backups"), 0700},
 		{s.CacheDir, 0700},
+		{filepath.Join(s.CacheDir, "resources"), 0700},
 		{filepath.Join(s.VarDir, "database"), 0700},
 		{filepath.Join(s.VarDir, "devices"), 0711},
 		{filepath.Join(s.VarDir, "disks"), 0700},
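Reviewer note: the net effect of the driver_qemu.go changes is that a clustered VM with migration.stateful enabled starts with a kvm64 base CPU plus only the flags that every core of every cached cluster member reports and that kvm64 does not already enable. The following is a minimal standalone sketch of that selection logic, not part of the change itself; the cores and hostFlags values are made-up sample data, not output from a real cluster.

package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	// Hypothetical per-core flag lists, as gathered from the resources cache.
	cores := [][]string{
		{"lm", "aes", "popcnt", "sse4.2"},
		{"lm", "aes", "popcnt"},
	}

	// Hypothetical "query-cpu-model-expansion" result for kvm64:
	// true means the base model already enables the flag.
	hostFlags := map[string]bool{"lm": true, "aes": false, "popcnt": false, "sse4.2": false}

	// Count how many cores report each flag.
	flagMembers := map[string]int{}
	for _, core := range cores {
		for _, flag := range core {
			flagMembers[flag]++
		}
	}

	// Keep flags present on every core that kvm64 doesn't already enable.
	flags := []string{}
	for k, v := range flagMembers {
		if v != len(cores) {
			continue
		}

		hostVal, ok := hostFlags[k]
		if !ok || hostVal {
			continue
		}

		flags = append(flags, k)
	}

	sort.Strings(flags)

	// Build the QEMU CPU definition the same way start() does.
	cpuType := "kvm64"
	if len(flags) > 0 {
		cpuType += "," + strings.Join(flags, ",")
	}

	fmt.Println(cpuType) // kvm64,aes,popcnt
}

Starting from the conservative kvm64 model and widening it only with cluster-wide flags keeps the guest CPU identical on every member, which is what makes stateful migration safe; the resources cache refreshed hourly from the heartbeat handler makes this computable without contacting every member at VM start.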