diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 278a9ea9cf4..7d6870328c7 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -29,3 +29,4 @@ - Send `fleet.host.id` to Endpoint Security {pull}21042[21042] - Add `install` and `uninstall` subcommands {pull}21206[21206] - Send updating state {pull}21461[21461] +- Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425] diff --git a/x-pack/elastic-agent/pkg/agent/application/application.go b/x-pack/elastic-agent/pkg/agent/application/application.go index e003eed61a6..d721a8aa148 100644 --- a/x-pack/elastic-agent/pkg/agent/application/application.go +++ b/x-pack/elastic-agent/pkg/agent/application/application.go @@ -8,6 +8,7 @@ import ( "context" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/warn" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" @@ -25,8 +26,12 @@ type reexecManager interface { ReExec(argOverrides ...string) } +type upgraderControl interface { + SetUpgrader(upgrader *upgrade.Upgrader) +} + // New creates a new Agent and bootstrap the required subsystem. -func New(log *logger.Logger, pathConfigFile string, reexec reexecManager) (Application, error) { +func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, uc upgraderControl) (Application, error) { // Load configuration from disk to understand in which mode of operation // we must start the elastic-agent, the mode of operation cannot be changed without restarting the // elastic-agent. @@ -39,7 +44,7 @@ func New(log *logger.Logger, pathConfigFile string, reexec reexecManager) (Appli return nil, err } - return createApplication(log, pathConfigFile, rawConfig, reexec) + return createApplication(log, pathConfigFile, rawConfig, reexec, uc) } func createApplication( @@ -47,6 +52,7 @@ func createApplication( pathConfigFile string, rawConfig *config.Config, reexec reexecManager, + uc upgraderControl, ) (Application, error) { warn.LogNotGA(log) log.Info("Detecting execution mode") @@ -59,7 +65,7 @@ func createApplication( if isStandalone(cfg.Fleet) { log.Info("Agent is managed locally") - return newLocal(ctx, log, pathConfigFile, rawConfig) + return newLocal(ctx, log, pathConfigFile, rawConfig, reexec, uc) } log.Info("Agent is managed by Fleet") diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_upgrade.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_upgrade.go index 4d0026d4d79..a4940cfe55b 100644 --- a/x-pack/elastic-agent/pkg/agent/application/handler_action_upgrade.go +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_upgrade.go @@ -27,5 +27,21 @@ func (h *handlerUpgrade) Handle(ctx context.Context, a action, acker fleetAcker) return fmt.Errorf("invalid type, expected ActionUpgrade and received %T", a) } - return h.upgrader.Upgrade(ctx, action) + return h.upgrader.Upgrade(ctx, &upgradeAction{action}, true) +} + +type upgradeAction struct { + *fleetapi.ActionUpgrade +} + +func (a *upgradeAction) Version() string { + return a.ActionUpgrade.Version +} + +func (a *upgradeAction) SourceURI() string { + return a.ActionUpgrade.SourceURI +} + +func (a *upgradeAction) FleetAction() *fleetapi.ActionUpgrade { + return a.ActionUpgrade } diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index 5559089404e..f8eed0f5792 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -9,6 +9,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -60,6 +61,8 @@ func newLocal( log *logger.Logger, pathConfigFile string, rawConfig *config.Config, + reexec reexecManager, + uc upgraderControl, ) (*Local, error) { cfg, err := configuration.NewFromConfig(rawConfig) if err != nil { @@ -135,6 +138,17 @@ func newLocal( localApplication.source = cfgSource + // create a upgrader to use in local mode + upgrader := upgrade.NewUpgrader( + agentInfo, + cfg.Settings.DownloadConfig, + log, + []context.CancelFunc{localApplication.cancelCtxFn}, + reexec, + newNoopAcker(), + reporter) + uc.SetUpgrader(upgrader) + return localApplication, nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go index 9db442d3655..cf3a3656724 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go @@ -6,6 +6,7 @@ package upgrade import ( "context" + "strings" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" downloader "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/localremote" @@ -16,7 +17,13 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri // do not update source config settings := *u.settings if sourceURI != "" { - settings.SourceURI = sourceURI + if strings.HasPrefix(sourceURI, "file://") { + // update the DropPath so the fs.Downloader can download from this + // path instead of looking into the installed downloads directory + settings.DropPath = strings.TrimPrefix(sourceURI, "file://") + } else { + settings.SourceURI = sourceURI + } } allowEmptyPgp, pgp := release.PGP() diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_mark.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_mark.go index 53920e6ecff..8d03fec3ff6 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_mark.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_mark.go @@ -37,7 +37,7 @@ type updateMarker struct { } // markUpgrade marks update happened so we can handle grace period -func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action *fleetapi.ActionUpgrade) error { +func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action Action) error { prevVersion := release.Version() prevHash := release.Commit() if len(prevHash) > hashLen { @@ -49,7 +49,7 @@ func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action *fleetap UpdatedOn: time.Now(), PrevVersion: prevVersion, PrevHash: prevHash, - Action: action, + Action: action.FleetAction(), } markerBytes, err := yaml.Marshal(marker) diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go index 7aacf77ba63..1a21bc154a1 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go @@ -44,6 +44,16 @@ type Upgrader struct { upgradeable bool } +// Action is the upgrade action state. +type Action interface { + // Version to upgrade to. + Version() string + // SourceURI for download. + SourceURI() string + // FleetAction is the action from fleet that started the action (optional). + FleetAction() *fleetapi.ActionUpgrade +} + type reexecManager interface { ReExec(argOverrides ...string) } @@ -60,13 +70,14 @@ type stateReporter interface { // NewUpgrader creates an upgrader which is capable of performing upgrade operation func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader { return &Upgrader{ + agentInfo: agentInfo, settings: settings, log: log, closers: closers, reexec: reexec, acker: a, reporter: r, - upgradeable: getUpgradable(), + upgradeable: getUpgradeable(), } } @@ -76,11 +87,13 @@ func (u *Upgrader) Upgradeable() bool { } // Upgrade upgrades running agent -func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err error) { +func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (err error) { // report failed defer func() { if err != nil { - u.reportFailure(ctx, a, err) + if action := a.FleetAction(); action != nil { + u.reportFailure(ctx, action, err) + } } }() @@ -90,15 +103,15 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err "running under control of the systems supervisor") } - u.reportUpdating(a.Version) + u.reportUpdating(a.Version()) - sourceURI, err := u.sourceURI(a.Version, a.SourceURI) - archivePath, err := u.downloadArtifact(ctx, a.Version, sourceURI) + sourceURI, err := u.sourceURI(a.Version(), a.SourceURI()) + archivePath, err := u.downloadArtifact(ctx, a.Version(), sourceURI) if err != nil { return err } - newHash, err := u.unpack(ctx, a.Version, archivePath) + newHash, err := u.unpack(ctx, a.Version(), archivePath) if err != nil { return err } @@ -109,7 +122,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err if strings.HasPrefix(release.Commit(), newHash) { // not an error - u.ackAction(ctx, a) + if action := a.FleetAction(); action != nil { + u.ackAction(ctx, action) + } u.log.Warn("upgrading to same version") return nil } @@ -128,7 +143,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err return err } - u.reexec.ReExec() + if reexecNow { + u.reexec.ReExec() + } return nil } @@ -224,7 +241,7 @@ func rollbackInstall(hash string) { os.RemoveAll(filepath.Join(paths.Data(), fmt.Sprintf("%s-%s", agentName, hash))) } -func getUpgradable() bool { +func getUpgradeable() bool { // only upgradeable if running from Agent installer and running under the // control of the system supervisor (or built specifically with upgrading enabled) return release.Upgradeable() || (install.RunningInstalled() && install.RunningUnderSupervisor()) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/common.go b/x-pack/elastic-agent/pkg/agent/cmd/common.go index 8ca5700f3c6..39093be71b6 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/common.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/common.go @@ -68,6 +68,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command { cmd.AddCommand(run) cmd.AddCommand(newInstallCommandWithArgs(flags, args, streams)) cmd.AddCommand(newUninstallCommandWithArgs(flags, args, streams)) + cmd.AddCommand(newUpgradeCommandWithArgs(flags, args, streams)) cmd.AddCommand(newEnrollCommandWithArgs(flags, args, streams)) cmd.AddCommand(newInspectCommandWithArgs(flags, args, streams)) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 77beeb6fe1a..84dd8bd8a9a 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -95,13 +95,13 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { // Windows: Mark se rex := reexec.NewManager(rexLogger, execPath) // start the control listener - control := server.New(logger.Named("control"), rex) + control := server.New(logger.Named("control"), rex, nil) if err := control.Start(); err != nil { return err } defer control.Stop() - app, err := application.New(logger, pathConfigFile, rex) + app, err := application.New(logger, pathConfigFile, rex, control) if err != nil { return err } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/upgrade.go b/x-pack/elastic-agent/pkg/agent/cmd/upgrade.go new file mode 100644 index 00000000000..81a5c82b4ab --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/cmd/upgrade.go @@ -0,0 +1,56 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" +) + +func newUpgradeCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStreams) *cobra.Command { + cmd := &cobra.Command{ + Use: "upgrade ", + Short: "Upgrade the currently running Elastic Agent to the specified version", + Args: cobra.ExactArgs(1), + Run: func(c *cobra.Command, args []string) { + if err := upgradeCmd(streams, c, flags, args); err != nil { + fmt.Fprintf(streams.Err, "%v\n", err) + os.Exit(1) + } + }, + } + + cmd.Flags().StringP("source-uri", "s", "", "Source URI to download the new version from") + + return cmd +} + +func upgradeCmd(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args []string) error { + fmt.Fprintln(streams.Out, "The upgrade process of Elastic Agent is currently EXPERIMENTAL and should not be used in production") + + version := args[0] + sourceURI, _ := cmd.Flags().GetString("source-uri") + + c := client.New() + err := c.Connect(context.Background()) + if err != nil { + return errors.New(err, "Failed communicating to running daemon", errors.TypeNetwork, errors.M("socket", control.Address())) + } + defer c.Disconnect() + version, err = c.Upgrade(context.Background(), version, sourceURI) + if err != nil { + return errors.New(err, "Failed trigger upgrade of daemon") + } + fmt.Fprintf(streams.Out, "Upgrade triggered to version %s, Elastic Agent is currently restarting\n", version) + return nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/control_test.go b/x-pack/elastic-agent/pkg/agent/control/control_test.go index 9454179ae60..5c56aed4691 100644 --- a/x-pack/elastic-agent/pkg/agent/control/control_test.go +++ b/x-pack/elastic-agent/pkg/agent/control/control_test.go @@ -20,7 +20,7 @@ import ( ) func TestServerClient_Version(t *testing.T) { - srv := server.New(newErrorLogger(t), nil) + srv := server.New(newErrorLogger(t), nil, nil) err := srv.Start() require.NoError(t, err) defer srv.Stop() diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index faa7982c814..0ce970c9256 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -7,14 +7,17 @@ package server import ( "context" "net" - - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec" + "sync" + "time" "google.golang.org/grpc" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) @@ -22,18 +25,28 @@ import ( type Server struct { logger *logger.Logger rex reexec.ExecManager + up *upgrade.Upgrader listener net.Listener server *grpc.Server + lock sync.RWMutex } // New creates a new control protocol server. -func New(log *logger.Logger, rex reexec.ExecManager) *Server { +func New(log *logger.Logger, rex reexec.ExecManager, up *upgrade.Upgrader) *Server { return &Server{ logger: log, rex: rex, + up: up, } } +// SetUpgrader changes the upgrader. +func (s *Server) SetUpgrader(up *upgrade.Upgrader) { + s.lock.Lock() + defer s.lock.Unlock() + s.up = up +} + // Start starts the GRPC endpoint and accepts new connections. func (s *Server) Start() error { if s.server != nil { @@ -100,10 +113,48 @@ func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.RestartRespo // Upgrade performs the upgrade operation. func (s *Server) Upgrade(ctx context.Context, request *proto.UpgradeRequest) (*proto.UpgradeResponse, error) { - // not implemented + s.lock.RLock() + u := s.up + s.lock.RUnlock() + if u == nil { + // not running with upgrader (must be controlled by Fleet) + return &proto.UpgradeResponse{ + Status: proto.ActionStatus_FAILURE, + Error: "cannot be upgraded; perform upgrading using Fleet", + }, nil + } + err := u.Upgrade(ctx, &upgradeRequest{request}, false) + if err != nil { + return &proto.UpgradeResponse{ + Status: proto.ActionStatus_FAILURE, + Error: err.Error(), + }, nil + } + // perform the re-exec after a 1 second delay + // this ensures that the upgrade response over GRPC is returned + go func() { + <-time.After(time.Second) + s.rex.ReExec() + }() return &proto.UpgradeResponse{ - Status: proto.ActionStatus_FAILURE, - Version: "", - Error: "not implemented", + Status: proto.ActionStatus_SUCCESS, + Version: request.Version, }, nil } + +type upgradeRequest struct { + *proto.UpgradeRequest +} + +func (r *upgradeRequest) Version() string { + return r.GetVersion() +} + +func (r *upgradeRequest) SourceURI() string { + return r.GetSourceURI() +} + +func (r *upgradeRequest) FleetAction() *fleetapi.ActionUpgrade { + // upgrade request not from Fleet + return nil +} diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go index 119809338d6..6c656839820 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go @@ -52,7 +52,7 @@ func TestCmdBinaryOnlyYAML(t *testing.T) { } func TestCmdDaemon(t *testing.T) { - srv := server.New(newErrorLogger(t), nil) + srv := server.New(newErrorLogger(t), nil, nil) require.NoError(t, srv.Start()) defer srv.Stop() @@ -67,7 +67,7 @@ func TestCmdDaemon(t *testing.T) { } func TestCmdDaemonYAML(t *testing.T) { - srv := server.New(newErrorLogger(t), nil) + srv := server.New(newErrorLogger(t), nil, nil) require.NoError(t, srv.Start()) defer srv.Stop()