-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[new feature] promtail: Add config reload endoint / signal to promtail #7247
Changes from 8 commits
0c7e77c
f7d7244
b9c89d5
0b7e536
07a00bd
9e82c9f
4bdf6cd
ce7ddb0
e01b8bd
4d6ee77
69489b9
646b120
ebc0e35
df6f15d
f838787
3074a1e
9bd469f
ab2dd37
6bc47e3
6651e62
2817f60
a2809cd
b980753
41b584f
5e2dd98
489f9cf
5582e95
dba1290
3928fff
3396a6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"fmt" | ||
"os" | ||
"reflect" | ||
"sync" | ||
|
||
// embed time zone data | ||
_ "time/tzdata" | ||
|
@@ -20,8 +21,7 @@ import ( | |
"github.com/grafana/loki/clients/pkg/logentry/stages" | ||
"github.com/grafana/loki/clients/pkg/promtail" | ||
"github.com/grafana/loki/clients/pkg/promtail/client" | ||
"github.com/grafana/loki/clients/pkg/promtail/config" | ||
|
||
promtail_config "github.com/grafana/loki/clients/pkg/promtail/config" | ||
"github.com/grafana/loki/pkg/util" | ||
_ "github.com/grafana/loki/pkg/util/build" | ||
"github.com/grafana/loki/pkg/util/cfg" | ||
|
@@ -32,15 +32,19 @@ func init() { | |
prometheus.MustRegister(version.NewCollector("promtail")) | ||
} | ||
|
||
var mtx sync.Mutex | ||
|
||
type Config struct { | ||
config.Config `yaml:",inline"` | ||
printVersion bool | ||
printConfig bool | ||
logConfig bool | ||
dryRun bool | ||
configFile string | ||
configExpandEnv bool | ||
inspect bool | ||
promtail_config.Config `yaml:",inline"` | ||
printVersion bool | ||
printConfig bool | ||
logConfig bool | ||
dryRun bool | ||
configFile string | ||
configExpandEnv bool | ||
inspect bool | ||
|
||
cnt int | ||
} | ||
|
||
func (c *Config) RegisterFlags(f *flag.FlagSet) { | ||
|
@@ -66,11 +70,11 @@ func (c *Config) Clone() flagext.Registerer { | |
func main() { | ||
// Load config, merging config file and CLI flags | ||
var config Config | ||
if err := cfg.DefaultUnmarshal(&config, os.Args[1:], flag.CommandLine); err != nil { | ||
args := os.Args[1:] | ||
if err := cfg.DefaultUnmarshal(&config, args, flag.CommandLine); err != nil { | ||
fmt.Println("Unable to parse config:", err) | ||
os.Exit(1) | ||
} | ||
|
||
// Handle -version CLI flag | ||
if config.printVersion { | ||
fmt.Println(version.Print("promtail")) | ||
|
@@ -112,7 +116,17 @@ func main() { | |
} | ||
|
||
clientMetrics := client.NewMetrics(prometheus.DefaultRegisterer, config.Config.Options.StreamLagLabels) | ||
p, err := promtail.New(config.Config, clientMetrics, config.dryRun) | ||
newConfigFunc := func() promtail_config.Config { | ||
mtx.Lock() | ||
defer mtx.Unlock() | ||
var config Config | ||
if err := cfg.DefaultUnmarshal(&config, args, flag.NewFlagSet(os.Args[0], flag.ExitOnError)); err != nil { | ||
fmt.Println("Unable to parse config:", err) | ||
os.Exit(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I certainly don't think this behaviour is desired; if a runtime config cannot be reloaded, it should not kill the process. This will also better match how Loki works with its overrides. |
||
} | ||
return config.Config | ||
} | ||
p, err := promtail.New(config.Config, newConfigFunc, clientMetrics, config.dryRun) | ||
if err != nil { | ||
level.Error(util_log.Logger).Log("msg", "error creating promtail", "error", err) | ||
os.Exit(1) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,13 @@ | ||
package promtail | ||
|
||
import ( | ||
"os" | ||
"os/signal" | ||
"sync" | ||
"syscall" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/prometheus/client_golang/prometheus" | ||
|
||
"github.com/grafana/loki/clients/pkg/logentry/stages" | ||
|
@@ -42,17 +46,24 @@ type Promtail struct { | |
logger log.Logger | ||
reg prometheus.Registerer | ||
|
||
stopped bool | ||
mtx sync.Mutex | ||
stopped bool | ||
mtx sync.Mutex | ||
configFile string | ||
newConfig func() config.Config | ||
metrics *client.Metrics | ||
dryRun bool | ||
} | ||
|
||
// New makes a new Promtail. | ||
func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) { | ||
func New(cfg config.Config, newConfig func() config.Config, metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) { | ||
// Initialize promtail with some defaults and allow the options to override | ||
// them. | ||
|
||
promtail := &Promtail{ | ||
logger: util_log.Logger, | ||
reg: prometheus.DefaultRegisterer, | ||
logger: util_log.Logger, | ||
reg: prometheus.DefaultRegisterer, | ||
metrics: metrics, | ||
dryRun: dryRun, | ||
} | ||
for _, o := range opts { | ||
// todo (callum) I don't understand why I needed to add this check | ||
|
@@ -61,37 +72,62 @@ func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option | |
} | ||
o(promtail) | ||
} | ||
err := promtail.reloadConfig(cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
server, err := server.New(cfg.ServerConfig, promtail.logger, promtail.targetManagers, cfg.String()) | ||
if err != nil { | ||
return nil, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please wrap these errors There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
promtail.server = server | ||
promtail.newConfig = newConfig | ||
|
||
cfg.Setup(promtail.logger) | ||
return promtail, nil | ||
} | ||
|
||
func (p *Promtail) reloadConfig(cfg config.Config) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this not a pointer to config.Config? It seems like we're mutating it within this function (ie. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for such fast feedback,done. |
||
level.Info(p.logger).Log("msg", "Loading configuration file") | ||
liguozhong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
p.mtx.Lock() | ||
defer p.mtx.Unlock() | ||
if p.targetManagers != nil { | ||
p.targetManagers.Stop() | ||
} | ||
if p.client != nil { | ||
p.client.Stop() | ||
} | ||
|
||
cfg.Setup(p.logger) | ||
if cfg.LimitsConfig.ReadlineRateEnabled { | ||
stages.SetReadLineRateLimiter(cfg.LimitsConfig.ReadlineRate, cfg.LimitsConfig.ReadlineBurst, cfg.LimitsConfig.ReadlineRateDrop) | ||
} | ||
var err error | ||
if dryRun { | ||
promtail.client, err = client.NewLogger(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.ClientConfigs...) | ||
if p.dryRun { | ||
p.client, err = client.NewLogger(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.ClientConfigs...) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
cfg.PositionsConfig.ReadOnly = true | ||
} else { | ||
promtail.client, err = client.NewMulti(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...) | ||
p.client, err = client.NewMulti(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
} | ||
|
||
tms, err := targets.NewTargetManagers(promtail, promtail.reg, promtail.logger, cfg.PositionsConfig, promtail.client, cfg.ScrapeConfig, &cfg.TargetConfig) | ||
tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.client, cfg.ScrapeConfig, &cfg.TargetConfig) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
promtail.targetManagers = tms | ||
server, err := server.New(cfg.ServerConfig, promtail.logger, tms, cfg.String()) | ||
if err != nil { | ||
return nil, err | ||
p.targetManagers = tms | ||
|
||
promServer := p.server | ||
if promServer != nil { | ||
promtailServer := promServer.(*server.PromtailServer) | ||
promtailServer.ReloadTms(p.targetManagers) | ||
} | ||
promtail.server = server | ||
return promtail, nil | ||
|
||
return nil | ||
} | ||
|
||
// Run the promtail; will block until a signal is received. | ||
|
@@ -103,6 +139,7 @@ func (p *Promtail) Run() error { | |
return nil | ||
} | ||
p.mtx.Unlock() // unlock before blocking | ||
go p.watchConfig() | ||
return p.server.Run() | ||
} | ||
|
||
|
@@ -133,3 +170,33 @@ func (p *Promtail) Shutdown() { | |
func (p *Promtail) ActiveTargets() map[string][]target.Target { | ||
return p.targetManagers.ActiveTargets() | ||
} | ||
|
||
func (p *Promtail) watchConfig() { | ||
// Reload handler. | ||
// Make sure that sighup handler is registered with a redirect to the channel before the potentially | ||
if p.newConfig == nil { | ||
level.Warn(p.logger).Log("msg", "disable watchConfig") | ||
return | ||
} | ||
level.Warn(p.logger).Log("msg", "enable watchConfig") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These log messages should be more clear, please. |
||
hup := make(chan os.Signal, 1) | ||
signal.Notify(hup, syscall.SIGHUP) | ||
promtailServer := p.server.(*server.PromtailServer) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, please add a check if the type is not expected |
||
for { | ||
select { | ||
case <-hup: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, not super familiar with these live reload patterns, can you explain the SIGHUP flow? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I refer to the reload documentation of prometheus, we should also provide this reload way. https://prometheus.io/docs/prometheus/latest/configuration/configuration/ |
||
cfg := p.newConfig() | ||
if err := p.reloadConfig(cfg); err != nil { | ||
level.Error(p.logger).Log("msg", "Error reloading config", "err", err) | ||
} | ||
case rc := <-promtailServer.Reload(): | ||
cfg := p.newConfig() | ||
if err := p.reloadConfig(cfg); err != nil { | ||
level.Error(p.logger).Log("msg", "Error reloading config", "err", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer if we centralised this logic rather than repeating it |
||
rc <- err | ||
} else { | ||
rc <- nil | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -106,7 +106,7 @@ func TestPromtail(t *testing.T) { | |
_ = server.Shutdown(context.Background()) | ||
}() | ||
|
||
p, err := New(buildTestConfig(t, positionsFileName, testDir), clientMetrics, false, nil) | ||
p, err := New(buildTestConfig(t, positionsFileName, testDir), nil, clientMetrics, false, nil) | ||
if err != nil { | ||
t.Error("error creating promtail", err) | ||
return | ||
|
@@ -659,7 +659,7 @@ func Test_DryRun(t *testing.T) { | |
require.NoError(t, err) | ||
defer os.Remove(f.Name()) | ||
|
||
_, err = New(config.Config{}, clientMetrics, true, nil) | ||
_, err = New(config.Config{}, nil, clientMetrics, true, nil) | ||
require.Error(t, err) | ||
|
||
// Set the minimum config needed to start a server. We need to do this since we | ||
|
@@ -681,7 +681,7 @@ func Test_DryRun(t *testing.T) { | |
PositionsFile: f.Name(), | ||
SyncPeriod: time.Second, | ||
}, | ||
}, clientMetrics, true, nil) | ||
}, nil, clientMetrics, true, nil) | ||
require.NoError(t, err) | ||
|
||
prometheus.DefaultRegisterer = prometheus.NewRegistry() | ||
|
@@ -693,7 +693,7 @@ func Test_DryRun(t *testing.T) { | |
PositionsFile: f.Name(), | ||
SyncPeriod: time.Second, | ||
}, | ||
}, clientMetrics, false, nil) | ||
}, nil, clientMetrics, false, nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want a test that this new configReload function is called? Not sure if we need to go so far as the whole config gets reloaded, but maybe at least the function is called, and test the flow where watch config is disabled when the function is not provided? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
require.NoError(t, err) | ||
require.IsType(t, &client.MultiClient{}, p.client) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ import ( | |
"path" | ||
"sort" | ||
"strings" | ||
"sync" | ||
"syscall" | ||
"text/template" | ||
|
||
|
@@ -39,8 +40,10 @@ type Server interface { | |
type PromtailServer struct { | ||
*serverww.Server | ||
log log.Logger | ||
mtx sync.Mutex | ||
tms *targets.TargetManagers | ||
externalURL *url.URL | ||
reloadCh chan chan error | ||
healthCheckTarget bool | ||
promtailCfg string | ||
} | ||
|
@@ -51,6 +54,8 @@ type Config struct { | |
ExternalURL string `yaml:"external_url"` | ||
HealthCheckTarget *bool `yaml:"health_check_target"` | ||
Disable bool `yaml:"disable"` | ||
Reload bool `yaml:"reload"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should consider naming this more clearly. I would suggest There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
NewByReload bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this for? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is redundant, invalid variables left over from my development process |
||
} | ||
|
||
// RegisterFlags with prefix registers flags where every name is prefixed by | ||
|
@@ -60,6 +65,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |
cfg.Config.RegisterFlags(f) | ||
|
||
f.BoolVar(&cfg.Disable, prefix+"server.disable", false, "Disable the http and grpc server.") | ||
f.BoolVar(&cfg.Reload, prefix+"server.reload", false, "Enable reload via HTTP request.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can also be reloaded via There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I know, the SIGHUP way in the reload of prometheus cannot be disable. |
||
} | ||
|
||
// RegisterFlags adds the flags required to config this to the given FlagSet | ||
|
@@ -91,6 +97,7 @@ func New(cfg Config, log log.Logger, tms *targets.TargetManagers, promtailCfg st | |
serv := &PromtailServer{ | ||
Server: wws, | ||
log: log, | ||
reloadCh: make(chan chan error), | ||
tms: tms, | ||
externalURL: externalURL, | ||
healthCheckTarget: healthCheckTargetFlag, | ||
|
@@ -103,12 +110,17 @@ func New(cfg Config, log log.Logger, tms *targets.TargetManagers, promtailCfg st | |
serv.HTTP.Path("/service-discovery").Handler(http.HandlerFunc(serv.serviceDiscovery)) | ||
serv.HTTP.Path("/targets").Handler(http.HandlerFunc(serv.targets)) | ||
serv.HTTP.Path("/config").Handler(http.HandlerFunc(serv.config)) | ||
if cfg.Reload { | ||
serv.HTTP.Path("/reload").Handler(http.HandlerFunc(serv.reload)) | ||
} | ||
serv.HTTP.Path("/debug/fgprof").Handler(fgprof.Handler()) | ||
return serv, nil | ||
} | ||
|
||
// serviceDiscovery serves the service discovery page. | ||
func (s *PromtailServer) serviceDiscovery(rw http.ResponseWriter, req *http.Request) { | ||
s.mtx.Lock() | ||
defer s.mtx.Unlock() | ||
var index []string | ||
allTarget := s.tms.AllTargets() | ||
for job := range allTarget { | ||
|
@@ -187,6 +199,8 @@ func (s *PromtailServer) config(rw http.ResponseWriter, req *http.Request) { | |
|
||
// targets serves the targets page. | ||
func (s *PromtailServer) targets(rw http.ResponseWriter, req *http.Request) { | ||
s.mtx.Lock() | ||
defer s.mtx.Unlock() | ||
executeTemplate(req.Context(), rw, templateOptions{ | ||
Data: struct { | ||
TargetPools map[string][]target.Target | ||
|
@@ -218,8 +232,29 @@ func (s *PromtailServer) targets(rw http.ResponseWriter, req *http.Request) { | |
}) | ||
} | ||
|
||
func (s *PromtailServer) reload(rw http.ResponseWriter, req *http.Request) { | ||
rc := make(chan error) | ||
s.reloadCh <- rc | ||
if err := <-rc; err != nil { | ||
http.Error(rw, fmt.Sprintf("failed to reload config: %s", err), http.StatusInternalServerError) | ||
} | ||
|
||
} | ||
|
||
// Reload returns the receive-only channel that signals configuration reload requests. | ||
func (s *PromtailServer) Reload() <-chan chan error { | ||
return s.reloadCh | ||
} | ||
func (s *PromtailServer) ReloadTms(tms *targets.TargetManagers) { | ||
s.mtx.Lock() | ||
defer s.mtx.Unlock() | ||
s.tms = tms | ||
} | ||
|
||
// ready serves the ready endpoint | ||
func (s *PromtailServer) ready(rw http.ResponseWriter, _ *http.Request) { | ||
s.mtx.Lock() | ||
defer s.mtx.Unlock() | ||
if s.healthCheckTarget && !s.tms.Ready() { | ||
http.Error(rw, readinessProbeFailure, http.StatusInternalServerError) | ||
return | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have deleted it.
My mistake, this is a debug variable left over.