diff --git a/cmd/incusd/api_cluster.go b/cmd/incusd/api_cluster.go
index 465acb3a3e2..2fa2af2d2e3 100644
--- a/cmd/incusd/api_cluster.go
+++ b/cmd/incusd/api_cluster.go
@@ -6,6 +6,7 @@ import (
 	"encoding/pem"
 	"errors"
 	"fmt"
+	"math"
 	"net/http"
 	"net/url"
 	"os"
@@ -52,6 +53,7 @@ import (
 	"github.com/lxc/incus/v6/shared/logger"
 	"github.com/lxc/incus/v6/shared/osarch"
 	localtls "github.com/lxc/incus/v6/shared/tls"
+	"github.com/lxc/incus/v6/shared/units"
 	"github.com/lxc/incus/v6/shared/util"
 	"github.com/lxc/incus/v6/shared/validate"
 )
@@ -4573,3 +4575,260 @@ func autoHealCluster(ctx context.Context, s *state.State, offlineMembers []db.No
 
 	return nil
 }
+
+func autoClusterRebalanceTask(d *Daemon) (task.Func, task.Schedule) {
+	s := d.State()
+	f := func(ctx context.Context) {
+		s := d.State()
+		rebalanceThreshold := s.GlobalConfig.ClusterRebalanceThreshold()
+		if rebalanceThreshold == 0 {
+			return // Skip rebalancing if it's disabled.
+		}
+
+		leader, err := d.gateway.LeaderAddress()
+		if err != nil {
+			if errors.Is(err, cluster.ErrNodeIsNotClustered) {
+				return // Skip rebalancing if not clustered.
+			}
+
+			logger.Error("Failed to get leader cluster member address", logger.Ctx{"err": err})
+			return
+		}
+
+		if s.LocalConfig.ClusterAddress() != leader {
+			return // Skip rebalancing if not cluster leader.
+		}
+
+		// Get all online cluster members.
+		var onlineMembers []db.NodeInfo
+		err = s.DB.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
+			members, err := tx.GetNodes(ctx)
+			if err != nil {
+				return fmt.Errorf("Failed getting cluster members: %w", err)
+			}
+
+			onlineMembers, err = tx.GetCandidateMembers(ctx, members, nil, "", nil, s.GlobalConfig.OfflineThreshold())
+			if err != nil {
+				return fmt.Errorf("Failed getting online cluster members: %w", err)
+			}
+
+			return nil
+		})
+		if err != nil {
+			logger.Error("Failed getting cluster members", logger.Ctx{"err": err})
+			return
+		}
+
+		// Map each architecture to the members that have it.
+		architectureMap := make(map[string][]db.NodeInfo)
+
+		// Map each member ID to its resources.
+		resourcesMap := make(map[int64]*api.Resources)
+
+		// Map each member ID to its state.
+		memberStateMap := make(map[int64]*api.ClusterMemberState)
+
+		for _, member := range onlineMembers {
+			clusterMember, err := cluster.Connect(member.Address, s.Endpoints.NetworkCert(), s.ServerCert(), nil, true)
+			if err != nil {
+				logger.Error("Failed to connect to cluster member", logger.Ctx{"err": err})
+				return
+			}
+
+			resources, err := clusterMember.GetServerResources()
+			if err != nil {
+				logger.Error("Failed to get resources for cluster member", logger.Ctx{"err": err})
+				return
+			}
+
+			resourcesMap[member.ID] = resources
+
+			memberState, _, err := clusterMember.GetClusterMemberState(member.Name)
+			if err != nil {
+				logger.Error("Failed to get cluster member state", logger.Ctx{"err": err})
+				return
+			}
+
+			memberStateMap[member.ID] = memberState
+
+			architecture := resources.CPU.Architecture
+
+			// Group the member under its architecture.
+			architectureMap[architecture] = append(architectureMap[architecture], member)
+		}
+
+		// Migrate instances from the most to the least loaded server, per architecture.
+		for architecture, servers := range architectureMap {
+			// Skip architectures with fewer than two servers.
+			if len(servers) < 2 {
+				continue
+			}
+
+			maxScore := -1.0
+			minScore := math.MaxFloat64
+
+			maxServer := servers[0]
+			minServer := servers[0]
+
+			// Score each server.
+			for _, server := range servers {
+				memberState := memberStateMap[server.ID]
+
+				// Score the member by its used-memory fraction.
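+				// TotalRAM and FreeRAM come from the member's sysinfo, so the score
+				// ranges from 0.0 (idle) to 1.0 (memory fully used) and is used to
+				// pick the most and least loaded member of this architecture.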
+				memoryUsage := float64(memberState.SysInfo.TotalRAM-memberState.SysInfo.FreeRAM) / float64(memberState.SysInfo.TotalRAM)
+
+				score := memoryUsage
+
+				// Track the most and least loaded servers.
+				if score > maxScore {
+					maxScore = score
+					maxServer = server
+				}
+
+				if score < minScore {
+					minScore = score
+					minServer = server
+				}
+			}
+
+			// The scores are fractions while the threshold is expressed in percent.
+			if (maxScore-minScore)*100 >= float64(rebalanceThreshold) {
+				logger.Info("Rebalancing instances for architecture", logger.Ctx{"architecture": architecture})
+
+				// Get all instances from the most loaded server.
+				var dbInstances []dbCluster.Instance
+				err = s.DB.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
+					dbInstances, err = dbCluster.GetInstances(ctx, tx.Tx(), dbCluster.InstanceFilter{Node: &maxServer.Name})
+					if err != nil {
+						return fmt.Errorf("Failed to get instances: %w", err)
+					}
+
+					return nil
+				})
+				if err != nil {
+					logger.Error("Failed to get instances", logger.Ctx{"err": err})
+					return
+				}
+
+				// Filter for instances that can be live migrated and aren't on cooldown.
+				var instances []instance.Instance
+				for _, dbInst := range dbInstances {
+					inst, err := instance.LoadByProjectAndName(s, dbInst.Project, dbInst.Name)
+					if err != nil {
+						logger.Error("Failed to load instance", logger.Ctx{"err": err})
+						return
+					}
+
+					// Only consider instances that support live migration.
+					live := inst.CanMigrate() == "live-migrate"
+
+					// Skip instances whose migration cooldown hasn't expired yet.
+					cooldownUntil, exists := d.instanceMigrationCooldowns[inst.Name()]
+					if live && exists && time.Now().Before(cooldownUntil) {
+						live = false
+					}
+
+					if live {
+						instances = append(instances, inst)
+					}
+				}
+
+				// Set a target score to determine how much memory should move.
+				targetScore := (maxScore + minScore) / 2
+
+				// Difference between the target and the least loaded server.
+				deltaScore := targetScore - minScore
+
+				maxServerMemberState := memberStateMap[maxServer.ID]
+				minServerMemberState := memberStateMap[minServer.ID]
+
+				// Translate the score delta into an amount of memory on each server.
+				maxServerMemoryToMigrate := deltaScore * float64(maxServerMemberState.SysInfo.TotalRAM)
+				minServerMemoryToMigrate := deltaScore * float64(minServerMemberState.SysInfo.TotalRAM)
+
+				// Migrate at most the smaller of the two amounts.
+				memoryToMigrate := math.Min(maxServerMemoryToMigrate, minServerMemoryToMigrate)
+
+				// Limit how many instances can be migrated in one run.
+				batchLimit := s.GlobalConfig.ClusterRebalanceBatch()
+
+				var instancesToMigrate []instance.Instance
+
+				// Select instances to migrate until enough memory is scheduled to move.
+				for _, inst := range instances {
+					limitsMemory := inst.ExpandedConfig()["limits.memory"]
+					if limitsMemory == "" {
+						// Skip instances without an explicit memory limit as their footprint can't be estimated.
+						continue
+					}
+
+					memoryLimits, err := units.ParseByteSizeString(limitsMemory)
+					if err != nil {
+						logger.Error("Failed to parse memory limits", logger.Ctx{"err": err, "instance": inst.Name()})
+						return
+					}
+
+					instancesToMigrate = append(instancesToMigrate, inst)
+					memoryToMigrate -= float64(memoryLimits)
+					batchLimit--
+
+					// Stop once enough memory is scheduled or the batch limit is reached.
+					if memoryToMigrate <= 0 || batchLimit == 0 {
+						break
+					}
+				}
+
+				logger.Info("Number of instances to migrate", logger.Ctx{"count": len(instancesToMigrate)})
+
+				opRun := func(op *operations.Operation) error {
+					err := autoClusterRebalance(ctx, d, s, instancesToMigrate, &maxServer, &minServer, op)
+					if err != nil {
+						logger.Error("Failed rebalancing cluster instances", logger.Ctx{"err": err})
+						return err
+					}
+
+					return nil
+				}
+
+				op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterRebalance, nil, nil, opRun, nil, nil, nil)
+				if err != nil {
+					logger.Error("Failed creating auto cluster rebalancing operation", logger.Ctx{"err": err})
+					return
+				}
+
+				err = op.Start()
+				if err != nil {
+					logger.Error("Failed starting auto cluster rebalancing operation", logger.Ctx{"err": err})
+					return
+				}
+
+				err = op.Wait(ctx)
+				if err != nil {
+					logger.Error("Failed auto cluster rebalancing", logger.Ctx{"err": err})
+					return
+				}
+			}
+		}
+	}
+
+	return f, task.Every(time.Duration(s.GlobalConfig.ClusterRebalanceFrequency()) * time.Minute)
+}
+
+func autoClusterRebalance(ctx context.Context, d *Daemon, s *state.State, instancesToMigrate []instance.Instance, maxServer *db.NodeInfo, minServer *db.NodeInfo, op *operations.Operation) error {
+	// Parse the per-instance cooldown once for the whole run.
+	cooldown, err := time.ParseDuration(s.GlobalConfig.ClusterRebalanceCooldown())
+	if err != nil {
+		return fmt.Errorf("Failed to parse cooldown duration: %w", err)
+	}
+
+	for _, inst := range instancesToMigrate {
+		req := api.InstancePost{
+			Migration: true,
+			Live:      true,
+		}
+
+		err := migrateInstance(ctx, s, inst, req, maxServer, minServer, op)
+		if err != nil {
+			return fmt.Errorf("Failed to migrate instance %q in project %q: %w", inst.Name(), inst.Project().Name, err)
+		}
+
+		// Record when this instance may be migrated again.
+		d.instanceMigrationCooldowns[inst.Name()] = time.Now().Add(cooldown)
+	}
+
+	return nil
+}
diff --git a/cmd/incusd/daemon.go b/cmd/incusd/daemon.go
index 13ae28a593d..991f4f6643f 100644
--- a/cmd/incusd/daemon.go
+++ b/cmd/incusd/daemon.go
@@ -167,6 +167,9 @@ type Daemon struct {
 	// OVN clients.
 	ovnnb *ovn.NB
 	ovnsb *ovn.SB
+
+	// Per-instance migration cooldowns used by the cluster rebalancing task.
+	instanceMigrationCooldowns map[string]time.Time
 }
 
 // DaemonConfig holds configuration values for Daemon.
@@ -196,6 +199,7 @@ func newDaemon(config *DaemonConfig, os *sys.OS) *Daemon {
 		shutdownCtx:    shutdownCtx,
 		shutdownCancel: shutdownCancel,
 		shutdownDoneCh: make(chan error),
+		instanceMigrationCooldowns: make(map[string]time.Time),
 	}
 
 	d.serverCert = func() *localtls.CertInfo { return d.serverCertInt }
@@ -1646,6 +1650,9 @@ func (d *Daemon) startClusterTasks() {
 	// Heartbeats
 	d.taskClusterHeartbeat = d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))
 
+	// Auto-rebalance instances across the cluster
+	d.clusterTasks.Add(autoClusterRebalanceTask(d))
+
 	// Auto-sync images across the cluster (hourly)
 	d.clusterTasks.Add(autoSyncImagesTask(d))
 
diff --git a/doc/api-extensions.md b/doc/api-extensions.md
index 043f4f4e328..d9793509211 100644
--- a/doc/api-extensions.md
+++ b/doc/api-extensions.md
@@ -2460,3 +2460,7 @@ containers to another system.
 ## `profiles_all_projects`
 
 This adds support for listing profiles across all projects through the `all-projects` parameter on the `GET /1.0/profiles`API.
+
+## `cluster_rebalance`
+
+This adds the ability to automatically live-migrate instances across a cluster in order to balance the load.
diff --git a/doc/config_options.txt b/doc/config_options.txt
index a8d6a6ecc7f..c241a7bc4ed 100644
--- a/doc/config_options.txt
+++ b/doc/config_options.txt
@@ -1509,6 +1509,38 @@ This must be an odd number >= `3`.
 Specify the number of seconds after which an unresponsive member is considered offline.
 ```
 
+```{config:option} cluster.rebalance.batch server-cluster
+:defaultdesc: "`5`"
+:scope: "global"
+:shortdesc: "Maximum number of instances to move in one rebalancing run"
+:type: "integer"
+Maximum number of instances to move during one rebalancing run.
+```
+
+```{config:option} cluster.rebalance.cooldown server-cluster
+:defaultdesc: "`1h`"
+:scope: "global"
+:shortdesc: "Amount of time to wait before moving an instance again"
+:type: "string"
+Amount of time during which a previously migrated instance will not be moved again.
+```
+
+```{config:option} cluster.rebalance.frequency server-cluster
+:defaultdesc: "`3`"
+:scope: "global"
+:shortdesc: "Interval in minutes between rebalance checks"
+:type: "integer"
+How often (in minutes) to check whether instances should be rebalanced across the cluster.
+```
+
+```{config:option} cluster.rebalance.threshold server-cluster
+:defaultdesc: "`20`"
+:scope: "global"
+:shortdesc: "Load difference required to trigger a migration"
+:type: "integer"
+Load difference (in percent) between the most and least busy server required to trigger a migration.
+```
+
 ```{config:option} core.bgp_address server-core
diff --git a/internal/server/cluster/config/config.go b/internal/server/cluster/config/config.go
index 151b98a0b59..8b340e2800d 100644
--- a/internal/server/cluster/config/config.go
+++ b/internal/server/cluster/config/config.go
@@ -54,6 +54,26 @@ func (c *Config) BGPASN() int64 {
 	return c.m.GetInt64("core.bgp_asn")
 }
 
+// ClusterRebalanceThreshold returns the relevant setting.
+func (c *Config) ClusterRebalanceThreshold() int64 {
+	return c.m.GetInt64("cluster.rebalance.threshold")
+}
+
+// ClusterRebalanceFrequency returns the relevant setting.
+func (c *Config) ClusterRebalanceFrequency() int64 {
+	return c.m.GetInt64("cluster.rebalance.frequency")
+}
+
+// ClusterRebalanceBatch returns the relevant setting.
+func (c *Config) ClusterRebalanceBatch() int64 {
+	return c.m.GetInt64("cluster.rebalance.batch")
+}
+
+// ClusterRebalanceCooldown returns the relevant setting.
+func (c *Config) ClusterRebalanceCooldown() string {
+	return c.m.GetString("cluster.rebalance.cooldown")
+}
+
 // HTTPSAllowedHeaders returns the relevant CORS setting.
 func (c *Config) HTTPSAllowedHeaders() string {
 	return c.m.GetString("core.https_allowed_headers")
@@ -375,6 +395,42 @@ var ConfigSchema = config.Schema{
 	// shortdesc: Number of database voter members
 	"cluster.max_voters": {Type: config.Int64, Default: "3", Validator: maxVotersValidator},
 
+	// gendoc:generate(entity=server, group=cluster, key=cluster.rebalance.frequency)
+	// How often (in minutes) to check whether instances should be rebalanced across the cluster.
+	// ---
+	// type: integer
+	// scope: global
+	// defaultdesc: `3`
+	// shortdesc: Interval in minutes between rebalance checks
+	"cluster.rebalance.frequency": {Type: config.Int64, Default: "3"},
+
+	// gendoc:generate(entity=server, group=cluster, key=cluster.rebalance.threshold)
+	// Load difference (in percent) between the most and least busy server required to trigger a migration.
+	// ---
+	// type: integer
+	// scope: global
+	// defaultdesc: `20`
+	// shortdesc: Load difference required to trigger a migration
+	"cluster.rebalance.threshold": {Type: config.Int64, Default: "20"},
+
+	// gendoc:generate(entity=server, group=cluster, key=cluster.rebalance.cooldown)
+	// Amount of time during which a previously migrated instance will not be moved again.
+	// ---
+	// type: string
+	// scope: global
+	// defaultdesc: `1h`
+	// shortdesc: Amount of time to wait before moving an instance again
+	"cluster.rebalance.cooldown": {Type: config.String, Default: "1h"},
+
+	// gendoc:generate(entity=server, group=cluster, key=cluster.rebalance.batch)
+	// Maximum number of instances to move during one rebalancing run.
+	// ---
+	// type: integer
+	// scope: global
+	// defaultdesc: `5`
+	// shortdesc: Maximum number of instances to move in one rebalancing run
+	"cluster.rebalance.batch": {Type: config.Int64, Default: "5"},
+
 	// gendoc:generate(entity=server, group=cluster, key=cluster.max_standby)
 	// Specify the maximum number of cluster members that are assigned the database stand-by role.
 	// This must be a number between `0` and `5`.
diff --git a/internal/server/db/operationtype/operation_type.go b/internal/server/db/operationtype/operation_type.go
index 0144d98df40..d77e5cdcfa4 100644
--- a/internal/server/db/operationtype/operation_type.go
+++ b/internal/server/db/operationtype/operation_type.go
@@ -79,6 +79,7 @@ const (
 	BucketBackupRemove
 	BucketBackupRename
 	BucketBackupRestore
+	ClusterRebalance
 )
 
 // Description return a human-readable description of the operation type.
@@ -210,6 +211,8 @@ func (t Type) Description() string {
 		return "Renaming bucket backup"
 	case BucketBackupRestore:
 		return "Restoring bucket backup"
+	case ClusterRebalance:
+		return "Rebalancing cluster"
 	default:
 		return "Executing operation"
 	}
diff --git a/internal/server/metadata/configuration.json b/internal/server/metadata/configuration.json
index 2aaf971443d..77bcd02a7c1 100644
--- a/internal/server/metadata/configuration.json
+++ b/internal/server/metadata/configuration.json
@@ -1646,6 +1646,42 @@
 					"shortdesc": "Threshold when an unresponsive member is considered offline",
 					"type": "integer"
 				}
+			},
+			{
+				"cluster.rebalance.batch": {
+					"defaultdesc": "`5`",
+					"longdesc": "Maximum number of instances to move during one rebalancing run.",
+					"scope": "global",
+					"shortdesc": "Maximum number of instances to move in one rebalancing run",
+					"type": "integer"
+				}
+			},
+			{
+				"cluster.rebalance.cooldown": {
+					"defaultdesc": "`1h`",
+					"longdesc": "Amount of time during which a previously migrated instance will not be moved again.",
+					"scope": "global",
+					"shortdesc": "Amount of time to wait before moving an instance again",
+					"type": "string"
+				}
+			},
+			{
+				"cluster.rebalance.frequency": {
+					"defaultdesc": "`3`",
+					"longdesc": "How often (in minutes) to check whether instances should be rebalanced across the cluster.",
+					"scope": "global",
+					"shortdesc": "Interval in minutes between rebalance checks",
+					"type": "integer"
+				}
+			},
+			{
+				"cluster.rebalance.threshold": {
+					"defaultdesc": "`20`",
+					"longdesc": "Load difference (in percent) between the most and least busy server required to trigger a migration.",
+					"scope": "global",
+					"shortdesc": "Load difference required to trigger a migration",
+					"type": "integer"
+				}
 			}
 		]
 	},
diff --git a/internal/version/api.go b/internal/version/api.go
index cdbfcdad14d..84e992fb10d 100644
--- a/internal/version/api.go
+++ b/internal/version/api.go
@@ -412,6 +412,7 @@ var APIExtensions = []string{
 	"storage_zfs_vdev",
 	"container_migration_stateful",
 	"profiles_all_projects",
+	"cluster_rebalance",
 }
 
 // APIExtensionsCount returns the number of available API extensions.