From e39028c4de0f8281fc6d30bcb564ded12290096f Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Thu, 5 Aug 2021 04:06:09 +0200 Subject: [PATCH] Conditionally restart apm-server (#5892) --- beater/beater.go | 98 +++++++++++++++++++++++++++++++++++-------- beater/beater_test.go | 38 +++++++++++++++++ 2 files changed, 119 insertions(+), 17 deletions(-) diff --git a/beater/beater.go b/beater/beater.go index b0daa61ca95..25daf2ed4d3 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -18,7 +18,9 @@ package beater import ( + "bytes" "context" + "encoding/json" "fmt" "net" "net/http" @@ -219,6 +221,9 @@ func (bt *beater) start(ctx context.Context, cancelContext context.CancelFunc, b type reloader struct { runServerContext context.Context args sharedServerRunnerParams + // The json marshaled bytes of config.Config, with all the dynamic + // options zeroed out. + staticConfig []byte mu sync.Mutex runner *serverRunner @@ -256,26 +261,81 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { func (r *reloader) reload(rawConfig *common.Config, namespace string, fleetConfig *config.Fleet) error { r.mu.Lock() defer r.mu.Unlock() - if r.runner != nil { - // TODO: reload should only stop the existing runner if the - // changed config requires a restart. Otherwise, it should - // update the serverRunner's config dynamically. - r.runner.cancelRunServerContext() - <-r.runner.done - r.runner = nil - } - runner, err := newServerRunner(r.runServerContext, serverRunnerParams{ - sharedServerRunnerParams: r.args, - Namespace: namespace, - RawConfig: rawConfig, - FleetConfig: fleetConfig, - }) + shouldRestart, err := r.shouldRestart(rawConfig) if err != nil { return err } - r.runner = runner - go r.runner.run() - return nil + + if shouldRestart { + if r.runner != nil { + r.runner.cancelRunServerContext() + <-r.runner.done + r.runner = nil + } + runner, err := newServerRunner(r.runServerContext, serverRunnerParams{ + sharedServerRunnerParams: r.args, + Namespace: namespace, + RawConfig: rawConfig, + FleetConfig: fleetConfig, + }) + if err != nil { + return err + } + r.runner = runner + go r.runner.run() + return nil + } + // Update current runner. + // TODO: This should actually update the runner. + return r.runner.updateDynamicConfig(rawConfig) +} + +// Compare the new config with the old config to see if the server needs to be +// restarted. +func (r *reloader) shouldRestart(cfg *common.Config) (bool, error) { + // Make a copy of cfg so we don't mutate it + cfg, err := common.MergeConfigs(cfg) + if err != nil { + return false, err + } + + // Remove dynamic values in the config + for _, key := range []string{ + "max_header_size", + "idle_timeout", + "read_timeout", + "write_timeout", + "max_event_size", + "shutdown_timeout", + "response_headers", + "capture_personal_data", + "auth.anonymous.rate_limit", + "expvar", + "rum", + "auth.api_key.limit", + "api_key.limit", // old name for auth.api_key.limit + "default_service_environment", + "agent_config", + } { + cfg.Remove(key, -1) + } + + // Does our static config match the previous static config? If not, + // then we should restart. + m := make(map[string]interface{}) + if err := cfg.Unpack(&m); err != nil { + return false, err + } + key, err := json.Marshal(m) + if err != nil { + return false, err + } + + shouldRestart := !bytes.Equal(key, r.staticConfig) + // Set the static config on reloader for the next comparison + r.staticConfig = key + + return shouldRestart, nil } type serverRunner struct { @@ -431,6 +491,10 @@ func (s *serverRunner) run() error { return publisher.Stop(s.backgroundContext) } +func (s *serverRunner) updateDynamicConfig(rawConfig *common.Config) error { + return nil +} + func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc { processors := []model.BatchProcessor{ modelprocessor.SetHostHostname{}, diff --git a/beater/beater_test.go b/beater/beater_test.go index 4c099226b36..f288a07f0af 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -328,3 +328,41 @@ func TestFleetStoreUsed(t *testing.T) { assert.True(t, called) } + +func TestShouldRestart(t *testing.T) { + c := common.NewConfig() + r := &reloader{} + + shouldRestart, err := r.shouldRestart(c) + require.NoError(t, err) + + assert.True(t, shouldRestart) + + shouldRestart, err = r.shouldRestart(c) + require.NoError(t, err) + assert.False(t, shouldRestart) + + // Change some dynamic options and verify we do not want to restart. + c.SetInt("max_header_size", -1, 12345) + c.SetString("idle_timeout", -1, "1s") + c.SetChild("agent_config", -1, common.MustNewConfigFrom([]map[string]interface{}{{ + "service": map[string]interface{}{ + "name": "service", + }, + "config": map[string]string{"log_level": "debug"}, + }})) + + shouldRestart, err = r.shouldRestart(c) + require.NoError(t, err) + assert.False(t, shouldRestart) + + // We use the original cfg elsewhere, so make sure we don't mutate it. + maxHeaderSize, _ := c.Int("max_header_size", -1) + assert.Equal(t, int64(12345), maxHeaderSize) + + // Change some static options and verify we do want to restart. + c.SetInt("max_connections", -1, 10) + shouldRestart, err = r.shouldRestart(c) + require.NoError(t, err) + assert.True(t, shouldRestart) +}