diff --git a/cmd/incusd/api_cluster_evacuation.go b/cmd/incusd/api_cluster_evacuation.go
index df89b9effa..c6d844d832 100644
--- a/cmd/incusd/api_cluster_evacuation.go
+++ b/cmd/incusd/api_cluster_evacuation.go
@@ -7,6 +7,7 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"runtime"
 	"slices"
 	"strconv"
 	"strings"
@@ -14,6 +15,8 @@ import (
 
 	"github.com/gorilla/mux"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/lxc/incus/v6/client"
 	"github.com/lxc/incus/v6/internal/revert"
 	"github.com/lxc/incus/v6/internal/server/cluster"
@@ -159,9 +162,17 @@ func evacuateInstances(ctx context.Context, opts evacuateOpts) error {
 		return fmt.Errorf("Missing migration callback function")
 	}
 
-	metadata := make(map[string]any)
+	// Limit the number of concurrent evacuations to run at the same time
+	numParallelEvacs := max(runtime.NumCPU()/1, 1) // FIXME after testing, bump NumCPU division to 8 or 16
+
+	group, groupCtx := errgroup.WithContext(ctx)
+	group.SetLimit(numParallelEvacs)
 
 	for _, inst := range opts.instances {
+		// Accesses variables groupCtx, inst, and opts
+		group.Go(func() error {
+			metadata := make(map[string]any) // FIXME indent code block after review (makes diff noisy)
+
 		instProject := inst.Project()
 		l := logger.AddContext(logger.Ctx{"project": instProject.Name, "instance": inst.Name()})
 
@@ -178,7 +189,7 @@ func evacuateInstances(ctx context.Context, opts evacuateOpts) error {
 
 			if action != "migrate" {
 				// We can only migrate instances or leave them as they are.
-				continue
+				return nil
 			}
 		} else if opts.mode != "auto" {
 			action = opts.mode
@@ -200,17 +211,17 @@ func evacuateInstances(ctx context.Context, opts evacuateOpts) error {
 
 			if action != "migrate" {
 				// Done with this instance.
-				continue
+				return nil
 			}
 		}
 
 		// Find a new location for the instance.
-		sourceMemberInfo, targetMemberInfo, err := evacuateClusterSelectTarget(ctx, opts.s, inst)
+		sourceMemberInfo, targetMemberInfo, err := evacuateClusterSelectTarget(groupCtx, opts.s, inst)
 		if err != nil {
 			if api.StatusErrorCheck(err, http.StatusNotFound) {
 				// Skip migration if no target is available.
 				l.Warn("No migration target available for instance")
-				continue
+				return nil
 			}
 		}
 
@@ -224,10 +235,18 @@ func evacuateInstances(ctx context.Context, opts evacuateOpts) error {
 		}
 
 		start := isRunning || instanceShouldAutoStart(inst)
-		err = opts.migrateInstance(ctx, opts.s, inst, sourceMemberInfo, targetMemberInfo, action == "live-migrate", start, metadata, opts.op)
+		err = opts.migrateInstance(groupCtx, opts.s, inst, sourceMemberInfo, targetMemberInfo, action == "live-migrate", start, metadata, opts.op)
 		if err != nil {
 			return err
 		}
+
+			return nil
+		})
+	}
+
+	err := group.Wait()
+	if err != nil {
+		return fmt.Errorf("Failed to evacuate instances: %w", err)
 	}
 
 	return nil
@@ -294,11 +313,6 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
 		_ = evacuateClusterSetState(s, originName, db.ClusterMemberStateEvacuated)
 	})
 
-	var source incus.InstanceServer
-	var sourceNode db.NodeInfo
-
-	metadata := make(map[string]any)
-
 	// Restart the networks.
 	err = networkStartup(d.State())
 	if err != nil {
@@ -318,6 +332,7 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
 		}
 
 		// Start the instance.
+		metadata := make(map[string]any)
 		metadata["evacuation_progress"] = fmt.Sprintf("Starting %q in project %q", inst.Name(), inst.Project().Name)
 		_ = op.UpdateMetadata(metadata)
 
@@ -334,8 +349,21 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
 		}
 	}
 
+	// Limit the number of concurrent migrations to run at the same time
+	numParallelMigrations := max(runtime.NumCPU()/1, 1) // FIXME after testing, bump NumCPU division to 8 or 16
+
+	group := new(errgroup.Group)
+	group.SetLimit(numParallelMigrations)
+
 	// Migrate back the remote instances.
 	for _, inst := range instances {
+		// Accesses variables inst, op, originName, r, and s
+		group.Go(func() error {
+			var err error // FIXME indent code block after review (makes diff noisy)
+			var source incus.InstanceServer
+			var sourceNode db.NodeInfo
+			metadata := make(map[string]any)
+
 		l := logger.AddContext(logger.Ctx{"project": inst.Project().Name, "instance": inst.Name()})
 
 		// Check the action.
@@ -449,7 +477,7 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
 		}
 
 		if !isRunning || live {
-			continue
+			return nil
 		}
 
 		metadata["evacuation_progress"] = fmt.Sprintf("Starting %q in project %q", inst.Name(), inst.Project().Name)
@@ -459,6 +487,14 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
 		if err != nil {
 			return fmt.Errorf("Failed to start instance %q: %w", inst.Name(), err)
 		}
+
+			return nil
+		})
+	}
+
+	err = group.Wait()
+	if err != nil {
+		return fmt.Errorf("Failed to restore instances: %w", err)
 	}
 
 	reverter.Success()
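For context, both hunks apply the same bounded-concurrency pattern from golang.org/x/sync/errgroup: create a group (with or without a derived context), cap it with SetLimit, launch one Go call per instance, and collect the first error with Wait. The following is a minimal standalone sketch of that pattern, not part of the patch; it assumes Go 1.22+ (max builtin, per-iteration loop variables) and uses hypothetical instance names.

package main

import (
	"context"
	"fmt"
	"runtime"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Cap the number of workers running at once, mirroring numParallelEvacs above.
	limit := max(runtime.NumCPU(), 1)

	// The derived context is cancelled as soon as any worker returns an error.
	group, ctx := errgroup.WithContext(context.Background())
	group.SetLimit(limit)

	instances := []string{"c1", "c2", "c3"} // hypothetical instance names
	for _, name := range instances {
		group.Go(func() error {
			// Bail out early if another worker already failed.
			if ctx.Err() != nil {
				return ctx.Err()
			}

			fmt.Println("migrating", name)
			return nil
		})
	}

	// Wait blocks until every worker has returned and yields the first error.
	err := group.Wait()
	if err != nil {
		fmt.Println("evacuation failed:", err)
	}
}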