Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #20359 to 7.x: [Elastic Agent] Improve version, restart, enroll CLI commands #20431

Merged
merged 2 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,6 @@
- Will retry to enroll if the server return a 429. {pull}19918[19811]
- Add --staging option to enroll command {pull}20026[20026]
- Add `event.dataset` to all events {pull}20076[20076]
- Improved version CLI {pull}20359[20359]
- Enroll CLI now restarts running daemon {pull}20359[20359]
- Add restart CLI cmd {pull}20359[20359]
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ message StatusResponse {
repeated ApplicationStatus applications = 3;
}

service ElasticAgent {
service ElasticAgentControl {
// Fetches the currently running version of the Elastic Agent.
rpc Version(Empty) returns (VersionResponse);

Expand Down
18 changes: 2 additions & 16 deletions x-pack/elastic-agent/pkg/agent/application/reexec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,9 @@
package reexec

import (
"sync"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

var (
execSingleton ExecManager
execSingletonOnce sync.Once
)

// ExecManager is the interface that the global reexec manager implements.
type ExecManager interface {
// ReExec asynchronously re-executes command in the same PID and memory address
Expand All @@ -30,14 +23,6 @@ type ExecManager interface {
ShutdownComplete()
}

// Manager returns the global reexec manager.
func Manager(log *logger.Logger, exec string) ExecManager {
execSingletonOnce.Do(func() {
execSingleton = newManager(log, exec)
})
return execSingleton
}

type manager struct {
logger *logger.Logger
exec string
Expand All @@ -46,7 +31,8 @@ type manager struct {
complete chan bool
}

func newManager(log *logger.Logger, exec string) *manager {
// NewManager returns the reexec manager.
func NewManager(log *logger.Logger, exec string) ExecManager {
return &manager{
logger: log,
exec: exec,
Expand Down
24 changes: 23 additions & 1 deletion x-pack/elastic-agent/pkg/agent/cmd/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
package cmd

import (
"context"
"fmt"
"math/rand"
"os"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client"

"github.com/spf13/cobra"

"github.com/elastic/beats/v7/libbeat/common/backoff"
Expand Down Expand Up @@ -45,6 +48,7 @@ func newEnrollCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStr
cmd.Flags().BoolP("force", "f", false, "Force overwrite the current and do not prompt for confirmation")
cmd.Flags().BoolP("insecure", "i", false, "Allow insecure connection to Kibana")
cmd.Flags().StringP("staging", "", "", "Configures agent to download artifacts from a staging build")
cmd.Flags().Bool("no-restart", false, "Skip restarting the currently running daemon")

return cmd
}
Expand Down Expand Up @@ -144,7 +148,25 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args
return errors.New(err, "fail to enroll")
}

fmt.Fprintln(streams.Out, "Successfully enrolled the Agent.")
fmt.Fprintln(streams.Out, "Successfully enrolled the Elastic Agent.")

// skip restarting
noRestart, _ := cmd.Flags().GetBool("no-restart")
if noRestart {
return nil
}

daemon := client.New()
err = daemon.Connect(context.Background())
if err == nil {
defer daemon.Disconnect()
err = daemon.Restart(context.Background())
if err == nil {
fmt.Fprintln(streams.Out, "Successfully triggered restart on running Elastic Agent.")
return nil
}
}
fmt.Fprintln(streams.Out, "Elastic Agent might not be running; unable to trigger restart")
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion x-pack/elastic-agent/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
Expand Down Expand Up @@ -80,7 +81,14 @@ func run(flags *globalFlags, streams *cli.IOStreams) error {
return err
}
rexLogger := logger.Named("reexec")
rex := reexec.Manager(rexLogger, execPath)
rex := reexec.NewManager(rexLogger, execPath)

// start the control listener
control := server.New(logger.Named("control"), rex)
if err := control.Start(); err != nil {
return err
}
defer control.Stop()

app, err := application.New(logger, pathConfigFile)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions x-pack/elastic-agent/pkg/agent/control/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
package control

import (
"crypto/sha256"
"fmt"
"path/filepath"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
)

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data := paths.Data()
return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock"))
// entire string cannot be longer than 107 characters, this forces the
// length to always be 88 characters (but unique per data path)
return fmt.Sprintf(`unix:///tmp/elastic-agent-%x.sock`, sha256.Sum256([]byte(data)))
}
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/control/addr_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data = paths.Data()
data := paths.Data()
// entire string cannot be longer than 256 characters, this forces the
// length to always be 87 characters (but unique per data path)
return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data))
return fmt.Sprintf(`\\.\pipe\elastic-agent-%x`, sha256.Sum256([]byte(data)))
}
22 changes: 11 additions & 11 deletions x-pack/elastic-agent/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"context"
"encoding/json"
"fmt"

"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto"
)

Expand Down Expand Up @@ -62,10 +62,10 @@ type AgentStatus struct {

// Client communicates to Elastic Agent through the control protocol.
type Client interface {
// Start starts the client.
Start(ctx context.Context) error
// Stop stops the client.
Stop()
// Connect connects to the running Elastic Agent.
Connect(ctx context.Context) error
// Disconnect disconnects from the running Elastic Agent.
Disconnect()
// Version returns the current version of the running agent.
Version(ctx context.Context) (Version, error)
// Status returns the current status of the running agent.
Expand All @@ -81,7 +81,7 @@ type client struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client proto.ElasticAgentClient
client proto.ElasticAgentControlClient
cfgLock sync.RWMutex
obsLock sync.RWMutex
}
Expand All @@ -91,19 +91,19 @@ func New() Client {
return &client{}
}

// Start starts the connection to Elastic Agent.
func (c *client) Start(ctx context.Context) error {
// Connect connects to the running Elastic Agent.
func (c *client) Connect(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := dialContext(ctx)
if err != nil {
return err
}
c.client = proto.NewElasticAgentClient(conn)
c.client = proto.NewElasticAgentControlClient(conn)
return nil
}

// Stop stops the connection to Elastic Agent.
func (c *client) Stop() {
// Disconnect disconnects from the running Elastic Agent.
func (c *client) Disconnect() {
if c.cancel != nil {
c.cancel()
c.wg.Wait()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
return npipe.DialContext(arr)(ctx, "", "")
return npipe.DialContext(addr)(ctx, "", "")
}
6 changes: 3 additions & 3 deletions x-pack/elastic-agent/pkg/agent/control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ import (
)

func TestServerClient_Version(t *testing.T) {
srv := server.New(newErrorLogger(t))
srv := server.New(newErrorLogger(t), nil)
err := srv.Start()
require.NoError(t, err)
defer srv.Stop()

c := client.New()
err = c.Start(context.Background())
err = c.Connect(context.Background())
require.NoError(t, err)
defer c.Stop()
defer c.Disconnect()

ver, err := c.Version(context.Background())
require.NoError(t, err)
Expand Down
Loading