Skip to content

Commit

Permalink
[libbeat] Fix / clarify the module reload logic (elastic#16440) (elas…
Browse files Browse the repository at this point in the history
…tic#16686)

(cherry picked from commit 2743d51)
  • Loading branch information
faec committed Mar 2, 2020
1 parent f258a05 commit 2ca17ff
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix index names for indexing not always guaranteed to be lower case. {pull}16081[16081]
- Upgrade go-ucfg to latest v0.8.1. {pull}15937{15937}
- Fix loading processors from annotation hints. {pull}16348[16348]
- Fix an issue that could cause redundant configuration reloads. {pull}16440[16440]
- Fix k8s pods labels broken schema. {pull}16480[16480]
- Fix k8s pods annotations broken schema. {pull}16554[16554]
- Upgrade go-ucfg to latest v0.8.3. {pull}16450{16450}
Expand Down Expand Up @@ -183,6 +184,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add document_id setting to decode_json_fields processor. {pull}15859[15859]
- Include network information by default on add_host_metadata and add_observer_metadata. {issue}15347[15347] {pull}16077[16077]
- Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823]
- Add monitoring variable `libbeat.config.scans` to distinguish scans of the configuration directory from actual reloads of its contents. {pull}16440[16440]
- Add support for multiple password in redis output. {issue}16058[16058] {pull}16206[16206]
- Remove experimental flag from `setup.template.append_fields` {pull}16576[16576]
- Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621]
Expand Down
6 changes: 5 additions & 1 deletion heartbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def test_config_add(self):
"""
self.setup_dynamic()

# Wait until the beat is running and has performed its first load of
# the config directory.
self.wait_until(lambda: self.log_contains(
"Starting reload procedure, current runners: 0"))

Expand All @@ -77,8 +79,10 @@ def test_config_add(self):
self.write_dyn_config(
"test.yml", self.http_cfg("myid", "http://localhost:{}".format(server.server_port)))

# The beat should recognize there is a new runner to start.
self.wait_until(lambda: self.log_contains(
"Starting reload procedure, current runners: 1"))
"Start list: 1, Stop list: 0"),
max_timeout=10)

self.wait_until(lambda: self.output_lines() > 0)

Expand Down
27 changes: 18 additions & 9 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ var (

debugf = logp.MakeDebug("cfgfile")

// configScans measures how many times the config dir was scanned for
// changes, configReloads measures how many times there were changes that
// triggered an actual reload.
configScans = monitoring.NewInt(nil, "libbeat.config.scans")
configReloads = monitoring.NewInt(nil, "libbeat.config.reloads")
moduleStarts = monitoring.NewInt(nil, "libbeat.config.module.starts")
moduleStops = monitoring.NewInt(nil, "libbeat.config.module.stops")
Expand Down Expand Up @@ -185,7 +189,11 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
rl.config.Reload.Period = 0
}

overwriteUpdate := true
// If forceReload is set, the configuration should be reloaded
// even if there are no changes. It is set on the first iteration,
// and whenever an attempted reload fails. It is unset whenever
// a reload succeeds.
forceReload := true

for {
select {
Expand All @@ -195,7 +203,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {

case <-time.After(rl.config.Reload.Period):
debugf("Scan for new config files")
configReloads.Add(1)
configScans.Add(1)

files, updated, err := gw.Scan()
if err != nil {
Expand All @@ -204,21 +212,22 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
logp.Err("Error fetching new config files: %v", err)
}

// no file changes
if !updated && !overwriteUpdate {
overwriteUpdate = false
// if there are no changes, skip this reload unless forceReload is set.
if !updated && !forceReload {
continue
}
configReloads.Add(1)

// Load all config objects
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))

if err := list.Reload(configs); err != nil {
// Make sure the next run also updates because some runners were not properly loaded
overwriteUpdate = true
}
err = list.Reload(configs)
// Force reload on the next iteration if and only if this one failed.
// (Any errors are already logged by list.Reload, so we don't need to
// propagate the details further.)
forceReload = err != nil
}

// Path loading is enabled but not reloading. Loads files only once and then stops.
Expand Down
107 changes: 107 additions & 0 deletions libbeat/cfgfile/reload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package cfgfile

import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
)

func TestReloader(t *testing.T) {
// Create random temp directory
dir, err := ioutil.TempDir("", "libbeat-reloader")
defer os.RemoveAll(dir)
if err != nil {
t.Fatal(err)
}
glob := dir + "/*.yml"

config := common.MustNewConfigFrom(common.MapStr{
"path": glob,
"reload": common.MapStr{
"period": "1s",
"enabled": true,
},
})
// common.Config{}
reloader := NewReloader(nil, config)
retryCount := 10

go reloader.Run(nil)
defer reloader.Stop()

// wait until configScans >= 2 (which should happen after ~1 second)
for i := 0; i < retryCount; i++ {
if configScans.Get() >= 2 {
break
}
// time interval is slightly more than a second so we don't slightly
// undershoot the first iteration and wait a whole extra second.
time.Sleep(1100 * time.Millisecond)
}
if configScans.Get() < 2 {
assert.Fail(t, "Timed out waiting for configScans >= 2")
}

// The first scan should cause a reload, but additional ones should not,
// so configReloads should still be 1.
assert.Equal(t, int64(1), configReloads.Get())

// Write a file to the reloader path to trigger a real reload
content := []byte("test\n")
err = ioutil.WriteFile(dir+"/config1.yml", content, 0644)
assert.NoError(t, err)

// Wait for the number of scans to increase at least twice. This is somewhat
// pedantic, but if we just wait for the next scan, it's possible to wake up
// during the brief interval after configScans is updated but before
// configReloads is, giving a false negative. Waiting two iterations
// guarantees that the change from the first one has taken effect.
targetScans := configScans.Get() + 2
for i := 0; i < retryCount; i++ {
time.Sleep(time.Second)
if configScans.Get() >= targetScans {
break
}
}
if configScans.Get() < targetScans {
assert.Fail(t,
fmt.Sprintf("Timed out waiting for configScans >= %d", targetScans))
}

// The number of reloads should now have increased. It would be nicer to
// check if the value is exactly 2, but we can't guarantee this: the glob
// watcher includes an extra 1-second margin around the real modification
// time, so changes that fall too close to a scan interval can be detected
// twice.
if configReloads.Get() < 2 {
assert.Fail(t,
fmt.Sprintf(
"Reloader performed %d scans but only reloaded once",
configScans.Get()))
}
}
1 change: 1 addition & 0 deletions libbeat/docs/http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ curl -XGET 'localhost:5066/stats?pretty'
"starts": 0,
"stops": 0
},
"scans": 1,
"reloads": 1
},
"output": {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/tests/system/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_stats(self):
data = json.loads(r.content)

# Test one data point
assert data["libbeat"]["config"]["reloads"] == 0
assert data["libbeat"]["config"]["scans"] == 0

proc.check_kill_and_wait()

Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/beat/stats/_meta/test/stats.800.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"starts": 0,
"stops": 0
},
"scans": 1,
"reloads": 1
},
"output": {
Expand Down
12 changes: 5 additions & 7 deletions metricbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def test_reload(self):

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os")
def test_start_stop(self):
def reload_line(
num_runners): return "Starting reload procedure, current runners: %d" % num_runners
"""
Test if module is properly started and stopped
"""
Expand All @@ -61,7 +59,7 @@ def reload_line(

# Ensure no modules are loaded
self.wait_until(
lambda: self.log_contains(reload_line(0)),
lambda: self.log_contains("Start list: 0, Stop list: 0"),
max_timeout=10)

systemConfig = """
Expand All @@ -73,17 +71,17 @@ def reload_line(
with open(config_path, 'w') as f:
f.write(systemConfig)

# Ensure the module was successfully loaded
# Ensure the module is started
self.wait_until(
lambda: self.log_contains(reload_line(1)),
lambda: self.log_contains("Start list: 1, Stop list: 0"),
max_timeout=10)

# Remove config again
os.remove(config_path)

# Ensure the module was successfully unloaded
# Ensure the module is stopped
self.wait_until(
lambda: self.log_contains(reload_line(0)),
lambda: self.log_contains("Start list: 0, Stop list: 1"),
max_timeout=10)

time.sleep(1)
Expand Down

0 comments on commit 2ca17ff

Please sign in to comment.