Skip to content

Commit

Permalink
Conditionally restart apm-server (#5892)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartnelson3 authored Aug 5, 2021
1 parent 792c7ec commit e39028c
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 17 deletions.
98 changes: 81 additions & 17 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package beater

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -219,6 +221,9 @@ func (bt *beater) start(ctx context.Context, cancelContext context.CancelFunc, b
type reloader struct {
runServerContext context.Context
args sharedServerRunnerParams
// The json marshaled bytes of config.Config, with all the dynamic
// options zeroed out.
staticConfig []byte

mu sync.Mutex
runner *serverRunner
Expand Down Expand Up @@ -256,26 +261,81 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
func (r *reloader) reload(rawConfig *common.Config, namespace string, fleetConfig *config.Fleet) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.runner != nil {
// TODO: reload should only stop the existing runner if the
// changed config requires a restart. Otherwise, it should
// update the serverRunner's config dynamically.
r.runner.cancelRunServerContext()
<-r.runner.done
r.runner = nil
}
runner, err := newServerRunner(r.runServerContext, serverRunnerParams{
sharedServerRunnerParams: r.args,
Namespace: namespace,
RawConfig: rawConfig,
FleetConfig: fleetConfig,
})
shouldRestart, err := r.shouldRestart(rawConfig)
if err != nil {
return err
}
r.runner = runner
go r.runner.run()
return nil

if shouldRestart {
if r.runner != nil {
r.runner.cancelRunServerContext()
<-r.runner.done
r.runner = nil
}
runner, err := newServerRunner(r.runServerContext, serverRunnerParams{
sharedServerRunnerParams: r.args,
Namespace: namespace,
RawConfig: rawConfig,
FleetConfig: fleetConfig,
})
if err != nil {
return err
}
r.runner = runner
go r.runner.run()
return nil
}
// Update current runner.
// TODO: This should actually update the runner.
return r.runner.updateDynamicConfig(rawConfig)
}

// Compare the new config with the old config to see if the server needs to be
// restarted.
func (r *reloader) shouldRestart(cfg *common.Config) (bool, error) {
// Make a copy of cfg so we don't mutate it
cfg, err := common.MergeConfigs(cfg)
if err != nil {
return false, err
}

// Remove dynamic values in the config
for _, key := range []string{
"max_header_size",
"idle_timeout",
"read_timeout",
"write_timeout",
"max_event_size",
"shutdown_timeout",
"response_headers",
"capture_personal_data",
"auth.anonymous.rate_limit",
"expvar",
"rum",
"auth.api_key.limit",
"api_key.limit", // old name for auth.api_key.limit
"default_service_environment",
"agent_config",
} {
cfg.Remove(key, -1)
}

// Does our static config match the previous static config? If not,
// then we should restart.
m := make(map[string]interface{})
if err := cfg.Unpack(&m); err != nil {
return false, err
}
key, err := json.Marshal(m)
if err != nil {
return false, err
}

shouldRestart := !bytes.Equal(key, r.staticConfig)
// Set the static config on reloader for the next comparison
r.staticConfig = key

return shouldRestart, nil
}

type serverRunner struct {
Expand Down Expand Up @@ -431,6 +491,10 @@ func (s *serverRunner) run() error {
return publisher.Stop(s.backgroundContext)
}

func (s *serverRunner) updateDynamicConfig(rawConfig *common.Config) error {
return nil
}

func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc {
processors := []model.BatchProcessor{
modelprocessor.SetHostHostname{},
Expand Down
38 changes: 38 additions & 0 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,41 @@ func TestFleetStoreUsed(t *testing.T) {

assert.True(t, called)
}

func TestShouldRestart(t *testing.T) {
c := common.NewConfig()
r := &reloader{}

shouldRestart, err := r.shouldRestart(c)
require.NoError(t, err)

assert.True(t, shouldRestart)

shouldRestart, err = r.shouldRestart(c)
require.NoError(t, err)
assert.False(t, shouldRestart)

// Change some dynamic options and verify we do not want to restart.
c.SetInt("max_header_size", -1, 12345)
c.SetString("idle_timeout", -1, "1s")
c.SetChild("agent_config", -1, common.MustNewConfigFrom([]map[string]interface{}{{
"service": map[string]interface{}{
"name": "service",
},
"config": map[string]string{"log_level": "debug"},
}}))

shouldRestart, err = r.shouldRestart(c)
require.NoError(t, err)
assert.False(t, shouldRestart)

// We use the original cfg elsewhere, so make sure we don't mutate it.
maxHeaderSize, _ := c.Int("max_header_size", -1)
assert.Equal(t, int64(12345), maxHeaderSize)

// Change some static options and verify we do want to restart.
c.SetInt("max_connections", -1, 10)
shouldRestart, err = r.shouldRestart(c)
require.NoError(t, err)
assert.True(t, shouldRestart)
}

0 comments on commit e39028c

Please sign in to comment.