Skip to content

Commit

Permalink
[Elastic Agent] Send checkin payload to Fleet (#19857)
Browse files Browse the repository at this point in the history
* Add sending payload from status checking to Kibana.

* Run make notice.

* Fix fleet manager.
  • Loading branch information
blakerouse committed Jul 14, 2020
1 parent e17431e commit e16b6bd
Show file tree
Hide file tree
Showing 23 changed files with 137 additions and 82 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5703,11 +5703,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/ecs@v1.5.0/LICE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-client/v7
Version: v7.0.0-20200601155656-d6a9eb4f6d07
Version: v7.0.0-20200709172729-d43b7ad5833a
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.0.0-20200601155656-d6a9eb4f6d07/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.0.0-20200709172729-d43b7ad5833a/LICENSE.txt:

ELASTIC LICENSE AGREEMENT

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.5.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
github.com/elastic/go-concert v0.0.3
github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8
github.com/elastic/go-licenser v0.3.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/ecs v1.5.0 h1:/VEIBsRU4ecq2+U3RPfKNc6bFyomP6qnthYEcQZu8GU=
github.com/elastic/ecs v1.5.0/go.mod h1:pgiLbQsijLOJvFR8OTILLu0Ni/R/foUNg0L+T6mU9b4=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07 h1:s/41t2QLLkaa83VlS5UuyKH0ctX3bG4RMnE3Eha+8fU=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a h1:2NHgf1RUw+f240lpTnLrCp1aBNvq2wDi0E1A423/S1k=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng=
github.com/elastic/go-concert v0.0.3 h1:f0F4WOi8tBOFIgwA7YbHRQ+Ok8vR+/qFrG7vYvbpX5Q=
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ func (*testMonitorableApp) Shutdown() {}
func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error {
return nil
}
func (*testMonitorableApp) State() state.State { return state.State{} }
func (*testMonitorableApp) SetState(_ state.Status, _ string) {}
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }
func (a *testMonitorableApp) OnStatusChange(_ *server.ApplicationState, _ proto.StateObserved_Status, _ string) {
func (*testMonitorableApp) State() state.State { return state.State{} }
func (*testMonitorableApp) SetState(_ state.Status, _ string, _ map[string]interface{}) {}
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }
func (a *testMonitorableApp) OnStatusChange(_ *server.ApplicationState, _ proto.StateObserved_Status, _ string, _ map[string]interface{}) {
}

type testMonitor struct {
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Application interface {
Configure(ctx context.Context, config map[string]interface{}) error
Monitor() monitoring.Monitor
State() state.State
SetState(status state.Status, msg string)
OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string)
SetState(status state.Status, msg string, payload map[string]interface{})
OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{})
}

// Descriptor defines a program which needs to be run.
Expand All @@ -68,10 +68,10 @@ type ApplicationStatusHandler struct{}
// OnStatusChange is the handler called by the GRPC server code.
//
// It updates the status of the application and handles restarting the application is needed.
func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) {
func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) {
app, ok := s.App().(Application)
if !ok {
panic(errors.New("only Application can be registered when using the ApplicationStatusHandler", errors.TypeUnexpected))
}
app.OnStatusChange(s, status, msg)
app.OnStatusChange(s, status, msg, payload)
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error)
func (o *operationConfig) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()
return application.Configure(ctx, o.cfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (o *operationFetch) Check(_ context.Context, _ Application) (bool, error) {
func (o *operationFetch) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (o *operationInstall) Check(ctx context.Context, _ Application) (bool, erro
func (o *operationInstall) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (o *operationRemove) Check(_ context.Context, _ Application) (bool, error)
func (o *operationRemove) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (o *operationStart) Check(_ context.Context, application Application) (bool
func (o *operationStart) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (o *operationUninstall) Check(_ context.Context, _ Application) (bool, erro
func (o *operationUninstall) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (o *operationVerify) Check(_ context.Context, _ Application) (bool, error)
func (o *operationVerify) Run(_ context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error())
application.SetState(state.Failed, err.Error(), nil)
}
}()

Expand Down
11 changes: 11 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"runtime"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
Expand Down Expand Up @@ -137,6 +139,15 @@ func TestConfigurableFailed(t *testing.T) {
pid = item.ProcessInfo.PID
return nil
})
items := operator.State()
item, ok := items[p.ID()]
if !ok {
t.Fatalf("no state for process")
}
assert.Equal(t, map[string]interface{}{
"status": float64(proto.StateObserved_HEALTHY),
"message": "Running",
}, item.Payload)

// try to configure (with failed status)
cfg := make(map[string]interface{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ type configServer struct {
f *os.File
ctx context.Context
cancel context.CancelFunc
client *client.Client
client client.Client
}

func (s *configServer) OnConfig(cfgString string) {
s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file")
s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file", nil)

testCfg := &TestConfig{}
if err := yaml.Unmarshal([]byte(cfgString), &testCfg); err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err))
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err), nil)
return
}

if testCfg.TestFile != "" {
tf, err := os.Create(testCfg.TestFile)
if err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err))
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err), nil)
return
}

err = tf.Close()
if err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err))
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err), nil)
return
}
}
Expand All @@ -75,14 +75,20 @@ func (s *configServer) OnConfig(cfgString string) {
}

if testCfg.Status != nil {
s.client.Status(*testCfg.Status, "Custom status")
s.client.Status(*testCfg.Status, "Custom status", map[string]interface{}{
"status": *testCfg.Status,
"message": "Custom status",
})
} else {
s.client.Status(proto.StateObserved_HEALTHY, "Running")
s.client.Status(proto.StateObserved_HEALTHY, "Running", map[string]interface{}{
"status": proto.StateObserved_HEALTHY,
"message": "Running",
})
}
}

func (s *configServer) OnStop() {
s.client.Status(proto.StateObserved_STOPPING, "Stopping")
s.client.Status(proto.StateObserved_STOPPING, "Stopping", nil)
s.cancel()
}

Expand Down
18 changes: 10 additions & 8 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -146,7 +147,7 @@ func (a *Application) Stop() {
// cleanup drops
a.cleanUp()
}
a.setState(state.Stopped, "Stopped")
a.setState(state.Stopped, "Stopped", nil)
}

// Shutdown stops the application (aka. subprocess).
Expand All @@ -156,10 +157,10 @@ func (a *Application) Shutdown() {
}

// SetState sets the status of the application.
func (a *Application) SetState(status state.Status, msg string) {
func (a *Application) SetState(status state.Status, msg string, payload map[string]interface{}) {
a.appLock.Lock()
defer a.appLock.Unlock()
a.setState(status, msg)
a.setState(status, msg, payload)
}

func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.Info, cfg map[string]interface{}) {
Expand Down Expand Up @@ -189,7 +190,7 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I
}

msg := fmt.Sprintf("exited with code: %d", procState.ExitCode())
a.setState(state.Crashed, msg)
a.setState(state.Crashed, msg, nil)

// it was a crash, cleanup anything required
go a.cleanUp()
Expand All @@ -214,7 +215,7 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {
return resChan
}

func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string) {
func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string, payload map[string]interface{}) {
var status state.Status
switch pstatus {
case proto.StateObserved_STARTING:
Expand All @@ -230,13 +231,14 @@ func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg
case proto.StateObserved_STOPPING:
status = state.Stopping
}
a.setState(status, msg)
a.setState(status, msg, payload)
}

func (a *Application) setState(status state.Status, msg string) {
if a.state.Status != status || a.state.Message != msg {
func (a *Application) setState(status state.Status, msg string, payload map[string]interface{}) {
if a.state.Status != status || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) {
a.state.Status = status
a.state.Message = msg
a.state.Payload = payload
if a.reporter != nil {
go a.reporter.OnStateChange(a.id, a.name, a.state)
}
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/core/plugin/process/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]

// Failed applications can be started again.
if srvState != nil {
a.setState(state.Starting, "Starting")
srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message)
a.setState(state.Starting, "Starting", nil)
srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload)
srvState.UpdateConfig(string(cfgStr))
} else {
a.srvState, err = a.srv.Register(a, string(cfgStr))
Expand All @@ -66,9 +66,9 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]

if a.state.Status != state.Stopped {
// restarting as it was previously in a different state
a.setState(state.Restarting, "Restarting")
a.setState(state.Restarting, "Restarting", nil)
} else {
a.setState(state.Starting, "Starting")
a.setState(state.Starting, "Starting", nil)
}

defer func() {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/elastic-agent/pkg/core/plugin/process/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// OnStatusChange is the handler called by the GRPC server code.
//
// It updates the status of the application and handles restarting the application if needed.
func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) {
func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) {
a.appLock.Lock()
defer a.appLock.Unlock()

Expand All @@ -28,7 +28,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
return
}

a.setStateFromProto(status, msg)
a.setStateFromProto(status, msg, payload)
if status == proto.StateObserved_FAILED {
// ignore when expected state is stopping
if s.Expected() == proto.StateExpected_STOPPING {
Expand All @@ -52,7 +52,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St

err := a.start(ctx, tag, cfg)
if err != nil {
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err))
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
}
}
}
Loading

0 comments on commit e16b6bd

Please sign in to comment.