Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/incusd: Add automatic live-migration to rebalance loads on incus cluster #835

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
259 changes: 259 additions & 0 deletions cmd/incusd/api_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/pem"
"errors"
"fmt"
"math"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -52,6 +53,7 @@ import (
"github.com/lxc/incus/v6/shared/logger"
"github.com/lxc/incus/v6/shared/osarch"
localtls "github.com/lxc/incus/v6/shared/tls"
"github.com/lxc/incus/v6/shared/units"
"github.com/lxc/incus/v6/shared/util"
"github.com/lxc/incus/v6/shared/validate"
)
Expand Down Expand Up @@ -4573,3 +4575,260 @@ func autoHealCluster(ctx context.Context, s *state.State, offlineMembers []db.No

return nil
}

// autoClusterRebalanceTask returns a periodic task that evens out memory load
// across the cluster. On each run the cluster leader groups online members by
// CPU architecture, finds the most and least loaded member in each group and,
// when their load difference exceeds cluster.rebalance.threshold, live-migrates
// a batch of instances from the busiest member to the least busy one.
func autoClusterRebalanceTask(d *Daemon) (task.Func, task.Schedule) {
	s := d.State()
	f := func(ctx context.Context) {
		s := d.State()
		rebalanceThreshold := s.GlobalConfig.ClusterRebalanceThreshold()
		if rebalanceThreshold == 0 {
			return // Skip rebalancing if it's disabled.
		}

		leader, err := d.gateway.LeaderAddress()
		if err != nil {
			if errors.Is(err, cluster.ErrNodeIsNotClustered) {
				return // Skip rebalancing if not clustered.
			}

			logger.Error("Failed to get leader cluster member address", logger.Ctx{"err": err})
			return
		}

		if s.LocalConfig.ClusterAddress() != leader {
			return // Skip rebalancing if not cluster leader.
		}

		// Get all online members.
		var onlineMembers []db.NodeInfo
		err = s.DB.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
			members, err := tx.GetNodes(ctx)
			if err != nil {
				return fmt.Errorf("Failed getting cluster members: %w", err)
			}

			onlineMembers, err = tx.GetCandidateMembers(ctx, members, nil, "", nil, s.GlobalConfig.OfflineThreshold())
			if err != nil {
				return fmt.Errorf("Failed getting online cluster members: %w", err)
			}

			return nil
		})
		if err != nil {
			logger.Error("Failed getting cluster members", logger.Ctx{"err": err})
			return
		}

		// Map each architecture to the members that have it.
		architectureMap := make(map[string][]db.NodeInfo)

		// Maps the member ID to its hardware resources.
		resourcesMap := make(map[int64]*api.Resources)

		// Maps the member ID to its runtime state (sysinfo, etc.).
		memberStateMap := make(map[int64]*api.ClusterMemberState)

		for _, member := range onlineMembers {
			clusterMember, err := cluster.Connect(member.Address, s.Endpoints.NetworkCert(), s.ServerCert(), nil, true)
			if err != nil {
				logger.Error("Failed to connect to cluster member", logger.Ctx{"err": err})
				return
			}

			resources, err := clusterMember.GetServerResources()
			if err != nil {
				logger.Error("Failed to get resources for cluster member", logger.Ctx{"err": err})
				return
			}

			resourcesMap[member.ID] = resources

			memberState, _, err := clusterMember.GetClusterMemberState(member.Name)
			if err != nil {
				logger.Error("Failed to get cluster member state", logger.Ctx{"err": err})
				return
			}

			memberStateMap[member.ID] = memberState

			architecture := resources.CPU.Architecture

			// Add the member to its architecture group.
			architectureMap[architecture] = append(architectureMap[architecture], member)
		}

		// Migrate instances from the busiest to the least busy member, per architecture.
		for architecture, servers := range architectureMap {
			// Need at least two members to rebalance between.
			if len(servers) < 2 {
				continue
			}

			maxScore := -1.0
			minScore := math.MaxFloat64

			maxServer := servers[0]
			minServer := servers[0]

			// Score each server by its fractional memory usage.
			for _, server := range servers {
				memberState := memberStateMap[server.ID]

				// Percent memory usage.
				memoryUsage := float64(memberState.SysInfo.TotalRAM-memberState.SysInfo.FreeRAM) / float64(memberState.SysInfo.TotalRAM)

				score := memoryUsage

				// Update max / min score.
				if score > maxScore {
					maxScore = score
					maxServer = server
				}

				if score < minScore {
					minScore = score
					minServer = server
				}
			}

			if maxScore-minScore >= float64(rebalanceThreshold) {
				logger.Info("Rebalancing for Architecture", logger.Ctx{"architecture": architecture})

				// Get all instances from the busiest server.
				var dbInstances []dbCluster.Instance
				err = s.DB.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
					dbInstances, err = dbCluster.GetInstances(ctx, tx.Tx(), dbCluster.InstanceFilter{Node: &maxServer.Name})
					if err != nil {
						return fmt.Errorf("Failed to get instances: %w", err)
					}

					return nil
				})
				if err != nil {
					logger.Error("Failed to get instances", logger.Ctx{"err": err})
					return
				}

				// Filter for instances that can be live migrated and aren't on cooldown.
				var instances []instance.Instance
				for _, dbInst := range dbInstances {
					inst, err := instance.LoadByProjectAndName(s, dbInst.Project, dbInst.Name)
					if err != nil {
						logger.Error("Failed to load instance", logger.Ctx{"err": err})
						return
					}

					live := inst.CanMigrate() == "live-migrate"

					// Skip instances whose migration cooldown hasn't elapsed yet.
					// NOTE(review): the cooldown map is keyed by instance name only,
					// so identically-named instances in different projects share a
					// cooldown entry — confirm whether project-qualified keys are needed.
					deadline, exists := d.instanceMigrationCooldowns[inst.Name()]
					if live && exists && time.Now().Before(deadline) {
						live = false
					}

					if live {
						instances = append(instances, inst)
					}
				}

				// Set a target score to determine how much we should move.
				targetScore := (maxScore + minScore) / 2

				// Get the difference from the min server score.
				deltaScore := targetScore - minScore

				maxServerMemberState := memberStateMap[maxServer.ID]
				minServerMemberState := memberStateMap[minServer.ID]

				// Translate deltaScore to memory per server.
				maxServerMemoryToMigrate := deltaScore * float64(maxServerMemberState.SysInfo.TotalRAM)
				minServerMemoryToMigrate := deltaScore * float64(minServerMemberState.SysInfo.TotalRAM)

				// Migrate instances to reach the minimum between them.
				memoryToMigrate := math.Min(maxServerMemoryToMigrate, minServerMemoryToMigrate)

				// Limits how many instances can be migrated in one run.
				batchLimit := s.GlobalConfig.ClusterRebalanceBatch()

				var instancesToMigrate []instance.Instance

				// Select instances to migrate until we reach the desired memory.
				for _, inst := range instances {
					memoryLimits, err := units.ParseByteSizeString(inst.ExpandedConfig()["limits.memory"])
					if err != nil {
						// Skip instances without a parseable memory limit rather
						// than aborting the whole rebalance run.
						logger.Warn("Failed to parse memory limits, skipping instance", logger.Ctx{"instance": inst.Name(), "project": inst.Project().Name, "err": err})
						continue
					}

					instancesToMigrate = append(instancesToMigrate, inst)
					memoryToMigrate -= float64(memoryLimits)
					batchLimit--

					// Stop once enough memory is scheduled or the batch is full.
					// (<= also guards against a non-positive configured batch size.)
					if memoryToMigrate <= 0 || batchLimit <= 0 {
						break
					}
				}

				logger.Info("Number of instances to migrate", logger.Ctx{"count": len(instancesToMigrate)})

				opRun := func(op *operations.Operation) error {
					err := autoClusterRebalance(ctx, d, s, instancesToMigrate, &maxServer, &minServer, op)
					if err != nil {
						logger.Error("Failed rebalancing cluster instances", logger.Ctx{"err": err})
						return err
					}

					return nil
				}

				op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterRebalance, nil, nil, opRun, nil, nil, nil)
				if err != nil {
					logger.Error("Failed creating auto cluster rebalancing operation", logger.Ctx{"err": err})
					return
				}

				err = op.Start()
				if err != nil {
					logger.Error("Failed starting auto cluster rebalancing operation", logger.Ctx{"err": err})
					return
				}

				err = op.Wait(ctx)
				if err != nil {
					logger.Error("Failed auto cluster rebalancing", logger.Ctx{"err": err})
					return
				}
			}
		}
	}
	return f, task.Every(time.Duration(s.GlobalConfig.ClusterRebalanceFrequency()) * time.Minute)
}

// autoClusterRebalance live-migrates the given instances from maxServer to
// minServer, recording a per-instance cooldown timestamp so the same instance
// isn't moved again until cluster.rebalance.cooldown has elapsed.
func autoClusterRebalance(ctx context.Context, d *Daemon, s *state.State, instancesToMigrate []instance.Instance, maxServer *db.NodeInfo, minServer *db.NodeInfo, op *operations.Operation) error {
	// Parse the cooldown once up front: it is loop-invariant, and failing
	// after a migration would leave that instance without a cooldown entry.
	cooldown, err := time.ParseDuration(s.GlobalConfig.ClusterRebalanceCooldown())
	if err != nil {
		return fmt.Errorf("Failed to parse cooldown duration: %w", err)
	}

	for _, inst := range instancesToMigrate {
		req := api.InstancePost{
			Migration: true,
			Live:      true,
		}

		err := migrateInstance(ctx, s, inst, req, maxServer, minServer, op)
		if err != nil {
			return fmt.Errorf("Failed to migrate instance %q in project %q: %w", inst.Name(), inst.Project().Name, err)
		}

		// Record when this instance becomes eligible for migration again.
		d.instanceMigrationCooldowns[inst.Name()] = time.Now().Add(cooldown)
	}

	return nil
}
7 changes: 7 additions & 0 deletions cmd/incusd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ type Daemon struct {
// OVN clients.
ovnnb *ovn.NB
ovnsb *ovn.SB

// Cooldowns map
instanceMigrationCooldowns map[string]time.Time
}

// DaemonConfig holds configuration values for Daemon.
Expand Down Expand Up @@ -196,6 +199,7 @@ func newDaemon(config *DaemonConfig, os *sys.OS) *Daemon {
shutdownCtx: shutdownCtx,
shutdownCancel: shutdownCancel,
shutdownDoneCh: make(chan error),
instanceMigrationCooldowns: make(map[string]time.Time),
}

d.serverCert = func() *localtls.CertInfo { return d.serverCertInt }
Expand Down Expand Up @@ -1646,6 +1650,9 @@ func (d *Daemon) startClusterTasks() {
// Heartbeats
d.taskClusterHeartbeat = d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))

// Auto-rebalance instances across cluster
d.clusterTasks.Add(autoClusterRebalanceTask(d))

// Auto-sync images across the cluster (hourly)
d.clusterTasks.Add(autoSyncImagesTask(d))

Expand Down
4 changes: 4 additions & 0 deletions doc/api-extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2460,3 +2460,7 @@ containers to another system.
## `profiles_all_projects`

This adds support for listing profiles across all projects through the `all-projects` parameter on the `GET /1.0/profiles`API.

## `cluster_rebalance`

This adds the ability to automatically live-migrate instances across a cluster to balance the load.
32 changes: 32 additions & 0 deletions doc/config_options.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1509,6 +1509,38 @@ This must be an odd number >= `3`.
Specify the number of seconds after which an unresponsive member is considered offline.
```

```{config:option} cluster.rebalance.batch server-cluster
:defaultdesc: "`5`"
:scope: "global"
:shortdesc: "Maximum number of instances to migrate per re-balancing run"
:type: "integer"
Maximum number of instances to move during one re-balancing run. TODO: adjust the default to not be fixed
```

```{config:option} cluster.rebalance.cooldown server-cluster
:defaultdesc: "`1H`"
:scope: "global"
:shortdesc: "Amount of time before an instance may be moved again"
:type: "string"
Amount of time during which an instance will not be moved again.
```

```{config:option} cluster.rebalance.frequency server-cluster
:defaultdesc: "`3`"
:scope: "global"
:shortdesc: "How often the cluster re-balancing task runs"
:type: "integer"
How often (in minutes) the cluster re-balancing task runs. TODO: make this more descriptive, update the validator
```

```{config:option} cluster.rebalance.threshold server-cluster
:defaultdesc: "`3`"
:scope: "global"
:shortdesc: "Load difference required to trigger a re-balancing migration"
:type: "integer"
Load difference between the most and least busy server needed to trigger a migration. TODO: add a validator
```

<!-- config group server-cluster end -->
<!-- config group server-core start -->
```{config:option} core.bgp_address server-core
Expand Down
Binary file added incus
Binary file not shown.
Loading