Skip to content

Commit

Permalink
[Elastic Agent] Fix missing elastic_agent event data (elastic#21994)
Browse files Browse the repository at this point in the history
* Fix fields.

* Remove from monitoring decorator.

* Add changelog.

* Fix tests.

* Fix tests.

* Fix import.

(cherry picked from commit 610e998)
  • Loading branch information
blakerouse committed Oct 20, 2020
1 parent 92d6be3 commit 578fac3
Show file tree
Hide file tree
Showing 15 changed files with 89 additions and 48 deletions.
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
- Fix issue where inputs without processors defined would panic {pull}21628[21628]
- Partial extracted beat result in failure to spawn beat {issue}21718[21718]
- Rename monitoring index from `elastic.agent` to `elastic_agent` {pull}21932[21932]
- Fix issue with named pipes on Windows 7 {pull}21931[21931]
- Fix missing elastic_agent event data {pull}21994[21994]

==== New features

Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func newLocal(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(localApplication.bgContext, cfg.Settings, localApplication.srv, reporter, monitor))
router, err := newRouter(log, streamFactory(localApplication.bgContext, agentInfo, cfg.Settings, localApplication.srv, reporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func newManaged(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(managedApplication.bgContext, cfg.Settings, managedApplication.srv, combinedReporter, monitor))
router, err := newRouter(log, streamFactory(managedApplication.bgContext, agentInfo, cfg.Settings, managedApplication.srv, combinedReporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func getMonitoringRule(outputName string) *transpiler.RuleList {
return transpiler.NewRuleList(
transpiler.Copy(monitoringOutputSelector, outputKey),
transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey),
transpiler.InjectAgentInfo(),
transpiler.Filter(monitoringKey, programsKey, outputKey),
)
}
8 changes: 5 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand Down Expand Up @@ -40,10 +41,10 @@ func (b *operatorStream) Shutdown() {
b.configHandler.Shutdown()
}

func streamFactory(ctx context.Context, cfg *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(ctx, log, id, cfg, srv, r, m)
operator, err := newOperator(ctx, log, agentInfo, id, cfg, srv, r, m)
if err != nil {
return nil, err
}
Expand All @@ -55,7 +56,7 @@ func streamFactory(ctx context.Context, cfg *configuration.SettingsConfig, srv *
}
}

func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
fetcher := downloader.NewDownloader(log, config.DownloadConfig)
allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp)
Expand All @@ -81,6 +82,7 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config
return operation.NewOperator(
ctx,
log,
agentInfo,
id,
config,
fetcher,
Expand Down
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver"
Expand Down Expand Up @@ -48,6 +49,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a
}

l := getLogger()
agentInfo, _ := info.NewAgentInfo()

fetcher := &DummyDownloader{}
verifier := &DummyVerifier{}
Expand All @@ -67,7 +69,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a
t.Fatal(err)
}

operator, err := NewOperator(context.Background(), l, "p1", operatorCfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, noop.NewMonitor())
operator, err := NewOperator(context.Background(), l, agentInfo, "p1", operatorCfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, noop.NewMonitor())
if err != nil {
t.Fatal(err)
}
Expand Down
30 changes: 30 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,16 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
},
},
},
{
"add_fields": map[string]interface{}{
"target": "elastic_agent",
"fields": map[string]interface{}{
"id": o.agentInfo.AgentID(),
"version": o.agentInfo.Version(),
"snapshot": o.agentInfo.Snapshot(),
},
},
},
},
},
}
Expand Down Expand Up @@ -240,6 +250,16 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
},
},
},
{
"add_fields": map[string]interface{}{
"target": "elastic_agent",
"fields": map[string]interface{}{
"id": o.agentInfo.AgentID(),
"version": o.agentInfo.Version(),
"snapshot": o.agentInfo.Snapshot(),
},
},
},
},
})
}
Expand Down Expand Up @@ -290,6 +310,16 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
},
},
},
{
"add_fields": map[string]interface{}{
"target": "elastic_agent",
"fields": map[string]interface{}{
"id": o.agentInfo.AgentID(),
"version": o.agentInfo.Version(),
"snapshot": o.agentInfo.Snapshot(),
},
},
},
},
})
}
Expand Down
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver"
Expand Down Expand Up @@ -112,6 +113,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M
}

l := getLogger()
agentInfo, _ := info.NewAgentInfo()

fetcher := &DummyDownloader{}
verifier := &DummyVerifier{}
Expand All @@ -128,7 +130,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M
}

ctx := context.Background()
operator, err := NewOperator(ctx, l, "p1", cfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, m)
operator, err := NewOperator(ctx, l, agentInfo, "p1", cfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, m)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand Down Expand Up @@ -43,6 +44,7 @@ type Operator struct {
bgContext context.Context
pipelineID string
logger *logger.Logger
agentInfo *info.AgentInfo
config *configuration.SettingsConfig
handlers map[string]handleFunc
stateResolver *stateresolver.StateResolver
Expand All @@ -66,6 +68,7 @@ type Operator struct {
func NewOperator(
ctx context.Context,
logger *logger.Logger,
agentInfo *info.AgentInfo,
pipelineID string,
config *configuration.SettingsConfig,
fetcher download.Downloader,
Expand All @@ -85,6 +88,7 @@ func NewOperator(
config: config,
pipelineID: pipelineID,
logger: logger,
agentInfo: agentInfo,
downloader: fetcher,
verifier: verifier,
installer: installer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ filebeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
output:
elasticsearch:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ filebeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ filebeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
- type: log
paths:
- /var/log/hello3.log
Expand All @@ -43,11 +43,11 @@ filebeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ metricbeat:
fields:
dataset: docker.status
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
- module: docker
metricsets: [info]
index: metrics-generic-default
Expand All @@ -37,11 +37,11 @@ metricbeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
- module: apache
metricsets: [info]
index: metrics-generic-testing
Expand All @@ -61,11 +61,11 @@ metricbeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
output:
elasticsearch:
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,11 +715,11 @@ func (r *InjectAgentInfoRule) Apply(agentInfo AgentInfo, ast *AST) error {

// elastic.agent
processorMap := &Dict{value: make([]Node, 0)}
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "elastic"}})
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "elastic_agent"}})
processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{
&Key{name: "agent.id", value: &StrVal{value: agentInfo.AgentID()}},
&Key{name: "agent.version", value: &StrVal{value: agentInfo.Version()}},
&Key{name: "agent.snapshot", value: &BoolVal{value: agentInfo.Snapshot()}},
&Key{name: "id", value: &StrVal{value: agentInfo.AgentID()}},
&Key{name: "version", value: &StrVal{value: agentInfo.Version()}},
&Key{name: "snapshot", value: &BoolVal{value: agentInfo.Snapshot()}},
}}})
addFieldsMap := &Dict{value: []Node{&Key{"add_fields", processorMap}}}
processorsList.value = mergeStrategy("").InjectItem(processorsList.value, addFieldsMap)
Expand Down
16 changes: 8 additions & 8 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ inputs:
type: file
processors:
- add_fields:
target: elastic
target: elastic_agent
fields:
agent.id: agent-id
agent.snapshot: false
agent.version: 8.0.0
id: agent-id
snapshot: false
version: 8.0.0
- name: With processors
type: file
processors:
Expand All @@ -197,11 +197,11 @@ inputs:
fields:
data: more
- add_fields:
target: elastic
target: elastic_agent
fields:
agent.id: agent-id
agent.snapshot: false
agent.version: 8.0.0
id: agent-id
snapshot: false
version: 8.0.0
`,
rule: &RuleList{
Rules: []Rule{
Expand Down

0 comments on commit 578fac3

Please sign in to comment.