Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Send checkin payload to Fleet #19857

Merged
merged 3 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5703,11 +5703,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/ecs@v1.5.0/LICE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-client/v7
Version: v7.0.0-20200601155656-d6a9eb4f6d07
Version: v7.0.0-20200709172729-d43b7ad5833a
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.0.0-20200601155656-d6a9eb4f6d07/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.0.0-20200709172729-d43b7ad5833a/LICENSE.txt:

ELASTIC LICENSE AGREEMENT

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.5.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
github.com/elastic/go-concert v0.0.3
github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8
github.com/elastic/go-licenser v0.3.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/ecs v1.5.0 h1:/VEIBsRU4ecq2+U3RPfKNc6bFyomP6qnthYEcQZu8GU=
github.com/elastic/ecs v1.5.0/go.mod h1:pgiLbQsijLOJvFR8OTILLu0Ni/R/foUNg0L+T6mU9b4=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07 h1:s/41t2QLLkaa83VlS5UuyKH0ctX3bG4RMnE3Eha+8fU=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a h1:2NHgf1RUw+f240lpTnLrCp1aBNvq2wDi0E1A423/S1k=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng=
github.com/elastic/go-concert v0.0.3 h1:f0F4WOi8tBOFIgwA7YbHRQ+Ok8vR+/qFrG7vYvbpX5Q=
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ func (*testMonitorableApp) Shutdown() {}
func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error {
return nil
}
func (*testMonitorableApp) State() state.State { return state.State{} }
func (*testMonitorableApp) SetState(_ state.Status, _ string) {}
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }
func (a *testMonitorableApp) OnStatusChange(_ *server.ApplicationState, _ proto.StateObserved_Status, _ string) {
func (*testMonitorableApp) State() state.State { return state.State{} }
func (*testMonitorableApp) SetState(_ state.Status, _ string, _ map[string]interface{}) {}
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }
func (a *testMonitorableApp) OnStatusChange(_ *server.ApplicationState, _ proto.StateObserved_Status, _ string, _ map[string]interface{}) {
}

type testMonitor struct {
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Application interface {
Configure(ctx context.Context, config map[string]interface{}) error
Monitor() monitoring.Monitor
State() state.State
SetState(status state.Status, msg string)
OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string)
SetState(status state.Status, msg string, payload map[string]interface{})
OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{})
}

// Descriptor defines a program which needs to be run.
Expand All @@ -68,10 +68,10 @@ type ApplicationStatusHandler struct{}
// OnStatusChange is the handler called by the GRPC server code.
//
// It updates the status of the application and handles restarting the application is needed.
func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) {
func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) {
app, ok := s.App().(Application)
if !ok {
panic(errors.New("only Application can be registered when using the ApplicationStatusHandler", errors.TypeUnexpected))
}
app.OnStatusChange(s, status, msg)
app.OnStatusChange(s, status, msg, payload)
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error)
func (o *operationConfig) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()
return application.Configure(ctx, o.cfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (o *operationFetch) Check(_ context.Context, _ Application) (bool, error) {
func (o *operationFetch) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (o *operationInstall) Check(ctx context.Context, _ Application) (bool, erro
func (o *operationInstall) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (o *operationRemove) Check(_ context.Context, _ Application) (bool, error)
func (o *operationRemove) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (o *operationStart) Check(_ context.Context, application Application) (bool
func (o *operationStart) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (o *operationUninstall) Check(_ context.Context, _ Application) (bool, erro
func (o *operationUninstall) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (o *operationVerify) Check(_ context.Context, _ Application) (bool, error)
func (o *operationVerify) Run(_ context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
11 changes: 11 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"runtime"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
Expand Down Expand Up @@ -137,6 +139,15 @@ func TestConfigurableFailed(t *testing.T) {
pid = item.ProcessInfo.PID
return nil
})
items := operator.State()
item, ok := items[p.ID()]
if !ok {
t.Fatalf("no state for process")
}
assert.Equal(t, map[string]interface{}{
"status": float64(proto.StateObserved_HEALTHY),
"message": "Running",
}, item.Payload)

// try to configure (with failed status)
cfg := make(map[string]interface{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ type configServer struct {
f *os.File
ctx context.Context
cancel context.CancelFunc
client *client.Client
client client.Client
}

func (s *configServer) OnConfig(cfgString string) {
s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file")
s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file", nil)

testCfg := &TestConfig{}
if err := yaml.Unmarshal([]byte(cfgString), &testCfg); err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err))
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err), nil)
return
}

if testCfg.TestFile != "" {
tf, err := os.Create(testCfg.TestFile)
if err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err))
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err), nil)
return
}

err = tf.Close()
if err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err))
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err), nil)
return
}
}
Expand All @@ -75,14 +75,20 @@ func (s *configServer) OnConfig(cfgString string) {
}

if testCfg.Status != nil {
s.client.Status(*testCfg.Status, "Custom status")
s.client.Status(*testCfg.Status, "Custom status", map[string]interface{}{
"status": *testCfg.Status,
"message": "Custom status",
})
} else {
s.client.Status(proto.StateObserved_HEALTHY, "Running")
s.client.Status(proto.StateObserved_HEALTHY, "Running", map[string]interface{}{
"status": proto.StateObserved_HEALTHY,
"message": "Running",
})
}
}

func (s *configServer) OnStop() {
s.client.Status(proto.StateObserved_STOPPING, "Stopping")
s.client.Status(proto.StateObserved_STOPPING, "Stopping", nil)
s.cancel()
}

Expand Down
18 changes: 10 additions & 8 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -146,7 +147,7 @@ func (a *Application) Stop() {
// cleanup drops
a.cleanUp()
}
a.setState(state.Stopped, "Stopped")
a.setState(state.Stopped, "Stopped", nil)
}

// Shutdown stops the application (aka. subprocess).
Expand All @@ -156,10 +157,10 @@ func (a *Application) Shutdown() {
}

// SetState sets the status of the application.
func (a *Application) SetState(status state.Status, msg string) {
func (a *Application) SetState(status state.Status, msg string, payload map[string]interface{}) {
a.appLock.Lock()
defer a.appLock.Unlock()
a.setState(status, msg)
a.setState(status, msg, payload)
}

func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.Info, cfg map[string]interface{}) {
Expand Down Expand Up @@ -189,7 +190,7 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I
}

msg := fmt.Sprintf("exited with code: %d", procState.ExitCode())
a.setState(state.Crashed, msg)
a.setState(state.Crashed, msg, nil)

// it was a crash, cleanup anything required
go a.cleanUp()
Expand All @@ -214,7 +215,7 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {
return resChan
}

func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string) {
func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string, payload map[string]interface{}) {
var status state.Status
switch pstatus {
case proto.StateObserved_STARTING:
Expand All @@ -230,13 +231,14 @@ func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg
case proto.StateObserved_STOPPING:
status = state.Stopping
}
a.setState(status, msg)
a.setState(status, msg, payload)
}

func (a *Application) setState(status state.Status, msg string) {
if a.state.Status != status || a.state.Message != msg {
func (a *Application) setState(status state.Status, msg string, payload map[string]interface{}) {
if a.state.Status != status || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) {
a.state.Status = status
a.state.Message = msg
a.state.Payload = payload
if a.reporter != nil {
go a.reporter.OnStateChange(a.id, a.name, a.state)
}
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/core/plugin/process/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]

// Failed applications can be started again.
if srvState != nil {
a.setState(state.Starting, "Starting")
srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message)
a.setState(state.Starting, "Starting", nil)
srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload)
srvState.UpdateConfig(string(cfgStr))
} else {
a.srvState, err = a.srv.Register(a, string(cfgStr))
Expand All @@ -66,9 +66,9 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]

if a.state.Status != state.Stopped {
// restarting as it was previously in a different state
a.setState(state.Restarting, "Restarting")
a.setState(state.Restarting, "Restarting", nil)
} else {
a.setState(state.Starting, "Starting")
a.setState(state.Starting, "Starting", nil)
}

defer func() {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/elastic-agent/pkg/core/plugin/process/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// OnStatusChange is the handler called by the GRPC server code.
//
// It updates the status of the application and handles restarting the application if needed.
func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) {
func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) {
a.appLock.Lock()
defer a.appLock.Unlock()

Expand All @@ -28,7 +28,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
return
}

a.setStateFromProto(status, msg)
a.setStateFromProto(status, msg, payload)
if status == proto.StateObserved_FAILED {
// ignore when expected state is stopping
if s.Expected() == proto.StateExpected_STOPPING {
Expand All @@ -52,7 +52,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St

err := a.start(ctx, tag, cfg)
if err != nil {
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err))
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
}
}
}
Loading