Skip to content

Commit

Permalink
Fix issue where updating a single unit results in other units being t…
Browse files Browse the repository at this point in the history
…urned off (#34504)

* Reload all inputs when only one changes.

* Update changelog.

* Fix stopping case.

* Update CHANGELOG.next.asciidoc

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>

* Adjust test to catch bad case.

---------

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>
(cherry picked from commit 9f15870)
  • Loading branch information
blakerouse authored and mergify[bot] committed Feb 9, 2023
1 parent 7695dac commit 7c4b02e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Log errors from the Elastic Agent V2 client errors channel. Avoids blocking when error occurs communicating with the Elastic Agent. {pull}34392[34392]
- Only log publish event messages in trace log level under elastic-agent. {pull}34391[34391]
- Fix issue where updating a single Elastic Agent configuration unit results in other units being turned off. {pull}34504[34504]

*Auditbeat*

Expand Down
34 changes: 24 additions & 10 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,16 +590,6 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
return fmt.Errorf("input unit %s has no config", unit.ID())
}

var prevCfg *proto.UnitExpectedConfig
if cm.lastInputCfgs != nil {
prevCfg, _ = cm.lastInputCfgs[unit.ID()]
}
if prevCfg != nil && gproto.Equal(prevCfg, rawConfig) {
// configuration for the input did not change; do nothing
cm.logger.Debugf("Skipped reloading input unit %s; configuration didn't change", unit.ID())
continue
}

inputCfg, err := generateBeatConfig(rawConfig, agentInfo)
if err != nil {
return fmt.Errorf("failed to generate configuration for unit %s: %w", unit.ID(), err)
Expand All @@ -608,6 +598,11 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
inputBeatCfgs = append(inputBeatCfgs, inputCfg...)
}

if !didChange(cm.lastInputCfgs, inputCfgs) {
cm.logger.Debug("Skipped reloading input units; configuration didn't change")
return nil
}

err := obj.Reload(inputBeatCfgs)
if err != nil {
return fmt.Errorf("failed to reloading inputs: %w", err)
Expand Down Expand Up @@ -703,3 +698,22 @@ func getZapcoreLevel(ll client.UnitLogLevel) (zapcore.Level, bool) {
// info level for fallback
return zapcore.InfoLevel, false
}

func didChange(previous map[string]*proto.UnitExpectedConfig, latest map[string]*proto.UnitExpectedConfig) bool {
if (previous == nil && latest != nil) || (previous != nil && latest == nil) {
return true
}
if len(previous) != len(latest) {
return true
}
for k, v := range latest {
p, ok := previous[k]
if !ok {
return true
}
if !gproto.Equal(p, v) {
return true
}
}
return false
}
23 changes: 19 additions & 4 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,20 @@ func TestManagerV2(t *testing.T) {
logLevelSet := false
allStopped := false
onObserved := func(observed *proto.CheckinObserved, currentIdx int) {
if currentIdx < 2 {
if currentIdx == 1 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg != nil && len(iCfgs) == 3 {
configsSet = true
t.Logf("output and inputs configuration set")
}
} else if currentIdx == 2 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil || len(iCfgs) != 3 {
// should not happen (config no longer set)
configsSet = false
t.Logf("output and inputs configuration cleared (should not happen)")
}
} else {
oCfg := output.Config()
Expand All @@ -51,10 +60,12 @@ func TestManagerV2(t *testing.T) {
}
if len(observed.Units) == 0 {
allStopped = true
t.Logf("output and inputs configuration cleared (stopping)")
}
}
if logp.GetLevel() == zapcore.DebugLevel {
logLevelSet = true
t.Logf("debug log level set")
}
}

Expand Down Expand Up @@ -169,7 +180,7 @@ func TestManagerV2(t *testing.T) {
},
},
{},
}, onObserved)
}, onObserved, 500*time.Millisecond)
require.NoError(t, srv.Start())
defer srv.Stop()

Expand All @@ -192,10 +203,10 @@ func TestManagerV2(t *testing.T) {

require.Eventually(t, func() bool {
return configsSet && configsCleared && logLevelSet && allStopped
}, 15*time.Second, 100*time.Millisecond)
}, 15*time.Second, 300*time.Millisecond)
}

func mockSrvWithUnits(units [][]*proto.UnitExpected, observedCallback func(*proto.CheckinObserved, int)) *mock.StubServerV2 {
func mockSrvWithUnits(units [][]*proto.UnitExpected, observedCallback func(*proto.CheckinObserved, int), delay time.Duration) *mock.StubServerV2 {
i := 0
agentInfo := &proto.CheckinAgentInfo{
Id: "elastic-agent-id",
Expand All @@ -215,6 +226,10 @@ func mockSrvWithUnits(units [][]*proto.UnitExpected, observedCallback func(*prot
Units: units[i],
}
}
// delay sending next expected based on delay
if delay > 0 {
<-time.After(delay)
}
// send next set of units
i += 1
if i >= len(units) {
Expand Down

0 comments on commit 7c4b02e

Please sign in to comment.