Add add_docker_metadata processor #4352

Merged
merged 3 commits into from
May 31, 2017
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -64,6 +64,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
- Add a variable to the SysV init scripts to make it easier to change the user. {pull}4340[4340]
- Add the option to write the generated Elasticsearch mapping template into a file. {pull}4323[4323]
- Add instance_name in GCE add_cloud_metadata processor. {pull}4414[4414]
- Add `add_docker_metadata` processor. {pull}4352[4352]

*Filebeat*
- Add experimental Redis slow log prospector type. {pull}4180[4180]
1,013 changes: 901 additions & 112 deletions NOTICE

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions filebeat/filebeat.full.yml
@@ -511,6 +511,19 @@ filebeat.prospectors:
#- add_locale:
#    format: offset
#
# The following example enriches each event with docker metadata, it matches
# given fields to an existing container id and adds info from that container:
#
#processors:
#- add_docker_metadata:
#    match_fields: ["system.process.cgroup.id"]
Member

I was confused by this at first and had to read the code to understand what it does. It takes this field and checks whether its value matches any container ID. If so, it adds the container metadata to the event.

In the code, it can currently only match 1 field I think, but here it is plural and you use an array. How is this going to work exactly?

Member

BTW I'm not too sure about the example used here. In case someone uses cgroups, doesn't that kind of mean he doesn't want to use docker?

Contributor Author

It checks all the fields until one of them matches

Contributor Author

As for the example, you may want to have docker info on top of the cgroup id, so you can filter by, for instance, docker image

Member

So the config option is kind of field_that_matches_container_id: .... What is the use case of having multiple fields defined?

For the cgroup docker part: I see that this is what it's doing, but why wouldn't someone just use the docker module instead, since they will rely on the Docker API anyway?

Contributor Author

We have several metrics matching cgroup id in system (maybe something we have to look into): system.process.cgroup.cpu.id, system.process.cgroup.memory.id, system.process.cgroup.blkio.id

Member

But I assume per config you only want to match to one?

Contributor Author

I would like to match all of them. In the future it might be better to merge those fields into a global system.cgroup.id, with different meanings depending on the metricset, but for now I need to match any possible field there.

What do you think @andrewkroh, does this make sense?

#    host: "unix:///var/run/docker.sock"
#    # To connect to Docker over TLS you must specify a client and CA certificate.
#    #ssl:
#    #  certificate_authority: "/etc/pki/root/ca.pem"
#    #  certificate: "/etc/pki/client/cert.pem"
#    #  key: "/etc/pki/client/cert.key"
#

#================================ Outputs ======================================
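
The review thread on `match_fields` describes the lookup as trying each configured field until one yields a container ID. A minimal, self-contained sketch of that behaviour follows, assuming plain nested maps in place of `common.MapStr`; `getValue` and `firstContainerID` are hypothetical helpers for illustration, not libbeat functions.

```go
package main

import (
	"fmt"
	"strings"
)

// getValue walks a nested map using a dotted key such as
// "system.process.cgroup.id". It is a simplified stand-in for the
// event lookup in libbeat, not the real API.
func getValue(event map[string]interface{}, key string) (interface{}, bool) {
	var current interface{} = event
	for _, part := range strings.Split(key, ".") {
		m, ok := current.(map[string]interface{})
		if !ok {
			return nil, false
		}
		current, ok = m[part]
		if !ok {
			return nil, false
		}
	}
	return current, true
}

// firstContainerID returns the first non-empty string found under the
// given fields, in the order they are configured.
func firstContainerID(event map[string]interface{}, fields []string) string {
	for _, field := range fields {
		value, found := getValue(event, field)
		if !found {
			continue
		}
		if s, ok := value.(string); ok && s != "" {
			return s
		}
	}
	return ""
}

func main() {
	// Only the memory cgroup field is present in this event.
	event := map[string]interface{}{
		"system": map[string]interface{}{
			"process": map[string]interface{}{
				"cgroup": map[string]interface{}{
					"memory": map[string]interface{}{"id": "abc123"},
				},
			},
		},
	}
	fields := []string{
		"system.process.cgroup.cpu.id",
		"system.process.cgroup.memory.id",
		"system.process.cgroup.blkio.id",
	}
	fmt.Println(firstContainerID(event, fields))
}
```

Listing all three cgroup fields lets one configuration cover events from different metricsets. Note that this sketch returns on the first match, whereas the `Run` loop in this diff has no `break`, so there the last string-valued field wins.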

13 changes: 13 additions & 0 deletions heartbeat/heartbeat.full.yml
@@ -275,6 +275,19 @@ heartbeat.scheduler:
#- add_locale:
#    format: offset
#
# The following example enriches each event with docker metadata, it matches
# given fields to an existing container id and adds info from that container:
#
#processors:
#- add_docker_metadata:
#    match_fields: ["system.process.cgroup.id"]
#    host: "unix:///var/run/docker.sock"
#    # To connect to Docker over TLS you must specify a client and CA certificate.
#    #ssl:
#    #  certificate_authority: "/etc/pki/root/ca.pem"
#    #  certificate: "/etc/pki/client/cert.pem"
#    #  key: "/etc/pki/client/cert.key"
#

#================================ Outputs ======================================

13 changes: 13 additions & 0 deletions libbeat/_meta/config.full.yml
@@ -77,6 +77,19 @@
#- add_locale:
#    format: offset
#
# The following example enriches each event with docker metadata, it matches
# given fields to an existing container id and adds info from that container:
#
#processors:
#- add_docker_metadata:
#    match_fields: ["system.process.cgroup.id"]
#    host: "unix:///var/run/docker.sock"
#    # To connect to Docker over TLS you must specify a client and CA certificate.
#    #ssl:
#    #  certificate_authority: "/etc/pki/root/ca.pem"
#    #  certificate: "/etc/pki/client/cert.pem"
#    #  key: "/etc/pki/client/cert.key"
#

#================================ Outputs ======================================

1 change: 1 addition & 0 deletions libbeat/beat/beat.go
@@ -64,6 +64,7 @@ import (
	// Register default processors.
	_ "github.com/elastic/beats/libbeat/processors/actions"
	_ "github.com/elastic/beats/libbeat/processors/add_cloud_metadata"
	_ "github.com/elastic/beats/libbeat/processors/add_docker_metadata"
	_ "github.com/elastic/beats/libbeat/processors/add_locale"
	_ "github.com/elastic/beats/libbeat/processors/kubernetes"

96 changes: 96 additions & 0 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
@@ -0,0 +1,96 @@
package add_docker_metadata

import (
	"fmt"
	"strings"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/processors"
)

func init() {
	processors.RegisterPlugin("add_docker_metadata", newDockerMetadataProcessor)
}

type addDockerMetadata struct {
	watcher Watcher
	fields  []string
}

func newDockerMetadataProcessor(cfg common.Config) (processors.Processor, error) {
	return buildDockerMetadataProcessor(cfg, NewWatcher)
}

func buildDockerMetadataProcessor(cfg common.Config, watcherConstructor WatcherConstructor) (processors.Processor, error) {
	logp.Beta("The add_docker_metadata processor is beta")

	config := defaultConfig()

	err := cfg.Unpack(&config)
	if err != nil {
		return nil, fmt.Errorf("fail to unpack the add_docker_metadata configuration: %s", err)
	}

	watcher, err := watcherConstructor(config.Host, config.TLS)
	if err != nil {
		return nil, err
	}

	if err = watcher.Start(); err != nil {
		return nil, err
	}

	return &addDockerMetadata{
		watcher: watcher,
		fields:  config.Fields,
	}, nil
}

func (d *addDockerMetadata) Run(event common.MapStr) (common.MapStr, error) {
	var cid string
	for _, field := range d.fields {
		value, err := event.GetValue(field)
		if err != nil {
			continue
		}

		if strValue, ok := value.(string); ok {
			cid = strValue
		}
	}

	if cid == "" {
		return event, nil
	}

	container := d.watcher.Container(cid)
	if container != nil {
		meta := common.MapStr{}
		metaIface, ok := event["docker"]
		if ok {
			meta = metaIface.(common.MapStr)
		}

		if len(container.Labels) > 0 {
			labels := common.MapStr{}
			for k, v := range container.Labels {
				labels.Put(k, v)
			}
			meta.Put("container.labels", labels)
		}

		meta.Put("container.id", container.ID)
Member

I assume this matches the "fields" we have in metricbeat for the docker containers?

Contributor Author

yes, they do

		meta.Put("container.image", container.Image)
		meta.Put("container.name", container.Name)
		event["docker"] = meta
	} else {
		logp.Debug("docker", "Container not found: %s", cid)
	}

	return event, nil
}

func (d *addDockerMetadata) String() string {
	return "add_docker_metadata=[fields=" + strings.Join(d.fields, ", ") + "]"
}
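
To make the resulting event shape concrete, here is a rough sketch of how dotted `Put` calls can build the nested `docker.container.*` structure. It uses plain maps; `put`, `enrich`, and the `container` type are stand-ins assumed for illustration and do not reproduce `common.MapStr`.

```go
package main

import (
	"fmt"
	"strings"
)

// put stores value under a dotted key, creating intermediate maps as
// needed. Simplified stand-in for a dotted-key Put; not the libbeat code.
func put(m map[string]interface{}, key string, value interface{}) {
	parts := strings.Split(key, ".")
	for _, part := range parts[:len(parts)-1] {
		next, ok := m[part].(map[string]interface{})
		if !ok {
			next = map[string]interface{}{}
			m[part] = next
		}
		m = next
	}
	m[parts[len(parts)-1]] = value
}

// container mirrors the fields the processor reads (hypothetical type).
type container struct {
	ID, Image, Name string
	Labels          map[string]string
}

// enrich adds docker.container.* to the event, reusing an existing
// "docker" map when one is already present.
func enrich(event map[string]interface{}, c *container) {
	meta, ok := event["docker"].(map[string]interface{})
	if !ok {
		meta = map[string]interface{}{}
	}
	if len(c.Labels) > 0 {
		labels := map[string]interface{}{}
		for k, v := range c.Labels {
			labels[k] = v // label keys are stored verbatim in this sketch
		}
		put(meta, "container.labels", labels)
	}
	put(meta, "container.id", c.ID)
	put(meta, "container.image", c.Image)
	put(meta, "container.name", c.Name)
	event["docker"] = meta
}

func main() {
	event := map[string]interface{}{"foo": "container_id"}
	enrich(event, &container{
		ID:     "container_id",
		Image:  "image",
		Name:   "name",
		Labels: map[string]string{"a": "1", "b": "2"},
	})
	fmt.Println(event)
}
```

The sketch uses the two-value type assertion when reading `event["docker"]`, so an unexpected value type there is replaced rather than causing a panic.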
126 changes: 126 additions & 0 deletions libbeat/processors/add_docker_metadata/add_docker_metadata_test.go
@@ -0,0 +1,126 @@
package add_docker_metadata

import (
	"testing"

	"github.com/elastic/beats/libbeat/common"
	"github.com/stretchr/testify/assert"
)

func TestInitialization(t *testing.T) {
	var testConfig = common.NewConfig()

	p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
	assert.NoError(t, err, "initializing add_docker_metadata processor")

	input := common.MapStr{}
	result, err := p.Run(input)
	assert.NoError(t, err, "processing an event")

	assert.Equal(t, common.MapStr{}, result)
}

func TestNoMatch(t *testing.T) {
	testConfig, err := common.NewConfigFrom(map[string]interface{}{
		"match_fields": []string{"foo"},
	})
	assert.NoError(t, err)

	p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
	assert.NoError(t, err, "initializing add_docker_metadata processor")

	input := common.MapStr{
		"field": "value",
	}
	result, err := p.Run(input)
	assert.NoError(t, err, "processing an event")

	assert.Equal(t, common.MapStr{"field": "value"}, result)
}

func TestMatchNoContainer(t *testing.T) {
	testConfig, err := common.NewConfigFrom(map[string]interface{}{
		"match_fields": []string{"foo"},
	})
	assert.NoError(t, err)

	p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
	assert.NoError(t, err, "initializing add_docker_metadata processor")

	input := common.MapStr{
		"foo": "garbage",
	}
	result, err := p.Run(input)
	assert.NoError(t, err, "processing an event")

	assert.Equal(t, common.MapStr{"foo": "garbage"}, result)
}

func TestMatchContainer(t *testing.T) {
	testConfig, err := common.NewConfigFrom(map[string]interface{}{
		"match_fields": []string{"foo"},
	})
	assert.NoError(t, err)

	p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(
		map[string]*Container{
			"container_id": &Container{
				ID:    "container_id",
				Image: "image",
				Name:  "name",
				Labels: map[string]string{
					"a": "1",
					"b": "2",
				},
			},
		}))
	assert.NoError(t, err, "initializing add_docker_metadata processor")

	input := common.MapStr{
		"foo": "container_id",
	}
	result, err := p.Run(input)
	assert.NoError(t, err, "processing an event")

	assert.EqualValues(t, common.MapStr{
		"docker": common.MapStr{
			"container": common.MapStr{
				"id":    "container_id",
				"image": "image",
				"labels": common.MapStr{
					"a": "1",
					"b": "2",
				},
				"name": "name",
			},
		},
		"foo": "container_id",
	}, result)
}

// Mock container watcher

func MockWatcherFactory(containers map[string]*Container) WatcherConstructor {
	if containers == nil {
		containers = make(map[string]*Container)
	}
	return func(host string, tls *TLSConfig) (Watcher, error) {
		return &mockWatcher{containers: containers}, nil
	}
}

type mockWatcher struct {
	containers map[string]*Container
}

func (m *mockWatcher) Start() error {
	return nil
}

func (m *mockWatcher) Container(ID string) *Container {
	return m.containers[ID]
}

func (m *mockWatcher) Containers() map[string]*Container {
	return m.containers
}
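
The tests above work because the processor receives its watcher through a constructor function instead of creating a Docker client itself. The following standalone sketch shows that injection pattern in isolation, including a start-failure case. All names here (`newProcessor`, `fakeWatcher`, `fakeFactory`, `errStart`) are hypothetical, and the `Watcher` interface is trimmed down for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Container and Watcher are reduced versions of the types in the diff.
type Container struct{ ID, Name string }

type Watcher interface {
	Start() error
	Container(id string) *Container
}

// WatcherConstructor lets callers decide how the watcher is built.
type WatcherConstructor func(host string) (Watcher, error)

type processor struct{ watcher Watcher }

// newProcessor builds and starts the watcher via the injected constructor.
func newProcessor(host string, newWatcher WatcherConstructor) (*processor, error) {
	w, err := newWatcher(host)
	if err != nil {
		return nil, err
	}
	if err := w.Start(); err != nil {
		return nil, err
	}
	return &processor{watcher: w}, nil
}

// errStart is the failure a fake watcher can be told to return from Start.
var errStart = errors.New("docker unreachable")

// fakeWatcher serves containers from memory; no Docker daemon is needed.
type fakeWatcher struct {
	containers map[string]*Container
	startErr   error
}

func (f *fakeWatcher) Start() error                   { return f.startErr }
func (f *fakeWatcher) Container(id string) *Container { return f.containers[id] }

// fakeFactory returns a constructor that always yields the same fake.
func fakeFactory(containers map[string]*Container, startErr error) WatcherConstructor {
	return func(host string) (Watcher, error) {
		return &fakeWatcher{containers: containers, startErr: startErr}, nil
	}
}

func main() {
	known := map[string]*Container{"abc": {ID: "abc", Name: "web"}}
	p, err := newProcessor("unix:///var/run/docker.sock", fakeFactory(known, nil))
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println(p.watcher.Container("abc").Name)

	// A failing Start surfaces as a construction error.
	_, err = newProcessor("unix:///var/run/docker.sock", fakeFactory(nil, errStart))
	fmt.Println(err)
}
```

Because the fake controls what `Start` returns, error paths can be exercised without a running Docker daemon.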
21 changes: 21 additions & 0 deletions libbeat/processors/add_docker_metadata/config.go
@@ -0,0 +1,21 @@
package add_docker_metadata

// Config for docker processor
type Config struct {
	Host   string     `config:"host"`
	TLS    *TLSConfig `config:"ssl"`
	Fields []string   `config:"match_fields"`
}

// TLSConfig for docker socket connection
type TLSConfig struct {
	CA          string `config:"certificate_authority"`
	Certificate string `config:"certificate"`
	Key         string `config:"key"`
}

func defaultConfig() Config {
	return Config{
		Host: "unix:///var/run/docker.sock",
	}
}
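
The processor starts from `defaultConfig()` and then unpacks user settings over it, so any option the user omits keeps its default. As an analogy only, the sketch below reproduces that defaults-then-override pattern with the standard library's `encoding/json` in place of the Beats config unpacker; `loadConfig` and the JSON tags are assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TLSConfig and Config mirror the structs in the diff, with JSON tags
// substituted for the `config` tags (assumption for this sketch).
type TLSConfig struct {
	CA          string `json:"certificate_authority"`
	Certificate string `json:"certificate"`
	Key         string `json:"key"`
}

type Config struct {
	Host   string     `json:"host"`
	TLS    *TLSConfig `json:"ssl"`
	Fields []string   `json:"match_fields"`
}

func defaultConfig() Config {
	return Config{Host: "unix:///var/run/docker.sock"}
}

// loadConfig decodes raw settings on top of the defaults. Keys that are
// absent from raw leave the corresponding default untouched.
func loadConfig(raw []byte) (Config, error) {
	cfg := defaultConfig()
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return Config{}, fmt.Errorf("unpacking configuration: %w", err)
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig([]byte(`{"match_fields": ["system.process.cgroup.id"]}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Host keeps its default; TLS stays nil because no ssl block was given.
	fmt.Println(cfg.Host, cfg.Fields, cfg.TLS == nil)
}
```

Keeping `TLS` as a pointer means a missing `ssl` section is distinguishable from an empty one, which is how the code can tell whether TLS was requested at all.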