Skip to content

Commit

Permalink
Fix docker hanging when container killed (#3612) (#3634)
Browse files Browse the repository at this point in the history
No timeout was passed to the docker client. It seems in case of a killed container it can happen that the connection is hanging. To interrupt this connection, the timeout from the metricset is passed to the client. That means in case info for a container cannot be fetched, it will timeout.

This change requires that the docker module is not run with a timeout of 3s seconds, which indirectly means a period of 3s. The reason is that already the http request waits ~2s for the response. So if 1s is set as timeout, all requests will timeout.

Further changes:

* Containers without names will be ignored, as these are containers for which the data could not be fetched.
* Period was set to 1s by default instead of the period as document. This was changed.
* Add documentation node about minimal period.

Closes #3610

The issue with this PR was introduce in 5.2.1 by fixing the memory leak. Before go routines just piled up, but now they caused filebeat to hang.

This needs also backport to 5.2.2
(cherry picked from commit 99f17d6)
  • Loading branch information
ruflin authored and tsg committed Feb 21, 2017
1 parent 76b78d3 commit 5d94ccd
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ https://github.com/elastic/beats/compare/v5.2.0...master[Check the HEAD diff]

*Metricbeat*

- Fix go routine leak in docker module. {pull}3492[3492]
- Fix bug docker module hanging when docker container killed. {issue}3610[3610]
- Set timeout to period instead of 1s by default as documented.

*Packetbeat*

*Winlogbeat*
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/docs/modules/system.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ metricbeat.modules:
cpu_ticks: true
----

It is strongly recommended to not run docker metricsets with a period smaller then 3 seconds. The request to the docker
API already takes up to 2s seconds. Otherwise all the requests would timeout and no data is reported.

[float]
=== Dashboard

Expand Down
5 changes: 5 additions & 0 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func newBaseModuleFromConfig(rawConfig *common.Config) (BaseModule, error) {
return baseModule, err
}

// If timeout is not set, timeout is set to the same value as period
if baseModule.config.Timeout == 0 {
baseModule.config.Timeout = baseModule.config.Period
}

baseModule.name = strings.ToLower(baseModule.config.Module)

err = mustNotContainDuplicates(baseModule.config.Hosts)
Expand Down
1 change: 0 additions & 1 deletion metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ func (c ModuleConfig) GoString() string { return c.String() }
var defaultModuleConfig = ModuleConfig{
Enabled: true,
Period: time.Second * 10,
Timeout: time.Second,
}

// DefaultModuleConfig returns a ModuleConfig with the default values populated.
Expand Down
6 changes: 3 additions & 3 deletions metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestModuleConfig(t *testing.T) {
MetricSets: []string{"test"},
Enabled: true,
Period: time.Second * 10,
Timeout: time.Second,
Timeout: 0,
},
},
{
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestModuleConfigDefaults(t *testing.T) {

assert.Equal(t, true, mc.Enabled)
assert.Equal(t, time.Second*10, mc.Period)
assert.Equal(t, time.Second, mc.Timeout)
assert.Equal(t, time.Second*0, mc.Timeout)
assert.Empty(t, mc.Hosts)
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func TestNewBaseModuleFromModuleConfigStruct(t *testing.T) {
assert.Equal(t, moduleName, baseModule.Config().Module)
assert.Equal(t, true, baseModule.Config().Enabled)
assert.Equal(t, time.Second*10, baseModule.Config().Period)
assert.Equal(t, time.Second, baseModule.Config().Timeout)
assert.Equal(t, time.Second*10, baseModule.Config().Timeout)
assert.Empty(t, baseModule.Config().Hosts)
}

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/docker/cpu/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch returns a list of docker CPU stats.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
stats, err := docker.FetchStats(m.dockerClient)
stats, err := docker.FetchStats(m.dockerClient, m.Module().Config().Timeout)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/docker/diskio/diskio.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch creates list of events with diskio stats for all containers.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
stats, err := docker.FetchStats(m.dockerClient)
stats, err := docker.FetchStats(m.dockerClient, m.Module().Config().Timeout)
if err != nil {
return nil, err
}
Expand Down
21 changes: 13 additions & 8 deletions metricbeat/module/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"

"time"

"github.com/fsouza/go-dockerclient"
)

Expand Down Expand Up @@ -58,7 +60,7 @@ func NewDockerClient(endpoint string, config Config) (*docker.Client, error) {
}

// FetchStats returns a list of running containers with all related stats inside
func FetchStats(client *docker.Client) ([]Stat, error) {
func FetchStats(client *docker.Client, timeout time.Duration) ([]Stat, error) {
containers, err := client.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
Expand All @@ -67,24 +69,27 @@ func FetchStats(client *docker.Client) ([]Stat, error) {
var wg sync.WaitGroup

containersList := make([]Stat, 0, len(containers))
queue := make(chan Stat, 1)
statsQueue := make(chan Stat, 1)
wg.Add(len(containers))

for _, container := range containers {
go func(container docker.APIContainers) {
defer wg.Done()
queue <- exportContainerStats(client, &container)
statsQueue <- exportContainerStats(client, &container, timeout)
}(container)
}

go func() {
wg.Wait()
close(queue)
close(statsQueue)
}()

// This will break after the queue has been drained and queue is closed.
for container := range queue {
containersList = append(containersList, container)
for stat := range statsQueue {
// If names is empty, there is not data inside
if len(stat.Container.Names) != 0 {
containersList = append(containersList, stat)
}
}

return containersList, err
Expand All @@ -95,7 +100,7 @@ func FetchStats(client *docker.Client) ([]Stat, error) {
// This is currently very inefficient as docker calculates the average for each request,
// means each request will take at least 2s: https://github.com/docker/docker/blob/master/cli/command/container/stats_helpers.go#L148
// Getting all stats at once is implemented here: https://github.com/docker/docker/pull/25361
func exportContainerStats(client *docker.Client, container *docker.APIContainers) Stat {
func exportContainerStats(client *docker.Client, container *docker.APIContainers, timeout time.Duration) Stat {
var wg sync.WaitGroup
var event Stat

Expand All @@ -105,7 +110,7 @@ func exportContainerStats(client *docker.Client, container *docker.APIContainers
ID: container.ID,
Stats: statsC,
Stream: false,
Timeout: -1,
Timeout: timeout,
}

wg.Add(2)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/docker/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch creates a list of memory events for each container.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
stats, err := docker.FetchStats(m.dockerClient)
stats, err := docker.FetchStats(m.dockerClient, m.Module().Config().Timeout)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/docker/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch methods creates a list of network events for each container.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
stats, err := docker.FetchStats(m.dockerClient)
stats, err := docker.FetchStats(m.dockerClient, m.Module().Config().Timeout)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/system/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ metricbeat.modules:
cpu_ticks: true
----

It is strongly recommended to not run docker metricsets with a period smaller then 3 seconds. The request to the docker
API already takes up to 2s seconds. Otherwise all the requests would timeout and no data is reported.

[float]
=== Dashboard

Expand Down
13 changes: 7 additions & 6 deletions metricbeat/tests/system/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_container_fields(self):
"name": "docker",
"metricsets": ["container"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s",
"period": "10s",
}])

proc = self.start_beat()
Expand All @@ -40,7 +40,7 @@ def test_cpu_fields(self):
"name": "docker",
"metricsets": ["cpu"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand Down Expand Up @@ -70,7 +70,7 @@ def test_diskio_fields(self):
"name": "docker",
"metricsets": ["diskio"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand All @@ -97,7 +97,7 @@ def test_info_fields(self):
"name": "docker",
"metricsets": ["info"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand All @@ -122,7 +122,7 @@ def test_memory_fields(self):
"name": "docker",
"metricsets": ["memory"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand All @@ -148,7 +148,7 @@ def test_network_fields(self):
"name": "docker",
"metricsets": ["network"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand All @@ -157,6 +157,7 @@ def test_network_fields(self):

# Ensure no errors or warnings exist in the log.
log = self.get_log()

self.assertNotRegexpMatches(log.replace("WARN EXPERIMENTAL", ""), "ERR|WARN")

output = self.read_output_json()
Expand Down

0 comments on commit 5d94ccd

Please sign in to comment.