Skip to content

Commit

Permalink
[Elastic Agent] Add upgrade CLI to initiate upgrade of Agent locally (#…
Browse files Browse the repository at this point in the history
…21425) (#21542)

* Add new upgrade command to initiate a local upgrade of Elastic Agent.

* Update drop path with file:// prefix is defined.

* Add comment.

* Add missing new line.

* Add changelog.

* Prevent upgrading of Agent locally when connected to Fleet.

* Fixes from rebase.

(cherry picked from commit c858dd0)
  • Loading branch information
blakerouse committed Oct 6, 2020
1 parent bad38ab commit f3b1b7b
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 29 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
- Add `install` and `uninstall` subcommands {pull}21206[21206]
- Send updating state {pull}21461[21461]
- Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425]
12 changes: 9 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/warn"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
Expand All @@ -25,8 +26,12 @@ type reexecManager interface {
ReExec(argOverrides ...string)
}

type upgraderControl interface {
SetUpgrader(upgrader *upgrade.Upgrader)
}

// New creates a new Agent and bootstrap the required subsystem.
func New(log *logger.Logger, pathConfigFile string, reexec reexecManager) (Application, error) {
func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, uc upgraderControl) (Application, error) {
// Load configuration from disk to understand in which mode of operation
// we must start the elastic-agent, the mode of operation cannot be changed without restarting the
// elastic-agent.
Expand All @@ -39,14 +44,15 @@ func New(log *logger.Logger, pathConfigFile string, reexec reexecManager) (Appli
return nil, err
}

return createApplication(log, pathConfigFile, rawConfig, reexec)
return createApplication(log, pathConfigFile, rawConfig, reexec, uc)
}

func createApplication(
log *logger.Logger,
pathConfigFile string,
rawConfig *config.Config,
reexec reexecManager,
uc upgraderControl,
) (Application, error) {
warn.LogNotGA(log)
log.Info("Detecting execution mode")
Expand All @@ -59,7 +65,7 @@ func createApplication(

if isStandalone(cfg.Fleet) {
log.Info("Agent is managed locally")
return newLocal(ctx, log, pathConfigFile, rawConfig)
return newLocal(ctx, log, pathConfigFile, rawConfig, reexec, uc)
}

log.Info("Agent is managed by Fleet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,21 @@ func (h *handlerUpgrade) Handle(ctx context.Context, a action, acker fleetAcker)
return fmt.Errorf("invalid type, expected ActionUpgrade and received %T", a)
}

return h.upgrader.Upgrade(ctx, action)
return h.upgrader.Upgrade(ctx, &upgradeAction{action}, true)
}

type upgradeAction struct {
*fleetapi.ActionUpgrade
}

func (a *upgradeAction) Version() string {
return a.ActionUpgrade.Version
}

func (a *upgradeAction) SourceURI() string {
return a.ActionUpgrade.SourceURI
}

func (a *upgradeAction) FleetAction() *fleetapi.ActionUpgrade {
return a.ActionUpgrade
}
14 changes: 14 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand Down Expand Up @@ -60,6 +61,8 @@ func newLocal(
log *logger.Logger,
pathConfigFile string,
rawConfig *config.Config,
reexec reexecManager,
uc upgraderControl,
) (*Local, error) {
cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
Expand Down Expand Up @@ -135,6 +138,17 @@ func newLocal(

localApplication.source = cfgSource

// create a upgrader to use in local mode
upgrader := upgrade.NewUpgrader(
agentInfo,
cfg.Settings.DownloadConfig,
log,
[]context.CancelFunc{localApplication.cancelCtxFn},
reexec,
newNoopAcker(),
reporter)
uc.SetUpgrader(upgrader)

return localApplication, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package upgrade

import (
"context"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
downloader "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/localremote"
Expand All @@ -16,7 +17,13 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri
// do not update source config
settings := *u.settings
if sourceURI != "" {
settings.SourceURI = sourceURI
if strings.HasPrefix(sourceURI, "file://") {
// update the DropPath so the fs.Downloader can download from this
// path instead of looking into the installed downloads directory
settings.DropPath = strings.TrimPrefix(sourceURI, "file://")
} else {
settings.SourceURI = sourceURI
}
}

allowEmptyPgp, pgp := release.PGP()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type updateMarker struct {
}

// markUpgrade marks update happened so we can handle grace period
func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action *fleetapi.ActionUpgrade) error {
func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action Action) error {
prevVersion := release.Version()
prevHash := release.Commit()
if len(prevHash) > hashLen {
Expand All @@ -49,7 +49,7 @@ func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action *fleetap
UpdatedOn: time.Now(),
PrevVersion: prevVersion,
PrevHash: prevHash,
Action: action,
Action: action.FleetAction(),
}

markerBytes, err := yaml.Marshal(marker)
Expand Down
37 changes: 27 additions & 10 deletions x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ type Upgrader struct {
upgradeable bool
}

// Action is the upgrade action state.
type Action interface {
// Version to upgrade to.
Version() string
// SourceURI for download.
SourceURI() string
// FleetAction is the action from fleet that started the action (optional).
FleetAction() *fleetapi.ActionUpgrade
}

type reexecManager interface {
ReExec(argOverrides ...string)
}
Expand All @@ -60,13 +70,14 @@ type stateReporter interface {
// NewUpgrader creates an upgrader which is capable of performing upgrade operation
func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader {
return &Upgrader{
agentInfo: agentInfo,
settings: settings,
log: log,
closers: closers,
reexec: reexec,
acker: a,
reporter: r,
upgradeable: getUpgradable(),
upgradeable: getUpgradeable(),
}
}

Expand All @@ -76,11 +87,13 @@ func (u *Upgrader) Upgradeable() bool {
}

// Upgrade upgrades running agent
func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err error) {
func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (err error) {
// report failed
defer func() {
if err != nil {
u.reportFailure(ctx, a, err)
if action := a.FleetAction(); action != nil {
u.reportFailure(ctx, action, err)
}
}
}()

Expand All @@ -90,15 +103,15 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err
"running under control of the systems supervisor")
}

u.reportUpdating(a.Version)
u.reportUpdating(a.Version())

sourceURI, err := u.sourceURI(a.Version, a.SourceURI)
archivePath, err := u.downloadArtifact(ctx, a.Version, sourceURI)
sourceURI, err := u.sourceURI(a.Version(), a.SourceURI())
archivePath, err := u.downloadArtifact(ctx, a.Version(), sourceURI)
if err != nil {
return err
}

newHash, err := u.unpack(ctx, a.Version, archivePath)
newHash, err := u.unpack(ctx, a.Version(), archivePath)
if err != nil {
return err
}
Expand All @@ -109,7 +122,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err

if strings.HasPrefix(release.Commit(), newHash) {
// not an error
u.ackAction(ctx, a)
if action := a.FleetAction(); action != nil {
u.ackAction(ctx, action)
}
u.log.Warn("upgrading to same version")
return nil
}
Expand All @@ -128,7 +143,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err
return err
}

u.reexec.ReExec()
if reexecNow {
u.reexec.ReExec()
}
return nil
}

Expand Down Expand Up @@ -224,7 +241,7 @@ func rollbackInstall(hash string) {
os.RemoveAll(filepath.Join(paths.Data(), fmt.Sprintf("%s-%s", agentName, hash)))
}

func getUpgradable() bool {
func getUpgradeable() bool {
// only upgradeable if running from Agent installer and running under the
// control of the system supervisor (or built specifically with upgrading enabled)
return release.Upgradeable() || (install.RunningInstalled() && install.RunningUnderSupervisor())
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command {
cmd.AddCommand(run)
cmd.AddCommand(newInstallCommandWithArgs(flags, args, streams))
cmd.AddCommand(newUninstallCommandWithArgs(flags, args, streams))
cmd.AddCommand(newUpgradeCommandWithArgs(flags, args, streams))
cmd.AddCommand(newEnrollCommandWithArgs(flags, args, streams))
cmd.AddCommand(newInspectCommandWithArgs(flags, args, streams))

Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { // Windows: Mark se
rex := reexec.NewManager(rexLogger, execPath)

// start the control listener
control := server.New(logger.Named("control"), rex)
control := server.New(logger.Named("control"), rex, nil)
if err := control.Start(); err != nil {
return err
}
defer control.Stop()

app, err := application.New(logger, pathConfigFile, rex)
app, err := application.New(logger, pathConfigFile, rex, control)
if err != nil {
return err
}
Expand Down
56 changes: 56 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cmd

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
)

func newUpgradeCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStreams) *cobra.Command {
cmd := &cobra.Command{
Use: "upgrade <version>",
Short: "Upgrade the currently running Elastic Agent to the specified version",
Args: cobra.ExactArgs(1),
Run: func(c *cobra.Command, args []string) {
if err := upgradeCmd(streams, c, flags, args); err != nil {
fmt.Fprintf(streams.Err, "%v\n", err)
os.Exit(1)
}
},
}

cmd.Flags().StringP("source-uri", "s", "", "Source URI to download the new version from")

return cmd
}

func upgradeCmd(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args []string) error {
fmt.Fprintln(streams.Out, "The upgrade process of Elastic Agent is currently EXPERIMENTAL and should not be used in production")

version := args[0]
sourceURI, _ := cmd.Flags().GetString("source-uri")

c := client.New()
err := c.Connect(context.Background())
if err != nil {
return errors.New(err, "Failed communicating to running daemon", errors.TypeNetwork, errors.M("socket", control.Address()))
}
defer c.Disconnect()
version, err = c.Upgrade(context.Background(), version, sourceURI)
if err != nil {
return errors.New(err, "Failed trigger upgrade of daemon")
}
fmt.Fprintf(streams.Out, "Upgrade triggered to version %s, Elastic Agent is currently restarting\n", version)
return nil
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

func TestServerClient_Version(t *testing.T) {
srv := server.New(newErrorLogger(t), nil)
srv := server.New(newErrorLogger(t), nil, nil)
err := srv.Start()
require.NoError(t, err)
defer srv.Stop()
Expand Down
Loading

0 comments on commit f3b1b7b

Please sign in to comment.