Skip to content

Commit

Permalink
Merge pull request #1054 from stgraber/main
Browse files Browse the repository at this point in the history
Various bugfixes
  • Loading branch information
hallyn authored Jul 27, 2024
2 parents e97c6e8 + 8e33c78 commit 8828254
Show file tree
Hide file tree
Showing 22 changed files with 178 additions and 85 deletions.
29 changes: 27 additions & 2 deletions client/incus_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,9 @@ func (r *ProtocolIncus) tryCreateInstance(req api.InstancesPost, urls []string,
operation := req.Source.Operation

// Forward targetOp to remote op
chConnect := make(chan error, 1)
chWait := make(chan error, 1)

go func() {
success := false
var errors []remoteOperationResult
Expand Down Expand Up @@ -665,13 +668,35 @@ func (r *ProtocolIncus) tryCreateInstance(req api.InstancesPost, urls []string,
break
}

if !success {
rop.err = remoteOperationError("Failed instance creation", errors)
if success {
chConnect <- nil
close(chConnect)
} else {
chConnect <- remoteOperationError("Failed instance creation", errors)
close(chConnect)

if op != nil {
_ = op.Cancel()
}
}
}()

if op != nil {
go func() {
chWait <- op.Wait()
close(chWait)
}()
}

go func() {
var err error

select {
case err = <-chConnect:
case err = <-chWait:
}

rop.err = err
close(rop.chDone)
}()

Expand Down
14 changes: 4 additions & 10 deletions cmd/incus/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,6 @@ func (c *cmdImageCopy) Run(cmd *cobra.Command, args []string) error {
return err
}

// Revert project for `sourceServer` which may have been overwritten
// by `--project` flag in `GetImageServer` method
remote := conf.Remotes[remoteName]
if remote.Protocol == "incus" && !remote.Public {
d, ok := sourceServer.(incus.InstanceServer)
if ok {
sourceServer = d.UseProject(remote.Project)
}
}

// Parse destination remote
resources, err := c.global.ParseServers(args[1])
if err != nil {
Expand All @@ -228,8 +218,12 @@ func (c *cmdImageCopy) Run(cmd *cobra.Command, args []string) error {
imageType = "virtual-machine"
}

// Set the correct project on target.
remote := conf.Remotes[resources[0].remote]
if c.flagTargetProject != "" {
destinationServer = destinationServer.UseProject(c.flagTargetProject)
} else if remote.Protocol == "incus" {
destinationServer = destinationServer.UseProject(remote.Project)
}

// Copy the image
Expand Down
5 changes: 5 additions & 0 deletions cmd/incusd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ func (s *httpServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}

func setCORSHeaders(rw http.ResponseWriter, req *http.Request, config *clusterConfig.Config) {
// Check if we have a working config.
if config == nil {
return
}

allowedOrigin := config.HTTPSAllowedOrigin()
origin := req.Header.Get("Origin")
if allowedOrigin != "" && origin != "" {
Expand Down
34 changes: 18 additions & 16 deletions cmd/incusd/instance_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,25 +225,27 @@ func instancePost(d *Daemon, r *http.Request) response.Response {
}

// Checks for running instances.
if inst.IsRunning() && (req.Pool != "" || req.Project != "" || target != "") {
// Stateless migrations need the instance stopped.
if !req.Live {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved statelessly"))
}
if inst.IsRunning() {
if req.Pool != "" || req.Project != "" || target != "" {
// Stateless migrations need the instance stopped.
if !req.Live {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved statelessly"))
}

// Storage pool changes require a stopped instance.
if req.Pool != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across storage pools"))
}
// Storage pool changes require a stopped instance.
if req.Pool != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across storage pools"))
}

// Project changes require a stopped instance.
if req.Project != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across projects"))
}
// Project changes require a stopped instance.
if req.Project != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across projects"))
}

// Name changes require a stopped instance.
if req.Name != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to change their names"))
// Name changes require a stopped instance.
if req.Name != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to change their names"))
}
}
} else {
// Clear Live flag if instance isn't running.
Expand Down
12 changes: 6 additions & 6 deletions cmd/incusd/migrate_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@ func (s *migrationSourceWs) Do(state *state.State, migrateOp *operations.Operati
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*10)
defer cancel()

l.Info("Waiting for migration control connection on source")
l.Debug("Waiting for migration control connection on source")

_, err := s.conns[api.SecretNameControl].WebSocket(ctx)
if err != nil {
return fmt.Errorf("Failed waiting for migration control connection on source: %w", err)
}

l.Info("Migration control connection established on source")
l.Debug("Migration control connection established on source")

defer l.Info("Migration channels disconnected on source")
defer l.Debug("Migration channels disconnected on source")
defer s.disconnect()

stateConnFunc := func(ctx context.Context) (io.ReadWriteCloser, error) {
Expand Down Expand Up @@ -215,16 +215,16 @@ func (c *migrationSink) Do(state *state.State, instOp *operationlock.InstanceOpe
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*10)
defer cancel()

l.Info("Waiting for migration control connection on target")
l.Debug("Waiting for migration control connection on target")

_, err := c.conns[api.SecretNameControl].WebSocket(ctx)
if err != nil {
return fmt.Errorf("Failed waiting for migration control connection on target: %w", err)
}

l.Info("Migration control connection established on target")
l.Debug("Migration control connection established on target")

defer l.Info("Migration channels disconnected on target")
defer l.Debug("Migration channels disconnected on target")

if c.push {
defer c.disconnect()
Expand Down
34 changes: 26 additions & 8 deletions cmd/incusd/storage_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,21 @@ func storagePoolDelete(d *Daemon, r *http.Request) response.Response {
}
}

// If the pool requires deactivation, go through it first.
if !clusterNotification && pool.Driver().Info().Remote && pool.Driver().Info().Deactivate {
err = notifier(func(client incus.InstanceServer) error {
_, _, err := client.GetServer()
if err != nil {
return err
}

return client.DeleteStoragePool(pool.Name())
})
if err != nil {
return response.SmartError(err)
}
}

if pool.LocalStatus() != api.StoragePoolStatusPending {
err = pool.Delete(clientType, nil)
if err != nil {
Expand All @@ -1024,15 +1039,18 @@ func storagePoolDelete(d *Daemon, r *http.Request) response.Response {
return response.EmptySyncResponse
}

// If we are clustered, also notify all other nodes.
err = notifier(func(client incus.InstanceServer) error {
_, _, err := client.GetServer()
if err != nil {
return err
}
// If clustered and dealing with a normal pool, notify all other nodes.
if !pool.Driver().Info().Remote || !pool.Driver().Info().Deactivate {
err = notifier(func(client incus.InstanceServer) error {
_, _, err := client.GetServer()
if err != nil {
return err
}

return client.DeleteStoragePool(pool.Name())
})
}

return client.DeleteStoragePool(pool.Name())
})
if err != nil {
return response.SmartError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/server/cluster/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Connect(address string, networkCert *localtls.CertInfo, serverCert *localtl
defer cancel()
err := EventListenerWait(ctx, address)
if err != nil {
return nil, fmt.Errorf("Missing event connection with target cluster member")
return nil, err
}
}

Expand Down
17 changes: 16 additions & 1 deletion internal/server/cluster/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"context"
"fmt"
"slices"
"sync"
"time"
Expand Down Expand Up @@ -115,6 +116,7 @@ var eventHubAddresses []string
var eventHubPushCh = make(chan api.Event, 10) // Buffer size to accommodate slow consumers before dropping events.
var eventHubPushChTimeout = time.Duration(time.Second)
var listeners = map[string]*eventListenerClient{}
var listenersUnavailable = map[string]bool{}
var listenersNotify = map[chan struct{}][]string{}
var listenersLock sync.Mutex
var listenersUpdateLock sync.Mutex
Expand Down Expand Up @@ -149,6 +151,11 @@ func EventListenerWait(ctx context.Context, address string) error {
return nil
}

if listenersUnavailable[address] {
listenersLock.Unlock()
return fmt.Errorf("Server isn't ready yet")
}

listenAddresses := []string{address}

// Check if operating in event hub mode and if one of the event hub connections is available.
Expand Down Expand Up @@ -181,7 +188,11 @@ func EventListenerWait(ctx context.Context, address string) error {
case <-connected:
return nil
case <-ctx.Done():
return ctx.Err()
if ctx.Err() != nil {
return fmt.Errorf("Missing event connection with target cluster member")
}

return nil
}
}

Expand Down Expand Up @@ -306,6 +317,9 @@ func EventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,
l := logger.AddContext(logger.Ctx{"local": localAddress, "remote": m.Address})

if !HasConnectivity(endpoints.NetworkCert(), serverCert(), m.Address, true) {
listenersLock.Lock()
listenersUnavailable[m.Address] = true
listenersLock.Unlock()
return
}

Expand All @@ -325,6 +339,7 @@ func EventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,

listenersLock.Lock()
listeners[m.Address] = listener
listenersUnavailable[m.Address] = false

// Indicate to any notifiers waiting for this member's address that it is connected.
for connected, notifyAddresses := range listenersNotify {
Expand Down
14 changes: 7 additions & 7 deletions internal/server/instance/drivers/driver_lxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3397,7 +3397,7 @@ func (d *lxc) Freeze() error {

// Check if the CGroup is available
if !d.state.OS.CGInfo.Supports(cgroup.Freezer, cg) {
d.logger.Info("Unable to freeze container (lack of kernel support)", ctxMap)
d.logger.Warn("Unable to freeze container (lack of kernel support)", ctxMap)
return nil
}

Expand Down Expand Up @@ -3447,7 +3447,7 @@ func (d *lxc) Unfreeze() error {

// Check if the CGroup is available
if !d.state.OS.CGInfo.Supports(cgroup.Freezer, cg) {
d.logger.Info("Unable to unfreeze container (lack of kernel support)", ctxMap)
d.logger.Warn("Unable to unfreeze container (lack of kernel support)", ctxMap)
return nil
}

Expand Down Expand Up @@ -5500,8 +5500,8 @@ fi
}

func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Info("Migration send starting")
defer d.logger.Info("Migration send stopped")
d.logger.Debug("Migration send starting")
defer d.logger.Debug("Migration send stopped")

// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand Down Expand Up @@ -6087,8 +6087,8 @@ func (d *lxc) resetContainerDiskIdmap(srcIdmap *idmap.Set) error {
}

func (d *lxc) MigrateReceive(args instance.MigrateReceiveArgs) error {
d.logger.Info("Migration receive starting")
defer d.logger.Info("Migration receive stopped")
d.logger.Debug("Migration receive starting")
defer d.logger.Debug("Migration receive stopped")

// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand Down Expand Up @@ -6811,7 +6811,7 @@ func (d *lxc) migrate(args *instance.CriuMigrationArgs) error {
if migrateErr != nil {
log, err2 := getCRIULogErrors(finalStateDir, prettyCmd)
if err2 == nil {
d.logger.Info("Failed migrating container", ctxMap)
d.logger.Warn("Failed migrating container", ctxMap)
migrateErr = fmt.Errorf("%s %s failed\n%s", args.Function, prettyCmd, log)
}

Expand Down
15 changes: 5 additions & 10 deletions internal/server/instance/drivers/driver_qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,10 +964,7 @@ func (d *qemu) restoreState(monitor *qmp.Monitor) error {
}

go func() {
_, err := io.Copy(pipeWrite, stateConn)
if err != nil {
d.logger.Warn("Failed reading from state connection", logger.Ctx{"err": err})
}
_, _ = io.Copy(pipeWrite, stateConn)

_ = pipeRead.Close()
_ = pipeWrite.Close()
Expand Down Expand Up @@ -4782,8 +4779,6 @@ func (d *qemu) Stop(stateful bool) error {
return err
}

d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceStopped.Event(d, nil))

op.Done(nil)
return nil
}
Expand Down Expand Up @@ -6426,8 +6421,8 @@ func (d *qemu) Export(w io.Writer, properties map[string]string, expiration time

// MigrateSend is not currently supported.
func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Info("Migration send starting")
defer d.logger.Info("Migration send stopped")
d.logger.Debug("Migration send starting")
defer d.logger.Debug("Migration send stopped")

// Check for stateful support.
if args.Live && util.IsFalseOrEmpty(d.expandedConfig["migration.stateful"]) {
Expand Down Expand Up @@ -7004,8 +6999,8 @@ func (d *qemu) migrateSendLive(pool storagePools.Pool, clusterMoveSourceName str
}

func (d *qemu) MigrateReceive(args instance.MigrateReceiveArgs) error {
d.logger.Info("Migration receive starting")
defer d.logger.Info("Migration receive stopped")
d.logger.Debug("Migration receive starting")
defer d.logger.Debug("Migration receive stopped")

// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand Down
Loading

0 comments on commit 8828254

Please sign in to comment.