cmd/incusd: Run cluster evacuate and restore in parallel
Sequentially evacuating or restoring a server can be slow when there
are several instances running. This switches to running migrations in
parallel, based on the number of detected CPUs available.

Signed-off-by: Mathias Gibbens <mathias.gibbens@futurfusion.io>
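
For readers unfamiliar with the pattern, the change below is built around a bounded worker pool from golang.org/x/sync/errgroup, with the concurrency limit derived from runtime.NumCPU(). The following is a minimal, self-contained sketch of that pattern under stated assumptions (Go 1.21+ for the max builtin, Go 1.22+ per-iteration loop variables); the instance names and the migrateOne helper are illustrative placeholders, not Incus code.

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"

	"golang.org/x/sync/errgroup"
)

// migrateOne stands in for the real per-instance migration work.
func migrateOne(ctx context.Context, name string) error {
	select {
	case <-time.After(100 * time.Millisecond):
		fmt.Println("migrated", name)
		return nil
	case <-ctx.Done():
		// Stop early if another migration already failed.
		return ctx.Err()
	}
}

func main() {
	instances := []string{"c1", "c2", "c3", "c4"}

	// Cap concurrency at the number of detected CPUs, never below one.
	// The commit divides NumCPU by a tunable factor (see its FIXME);
	// here it is used directly.
	limit := max(runtime.NumCPU(), 1)

	// groupCtx is cancelled as soon as any goroutine returns an error.
	group, groupCtx := errgroup.WithContext(context.Background())
	group.SetLimit(limit)

	for _, inst := range instances {
		group.Go(func() error {
			return migrateOne(groupCtx, inst)
		})
	}

	// Wait blocks until all goroutines finish and returns the first error.
	if err := group.Wait(); err != nil {
		fmt.Println("evacuation failed:", err)
	}
}
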
gibmat committed Sep 27, 2024
1 parent 2bacf2f commit 82b094a
Showing 1 changed file with 48 additions and 12 deletions.
60 changes: 48 additions & 12 deletions cmd/incusd/api_cluster_evacuation.go
@@ -7,13 +7,16 @@ import (
"net"
"net/http"
"net/url"
"runtime"
"slices"
"strconv"
"strings"
"time"

"github.com/gorilla/mux"

"golang.org/x/sync/errgroup"

"github.com/lxc/incus/v6/client"
"github.com/lxc/incus/v6/internal/revert"
"github.com/lxc/incus/v6/internal/server/cluster"
@@ -159,9 +162,17 @@ func evacuateInstances(ctx context.Context, opts evacuateOpts) error {
return fmt.Errorf("Missing migration callback function")
}

metadata := make(map[string]any)
// Limit the number of concurrent evacuations to run at the same time
numParallelEvacs := max(runtime.NumCPU()/1, 1) // FIXME after testing, bump NumCPU division to 8 or 16

group, groupCtx := errgroup.WithContext(ctx)
group.SetLimit(numParallelEvacs)

for _, inst := range opts.instances {
// Accesses variables groupCtx, inst, and opts
group.Go(func() error {
metadata := make(map[string]any) // FIXME indent code block after review (makes diff noisy)

instProject := inst.Project()
l := logger.AddContext(logger.Ctx{"project": instProject.Name, "instance": inst.Name()})

@@ -178,7 +189,7 @@ func evacuateInstances(ctx context.Context, opts evacuateOpts) error {

if action != "migrate" {
// We can only migrate instances or leave them as they are.
continue
return nil
}
} else if opts.mode != "auto" {
action = opts.mode
@@ -200,17 +211,17 @@ func evacuateInstances(ctx context.Context, opts evacuateOpts) error {

if action != "migrate" {
// Done with this instance.
continue
return nil
}
}

// Find a new location for the instance.
sourceMemberInfo, targetMemberInfo, err := evacuateClusterSelectTarget(ctx, opts.s, inst)
sourceMemberInfo, targetMemberInfo, err := evacuateClusterSelectTarget(groupCtx, opts.s, inst)
if err != nil {
if api.StatusErrorCheck(err, http.StatusNotFound) {
// Skip migration if no target is available.
l.Warn("No migration target available for instance")
continue
return nil
}
}

@@ -224,10 +235,18 @@ func evacuateInstances(ctx context.Context, opts evacuateOpts) error {
}

start := isRunning || instanceShouldAutoStart(inst)
err = opts.migrateInstance(ctx, opts.s, inst, sourceMemberInfo, targetMemberInfo, action == "live-migrate", start, metadata, opts.op)
err = opts.migrateInstance(groupCtx, opts.s, inst, sourceMemberInfo, targetMemberInfo, action == "live-migrate", start, metadata, opts.op)
if err != nil {
return err
}

return nil
})
}

err := group.Wait()
if err != nil {
return fmt.Errorf("Failed to evacuate instances: %w", err)
}

return nil
@@ -294,11 +313,6 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
_ = evacuateClusterSetState(s, originName, db.ClusterMemberStateEvacuated)
})

var source incus.InstanceServer
var sourceNode db.NodeInfo

metadata := make(map[string]any)

// Restart the networks.
err = networkStartup(d.State())
if err != nil {
@@ -318,6 +332,7 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
}

// Start the instance.
metadata := make(map[string]any)
metadata["evacuation_progress"] = fmt.Sprintf("Starting %q in project %q", inst.Name(), inst.Project().Name)
_ = op.UpdateMetadata(metadata)

@@ -334,8 +349,21 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
}
}

// Limit the number of concurrent migrations to run at the same time
numParallelMigrations := max(runtime.NumCPU()/1, 1) // FIXME after testing, bump NumCPU division to 8 or 16

group := new(errgroup.Group)
group.SetLimit(numParallelMigrations)

// Migrate back the remote instances.
for _, inst := range instances {
// Accesses variables inst, op, originName, r, and s
group.Go(func() error {
var err error // FIXME indent code block after review (makes diff noisy)
var source incus.InstanceServer
var sourceNode db.NodeInfo
metadata := make(map[string]any)

l := logger.AddContext(logger.Ctx{"project": inst.Project().Name, "instance": inst.Name()})

// Check the action.
@@ -449,7 +477,7 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
}

if !isRunning || live {
continue
return nil
}

metadata["evacuation_progress"] = fmt.Sprintf("Starting %q in project %q", inst.Name(), inst.Project().Name)
@@ -459,6 +487,14 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
if err != nil {
return fmt.Errorf("Failed to start instance %q: %w", inst.Name(), err)
}

return nil
})
}

err = group.Wait()
if err != nil {
return fmt.Errorf("Failed to restore instances: %w", err)
}

reverter.Success()
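One detail worth noting in the diff above: the two code paths construct their groups differently. evacuateInstances uses errgroup.WithContext, so the derived groupCtx is cancelled as soon as any migration returns an error and the remaining work can bail out early, while restoreClusterMember uses a plain errgroup.Group, which has no shared context, so outstanding goroutines simply run to completion and Wait reports the first error. The sketch below illustrates that difference; the error messages and sleep durations are made up for the example.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Evacuate-style: errgroup.WithContext cancels groupCtx on the first
	// error, so cooperative tasks can typically stop early.
	group, groupCtx := errgroup.WithContext(context.Background())
	group.SetLimit(2)

	group.Go(func() error { return fmt.Errorf("migration failed") })
	group.Go(func() error {
		select {
		case <-groupCtx.Done():
			fmt.Println("second task observed cancellation:", groupCtx.Err())
		case <-time.After(time.Second):
			fmt.Println("second task ran to completion")
		}
		return nil
	})

	fmt.Println("evacuate-style Wait:", group.Wait())

	// Restore-style: a plain errgroup.Group has no shared context, so the
	// remaining tasks finish normally; Wait still returns the first error.
	plain := new(errgroup.Group)
	plain.SetLimit(2)

	plain.Go(func() error { return fmt.Errorf("migration failed") })
	plain.Go(func() error {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("restore-style task still ran to completion")
		return nil
	})

	fmt.Println("restore-style Wait:", plain.Wait())
}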
