diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 8ed2402bd37..c0d50c96c6f 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -98,3 +98,6 @@ - Will retry to enroll if the server return a 429. {pull}19918[19811] - Add --staging option to enroll command {pull}20026[20026] - Add `event.dataset` to all events {pull}20076[20076] +- Improved version CLI {pull}20359[20359] +- Enroll CLI now restarts running daemon {pull}20359[20359] +- Add restart CLI cmd {pull}20359[20359] diff --git a/x-pack/elastic-agent/control.proto b/x-pack/elastic-agent/control.proto index a7ff22e5157..0c5645faab9 100644 --- a/x-pack/elastic-agent/control.proto +++ b/x-pack/elastic-agent/control.proto @@ -104,7 +104,7 @@ message StatusResponse { repeated ApplicationStatus applications = 3; } -service ElasticAgent { +service ElasticAgentControl { // Fetches the currently running version of the Elastic Agent. rpc Version(Empty) returns (VersionResponse); diff --git a/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go b/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go index 33de754a27d..4662b1c6230 100644 --- a/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go +++ b/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go @@ -5,16 +5,9 @@ package reexec import ( - "sync" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) -var ( - execSingleton ExecManager - execSingletonOnce sync.Once -) - // ExecManager is the interface that the global reexec manager implements. type ExecManager interface { // ReExec asynchronously re-executes command in the same PID and memory address @@ -30,14 +23,6 @@ type ExecManager interface { ShutdownComplete() } -// Manager returns the global reexec manager. -func Manager(log *logger.Logger, exec string) ExecManager { - execSingletonOnce.Do(func() { - execSingleton = newManager(log, exec) - }) - return execSingleton -} - type manager struct { logger *logger.Logger exec string @@ -46,7 +31,8 @@ type manager struct { complete chan bool } -func newManager(log *logger.Logger, exec string) *manager { +// NewManager returns the reexec manager. +func NewManager(log *logger.Logger, exec string) ExecManager { return &manager{ logger: log, exec: exec, diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go index 311ad31e63b..b34e0236782 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go @@ -5,11 +5,14 @@ package cmd import ( + "context" "fmt" "math/rand" "os" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/spf13/cobra" "github.com/elastic/beats/v7/libbeat/common/backoff" @@ -45,6 +48,7 @@ func newEnrollCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStr cmd.Flags().BoolP("force", "f", false, "Force overwrite the current and do not prompt for confirmation") cmd.Flags().BoolP("insecure", "i", false, "Allow insecure connection to Kibana") cmd.Flags().StringP("staging", "", "", "Configures agent to download artifacts from a staging build") + cmd.Flags().Bool("no-restart", false, "Skip restarting the currently running daemon") return cmd } @@ -144,7 +148,25 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args return errors.New(err, "fail to enroll") } - fmt.Fprintln(streams.Out, "Successfully enrolled the Agent.") + fmt.Fprintln(streams.Out, "Successfully enrolled the Elastic Agent.") + + // skip restarting + noRestart, _ := cmd.Flags().GetBool("no-restart") + if noRestart { + return nil + } + + daemon := client.New() + err = daemon.Connect(context.Background()) + if err == nil { + defer daemon.Disconnect() + err = daemon.Restart(context.Background()) + if err == nil { + fmt.Fprintln(streams.Out, "Successfully triggered restart on running Elastic Agent.") + return nil + } + } + fmt.Fprintln(streams.Out, "Elastic Agent might not be running; unable to trigger restart") return nil } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 1c3c81b9ff7..f502ef9cf49 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" @@ -80,7 +81,14 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { return err } rexLogger := logger.Named("reexec") - rex := reexec.Manager(rexLogger, execPath) + rex := reexec.NewManager(rexLogger, execPath) + + // start the control listener + control := server.New(logger.Named("control"), rex) + if err := control.Start(); err != nil { + return err + } + defer control.Stop() app, err := application.New(logger, pathConfigFile) if err != nil { diff --git a/x-pack/elastic-agent/pkg/agent/control/addr.go b/x-pack/elastic-agent/pkg/agent/control/addr.go index 3416480a6a0..20bc1e6a005 100644 --- a/x-pack/elastic-agent/pkg/agent/control/addr.go +++ b/x-pack/elastic-agent/pkg/agent/control/addr.go @@ -7,8 +7,8 @@ package control import ( + "crypto/sha256" "fmt" - "path/filepath" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) @@ -16,5 +16,7 @@ import ( // Address returns the address to connect to Elastic Agent daemon. func Address() string { data := paths.Data() - return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock")) + // entire string cannot be longer than 107 characters, this forces the + // length to always be 88 characters (but unique per data path) + return fmt.Sprintf(`unix:///tmp/elastic-agent-%x.sock`, sha256.Sum256([]byte(data))) } diff --git a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go index 1123eec941b..bf2e164fbae 100644 --- a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go @@ -15,8 +15,8 @@ import ( // Address returns the address to connect to Elastic Agent daemon. func Address() string { - data = paths.Data() + data := paths.Data() // entire string cannot be longer than 256 characters, this forces the // length to always be 87 characters (but unique per data path) - return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) + return fmt.Sprintf(`\\.\pipe\elastic-agent-%x`, sha256.Sum256([]byte(data))) } diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index bcd8eccdb82..5e55fce9349 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -8,11 +8,11 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" ) @@ -62,10 +62,10 @@ type AgentStatus struct { // Client communicates to Elastic Agent through the control protocol. type Client interface { - // Start starts the client. - Start(ctx context.Context) error - // Stop stops the client. - Stop() + // Connect connects to the running Elastic Agent. + Connect(ctx context.Context) error + // Disconnect disconnects from the running Elastic Agent. + Disconnect() // Version returns the current version of the running agent. Version(ctx context.Context) (Version, error) // Status returns the current status of the running agent. @@ -81,7 +81,7 @@ type client struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - client proto.ElasticAgentClient + client proto.ElasticAgentControlClient cfgLock sync.RWMutex obsLock sync.RWMutex } @@ -91,19 +91,19 @@ func New() Client { return &client{} } -// Start starts the connection to Elastic Agent. -func (c *client) Start(ctx context.Context) error { +// Connect connects to the running Elastic Agent. +func (c *client) Connect(ctx context.Context) error { c.ctx, c.cancel = context.WithCancel(ctx) conn, err := dialContext(ctx) if err != nil { return err } - c.client = proto.NewElasticAgentClient(conn) + c.client = proto.NewElasticAgentControlClient(conn) return nil } -// Stop stops the connection to Elastic Agent. -func (c *client) Stop() { +// Disconnect disconnects from the running Elastic Agent. +func (c *client) Disconnect() { if c.cancel != nil { c.cancel() c.wg.Wait() diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go index 58b36c18043..c061753d327 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go @@ -22,5 +22,5 @@ func dialContext(ctx context.Context) (*grpc.ClientConn, error) { } func dialer(ctx context.Context, addr string) (net.Conn, error) { - return npipe.DialContext(arr)(ctx, "", "") + return npipe.DialContext(addr)(ctx, "", "") } diff --git a/x-pack/elastic-agent/pkg/agent/control/control_test.go b/x-pack/elastic-agent/pkg/agent/control/control_test.go index 13d32420258..9454179ae60 100644 --- a/x-pack/elastic-agent/pkg/agent/control/control_test.go +++ b/x-pack/elastic-agent/pkg/agent/control/control_test.go @@ -20,15 +20,15 @@ import ( ) func TestServerClient_Version(t *testing.T) { - srv := server.New(newErrorLogger(t)) + srv := server.New(newErrorLogger(t), nil) err := srv.Start() require.NoError(t, err) defer srv.Stop() c := client.New() - err = c.Start(context.Background()) + err = c.Connect(context.Background()) require.NoError(t, err) - defer c.Stop() + defer c.Disconnect() ver, err := c.Version(context.Background()) require.NoError(t, err) diff --git a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go index 58df5e28f19..a0e2e710f0c 100644 --- a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go +++ b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go @@ -662,24 +662,24 @@ var file_control_proto_rawDesc = []byte{ 0x10, 0x06, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, 0x4b, 0x10, 0x07, 0x2a, 0x28, 0x0a, 0x0c, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, - 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x32, 0xd9, 0x01, 0x0a, 0x0c, 0x45, - 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x56, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x06, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x52, - 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x73, - 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, - 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, + 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x32, 0xe0, 0x01, 0x0a, 0x13, 0x45, + 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x72, + 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x0c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0xf8, 0x01, 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, + 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, + 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, + 0x6f, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0xf8, 0x01, + 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -713,14 +713,14 @@ var file_control_proto_depIdxs = []int32{ 0, // 2: proto.ApplicationStatus.status:type_name -> proto.Status 0, // 3: proto.StatusResponse.status:type_name -> proto.Status 7, // 4: proto.StatusResponse.applications:type_name -> proto.ApplicationStatus - 2, // 5: proto.ElasticAgent.Version:input_type -> proto.Empty - 2, // 6: proto.ElasticAgent.Status:input_type -> proto.Empty - 2, // 7: proto.ElasticAgent.Restart:input_type -> proto.Empty - 5, // 8: proto.ElasticAgent.Upgrade:input_type -> proto.UpgradeRequest - 3, // 9: proto.ElasticAgent.Version:output_type -> proto.VersionResponse - 8, // 10: proto.ElasticAgent.Status:output_type -> proto.StatusResponse - 4, // 11: proto.ElasticAgent.Restart:output_type -> proto.RestartResponse - 6, // 12: proto.ElasticAgent.Upgrade:output_type -> proto.UpgradeResponse + 2, // 5: proto.ElasticAgentControl.Version:input_type -> proto.Empty + 2, // 6: proto.ElasticAgentControl.Status:input_type -> proto.Empty + 2, // 7: proto.ElasticAgentControl.Restart:input_type -> proto.Empty + 5, // 8: proto.ElasticAgentControl.Upgrade:input_type -> proto.UpgradeRequest + 3, // 9: proto.ElasticAgentControl.Version:output_type -> proto.VersionResponse + 8, // 10: proto.ElasticAgentControl.Status:output_type -> proto.StatusResponse + 4, // 11: proto.ElasticAgentControl.Restart:output_type -> proto.RestartResponse + 6, // 12: proto.ElasticAgentControl.Upgrade:output_type -> proto.UpgradeResponse 9, // [9:13] is the sub-list for method output_type 5, // [5:9] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name @@ -848,10 +848,10 @@ var _ grpc.ClientConnInterface // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 -// ElasticAgentClient is the client API for ElasticAgent service. +// ElasticAgentControlClient is the client API for ElasticAgentControl service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type ElasticAgentClient interface { +type ElasticAgentControlClient interface { // Fetches the currently running version of the Elastic Agent. Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) // Fetches the currently status of the Elastic Agent. @@ -862,52 +862,52 @@ type ElasticAgentClient interface { Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error) } -type elasticAgentClient struct { +type elasticAgentControlClient struct { cc grpc.ClientConnInterface } -func NewElasticAgentClient(cc grpc.ClientConnInterface) ElasticAgentClient { - return &elasticAgentClient{cc} +func NewElasticAgentControlClient(cc grpc.ClientConnInterface) ElasticAgentControlClient { + return &elasticAgentControlClient{cc} } -func (c *elasticAgentClient) Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) { +func (c *elasticAgentControlClient) Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) { out := new(VersionResponse) - err := c.cc.Invoke(ctx, "/proto.ElasticAgent/Version", in, out, opts...) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Version", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *elasticAgentClient) Status(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*StatusResponse, error) { +func (c *elasticAgentControlClient) Status(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*StatusResponse, error) { out := new(StatusResponse) - err := c.cc.Invoke(ctx, "/proto.ElasticAgent/Status", in, out, opts...) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Status", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *elasticAgentClient) Restart(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*RestartResponse, error) { +func (c *elasticAgentControlClient) Restart(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*RestartResponse, error) { out := new(RestartResponse) - err := c.cc.Invoke(ctx, "/proto.ElasticAgent/Restart", in, out, opts...) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Restart", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *elasticAgentClient) Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error) { +func (c *elasticAgentControlClient) Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error) { out := new(UpgradeResponse) - err := c.cc.Invoke(ctx, "/proto.ElasticAgent/Upgrade", in, out, opts...) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Upgrade", in, out, opts...) if err != nil { return nil, err } return out, nil } -// ElasticAgentServer is the server API for ElasticAgent service. -type ElasticAgentServer interface { +// ElasticAgentControlServer is the server API for ElasticAgentControl service. +type ElasticAgentControlServer interface { // Fetches the currently running version of the Elastic Agent. Version(context.Context, *Empty) (*VersionResponse, error) // Fetches the currently status of the Elastic Agent. @@ -918,118 +918,118 @@ type ElasticAgentServer interface { Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error) } -// UnimplementedElasticAgentServer can be embedded to have forward compatible implementations. -type UnimplementedElasticAgentServer struct { +// UnimplementedElasticAgentControlServer can be embedded to have forward compatible implementations. +type UnimplementedElasticAgentControlServer struct { } -func (*UnimplementedElasticAgentServer) Version(context.Context, *Empty) (*VersionResponse, error) { +func (*UnimplementedElasticAgentControlServer) Version(context.Context, *Empty) (*VersionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Version not implemented") } -func (*UnimplementedElasticAgentServer) Status(context.Context, *Empty) (*StatusResponse, error) { +func (*UnimplementedElasticAgentControlServer) Status(context.Context, *Empty) (*StatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") } -func (*UnimplementedElasticAgentServer) Restart(context.Context, *Empty) (*RestartResponse, error) { +func (*UnimplementedElasticAgentControlServer) Restart(context.Context, *Empty) (*RestartResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Restart not implemented") } -func (*UnimplementedElasticAgentServer) Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error) { +func (*UnimplementedElasticAgentControlServer) Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Upgrade not implemented") } -func RegisterElasticAgentServer(s *grpc.Server, srv ElasticAgentServer) { - s.RegisterService(&_ElasticAgent_serviceDesc, srv) +func RegisterElasticAgentControlServer(s *grpc.Server, srv ElasticAgentControlServer) { + s.RegisterService(&_ElasticAgentControl_serviceDesc, srv) } -func _ElasticAgent_Version_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ElasticAgentControl_Version_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ElasticAgentServer).Version(ctx, in) + return srv.(ElasticAgentControlServer).Version(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.ElasticAgent/Version", + FullMethod: "/proto.ElasticAgentControl/Version", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ElasticAgentServer).Version(ctx, req.(*Empty)) + return srv.(ElasticAgentControlServer).Version(ctx, req.(*Empty)) } return interceptor(ctx, in, info, handler) } -func _ElasticAgent_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ElasticAgentControl_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ElasticAgentServer).Status(ctx, in) + return srv.(ElasticAgentControlServer).Status(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.ElasticAgent/Status", + FullMethod: "/proto.ElasticAgentControl/Status", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ElasticAgentServer).Status(ctx, req.(*Empty)) + return srv.(ElasticAgentControlServer).Status(ctx, req.(*Empty)) } return interceptor(ctx, in, info, handler) } -func _ElasticAgent_Restart_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ElasticAgentControl_Restart_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ElasticAgentServer).Restart(ctx, in) + return srv.(ElasticAgentControlServer).Restart(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.ElasticAgent/Restart", + FullMethod: "/proto.ElasticAgentControl/Restart", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ElasticAgentServer).Restart(ctx, req.(*Empty)) + return srv.(ElasticAgentControlServer).Restart(ctx, req.(*Empty)) } return interceptor(ctx, in, info, handler) } -func _ElasticAgent_Upgrade_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ElasticAgentControl_Upgrade_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(UpgradeRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ElasticAgentServer).Upgrade(ctx, in) + return srv.(ElasticAgentControlServer).Upgrade(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.ElasticAgent/Upgrade", + FullMethod: "/proto.ElasticAgentControl/Upgrade", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ElasticAgentServer).Upgrade(ctx, req.(*UpgradeRequest)) + return srv.(ElasticAgentControlServer).Upgrade(ctx, req.(*UpgradeRequest)) } return interceptor(ctx, in, info, handler) } -var _ElasticAgent_serviceDesc = grpc.ServiceDesc{ - ServiceName: "proto.ElasticAgent", - HandlerType: (*ElasticAgentServer)(nil), +var _ElasticAgentControl_serviceDesc = grpc.ServiceDesc{ + ServiceName: "proto.ElasticAgentControl", + HandlerType: (*ElasticAgentControlServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Version", - Handler: _ElasticAgent_Version_Handler, + Handler: _ElasticAgentControl_Version_Handler, }, { MethodName: "Status", - Handler: _ElasticAgent_Status_Handler, + Handler: _ElasticAgentControl_Status_Handler, }, { MethodName: "Restart", - Handler: _ElasticAgent_Restart_Handler, + Handler: _ElasticAgentControl_Restart_Handler, }, { MethodName: "Upgrade", - Handler: _ElasticAgent_Upgrade_Handler, + Handler: _ElasticAgentControl_Upgrade_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener.go b/x-pack/elastic-agent/pkg/agent/control/server/listener.go index 2dd5d54a46f..bf03f54e2da 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener.go @@ -7,16 +7,26 @@ package server import ( + "fmt" + "net" "os" "path/filepath" "strings" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) -func createListener() (net.Listener, error) { +func createListener(log *logger.Logger) (net.Listener, error) { path := strings.TrimPrefix(control.Address(), "unix://") + if _, err := os.Stat(path); !os.IsNotExist(err) { + err = os.Remove(path) + if err != nil { + log.Errorf("%s", errors.New(err, fmt.Sprintf("Failed to cleanup %s", path), errors.TypeFilesystem, errors.M("path", path))) + } + } dir := filepath.Dir(path) if _, err := os.Stat(dir); os.IsNotExist(err) { err = os.MkdirAll(dir, 0755) @@ -36,3 +46,10 @@ func createListener() (net.Listener, error) { } return lis, err } + +func cleanupListener(log *logger.Logger) { + path := strings.TrimPrefix(control.Address(), "unix://") + if err := os.Remove(path); err != nil { + log.Errorf("%s", errors.New(err, fmt.Sprintf("Failed to cleanup %s", path), errors.TypeFilesystem, errors.M("path", path))) + } +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go index d2d2866b98a..f98c32bcee3 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go @@ -13,10 +13,11 @@ import ( "github.com/elastic/beats/v7/libbeat/api/npipe" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) // createListener creates a named pipe listener on Windows -func createListener() (net.Listener, error) { +func createListener(_ *logger.Logger) (net.Listener, error) { u, err := user.Current() if err != nil { return nil, err @@ -27,3 +28,7 @@ func createListener() (net.Listener, error) { } return npipe.NewListener(control.Address(), sd) } + +func cleanupListener(_ *logger.Logger) { + // nothing to do on windows +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index c9a750808fc..faa7982c814 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -8,26 +8,29 @@ import ( "context" "net" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec" "google.golang.org/grpc" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) // Server is the daemon side of the control protocol. type Server struct { logger *logger.Logger + rex reexec.ExecManager listener net.Listener server *grpc.Server } // New creates a new control protocol server. -func New(log *logger.Logger) *Server { +func New(log *logger.Logger, rex reexec.ExecManager) *Server { return &Server{ logger: log, + rex: rex, } } @@ -38,13 +41,13 @@ func (s *Server) Start() error { return nil } - lis, err := createListener() + lis, err := createListener(s.logger) if err != nil { return err } s.listener = lis s.server = grpc.NewServer() - proto.RegisterElasticAgentServer(s.server, s) + proto.RegisterElasticAgentControlServer(s.server, s) // start serving GRPC connections go func() { @@ -63,6 +66,7 @@ func (s *Server) Stop() { s.server.Stop() s.server = nil s.listener = nil + cleanupListener(s.logger) } } @@ -88,10 +92,9 @@ func (s *Server) Status(_ context.Context, _ *proto.Empty) (*proto.StatusRespons // Restart performs re-exec. func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.RestartResponse, error) { - // not implemented + s.rex.ReExec() return &proto.RestartResponse{ - Status: proto.ActionStatus_FAILURE, - Error: "not implemented", + Status: proto.ActionStatus_SUCCESS, }, nil } diff --git a/x-pack/elastic-agent/pkg/basecmd/cmd.go b/x-pack/elastic-agent/pkg/basecmd/cmd.go index 9b957916fb1..b30b540d472 100644 --- a/x-pack/elastic-agent/pkg/basecmd/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/cmd.go @@ -7,6 +7,7 @@ package basecmd import ( "github.com/spf13/cobra" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/basecmd/restart" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/basecmd/version" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" ) @@ -14,6 +15,7 @@ import ( // NewDefaultCommandsWithArgs returns a list of default commands to executes. func NewDefaultCommandsWithArgs(args []string, streams *cli.IOStreams) []*cobra.Command { return []*cobra.Command{ + restart.NewCommandWithArgs(streams), version.NewCommandWithArgs(streams), } } diff --git a/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go b/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go new file mode 100644 index 00000000000..ebb3bf6effd --- /dev/null +++ b/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package restart + +import ( + "context" + + "github.com/spf13/cobra" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" +) + +// NewCommandWithArgs returns a new version command. +func NewCommandWithArgs(streams *cli.IOStreams) *cobra.Command { + return &cobra.Command{ + Use: "restart", + Short: "Restart the currently running Elastic Agent daemon", + RunE: func(cmd *cobra.Command, _ []string) error { + c := client.New() + err := c.Connect(context.Background()) + if err != nil { + return errors.New(err, "Failed communicating to running daemon", errors.TypeNetwork, errors.M("socket", control.Address())) + } + defer c.Disconnect() + err = c.Restart(context.Background()) + if err != nil { + return errors.New(err, "Failed trigger restart of daemon") + } + return nil + }, + } +} diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go index 0bf25438e80..b4e602759cb 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go @@ -5,32 +5,95 @@ package version import ( + "context" "fmt" "github.com/spf13/cobra" + "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) +// Output returns the output when `--yaml` is used. +type Output struct { + Binary *release.VersionInfo `yaml:"binary"` + Daemon *release.VersionInfo `yaml:"daemon,omitempty"` +} + // NewCommandWithArgs returns a new version command. func NewCommandWithArgs(streams *cli.IOStreams) *cobra.Command { - return &cobra.Command{ + cmd := &cobra.Command{ Use: "version", Short: "Display the version of the elastic-agent.", - Run: func(_ *cobra.Command, _ []string) { - version := release.Version() - if release.Snapshot() { - version = version + "-SNAPSHOT" + Run: func(cmd *cobra.Command, _ []string) { + var daemon *release.VersionInfo + var daemonError error + + binary := release.Info() + binaryOnly, _ := cmd.Flags().GetBool("binary-only") + if !binaryOnly { + c := client.New() + daemonError = c.Connect(context.Background()) + if daemonError == nil { + defer c.Disconnect() + + var version client.Version + version, daemonError = c.Version(context.Background()) + if daemonError == nil { + daemon = &release.VersionInfo{ + Version: version.Version, + Commit: version.Commit, + BuildTime: version.BuildTime, + Snapshot: version.Snapshot, + } + } + } + } + if daemonError != nil { + fmt.Fprintf(streams.Err, "Failed talking to running daemon: %s\n", daemonError) + } + + outputYaml, _ := cmd.Flags().GetBool("yaml") + if outputYaml { + p := Output{ + Binary: &binary, + Daemon: daemon, + } + out, err := yaml.Marshal(p) + if err != nil { + fmt.Fprintf(streams.Err, "Failed to render YAML: %s\n", err) + } + fmt.Fprintf(streams.Out, "%s", out) + return } - fmt.Fprintf( - streams.Out, - "Agent version is %s (build: %s at %s)\n", - version, - release.Commit(), - release.BuildTime(), - ) + if !binaryOnly { + mismatch := false + str := "" + if daemon != nil { + str = daemon.String() + mismatch = isMismatch(&binary, daemon) + } + if mismatch { + fmt.Fprintf(streams.Err, "WARN: Then running daemon of Elastic Agent does not match this version.\n") + } + fmt.Fprintf(streams.Out, "Daemon: %s\n", str) + } + fmt.Fprintf(streams.Out, "Binary: %s\n", binary.String()) }, } + + cmd.Flags().Bool("binary-only", false, "Version of current binary only") + cmd.Flags().Bool("yaml", false, "Output information in YAML format") + + return cmd +} + +func isMismatch(a *release.VersionInfo, b *release.VersionInfo) bool { + if a.Commit != "unknown" && b.Commit != "unknown" { + return a.Commit != b.Commit + } + return a.Version != b.Version || a.BuildTime != b.BuildTime || a.Snapshot != b.Snapshot } diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go index 111d174608f..119809338d6 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go @@ -10,17 +10,90 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) -func TestCmd(t *testing.T) { +func TestCmdBinaryOnly(t *testing.T) { + streams, _, out, _ := cli.NewTestingIOStreams() + cmd := NewCommandWithArgs(streams) + cmd.Flags().Set("binary-only", "true") + cmd.Execute() + version, err := ioutil.ReadAll(out) + + require.NoError(t, err) + assert.True(t, strings.Contains(string(version), "Binary: ")) + assert.False(t, strings.Contains(string(version), "Daemon: ")) +} + +func TestCmdBinaryOnlyYAML(t *testing.T) { + streams, _, out, _ := cli.NewTestingIOStreams() + cmd := NewCommandWithArgs(streams) + cmd.Flags().Set("binary-only", "true") + cmd.Flags().Set("yaml", "true") + cmd.Execute() + version, err := ioutil.ReadAll(out) + + require.NoError(t, err) + + var output Output + err = yaml.Unmarshal(version, &output) + require.NoError(t, err) + + assert.Nil(t, output.Daemon) + assert.Equal(t, release.Info(), *output.Binary) +} + +func TestCmdDaemon(t *testing.T) { + srv := server.New(newErrorLogger(t), nil) + require.NoError(t, srv.Start()) + defer srv.Stop() + + streams, _, out, _ := cli.NewTestingIOStreams() + cmd := NewCommandWithArgs(streams) + cmd.Execute() + version, err := ioutil.ReadAll(out) + + require.NoError(t, err) + assert.True(t, strings.Contains(string(version), "Binary: ")) + assert.True(t, strings.Contains(string(version), "Daemon: ")) +} + +func TestCmdDaemonYAML(t *testing.T) { + srv := server.New(newErrorLogger(t), nil) + require.NoError(t, srv.Start()) + defer srv.Stop() + streams, _, out, _ := cli.NewTestingIOStreams() - NewCommandWithArgs(streams).Execute() + cmd := NewCommandWithArgs(streams) + cmd.Flags().Set("yaml", "true") + cmd.Execute() version, err := ioutil.ReadAll(out) - if !assert.NoError(t, err) { - return - } - assert.True(t, strings.Contains(string(version), "Agent version is")) + require.NoError(t, err) + + var output Output + err = yaml.Unmarshal(version, &output) + require.NoError(t, err) + + assert.Equal(t, release.Info(), *output.Daemon) + assert.Equal(t, release.Info(), *output.Binary) +} + +func newErrorLogger(t *testing.T) *logger.Logger { + t.Helper() + + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.ErrorLevel + + log, err := logger.NewFromConfig("", loggerCfg) + require.NoError(t, err) + return log } diff --git a/x-pack/elastic-agent/pkg/release/version.go b/x-pack/elastic-agent/pkg/release/version.go index 7c139d943a9..542ea829417 100644 --- a/x-pack/elastic-agent/pkg/release/version.go +++ b/x-pack/elastic-agent/pkg/release/version.go @@ -6,6 +6,7 @@ package release import ( "strconv" + "strings" "time" libbeatVersion "github.com/elastic/beats/v7/libbeat/version" @@ -34,3 +35,37 @@ func Snapshot() bool { val, err := strconv.ParseBool(snapshot) return err == nil && val } + +// VersionInfo is structure used by `version --yaml`. +type VersionInfo struct { + Version string `yaml:"version"` + Commit string `yaml:"commit"` + BuildTime time.Time `yaml:"build_time"` + Snapshot bool `yaml:"snapshot"` +} + +// Info returns current version information. +func Info() VersionInfo { + return VersionInfo{ + Version: Version(), + Commit: Commit(), + BuildTime: BuildTime(), + Snapshot: Snapshot(), + } +} + +// String returns the string format for the version informaiton. +func (v *VersionInfo) String() string { + var sb strings.Builder + + sb.WriteString(v.Version) + if v.Snapshot { + sb.WriteString("-SNAPSHOT") + } + sb.WriteString(" (build: ") + sb.WriteString(v.Commit) + sb.WriteString(" at ") + sb.WriteString(v.BuildTime.Format("2006-01-02 15:04:05 -0700 MST")) + sb.WriteString(")") + return sb.String() +}