Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[new feature] promtail: Add config reload endoint / signal to promtail #7247

Merged
merged 30 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0c7e77c
[new feature] promtail: support /reload config
liguozhong Sep 26, 2022
f7d7244
fix lint
liguozhong Sep 26, 2022
b9c89d5
lint
liguozhong Sep 26, 2022
0b7e536
lint
liguozhong Sep 26, 2022
07a00bd
lint
liguozhong Sep 26, 2022
9e82c9f
lint
liguozhong Sep 26, 2022
4bdf6cd
lint
liguozhong Sep 26, 2022
ce7ddb0
lint
liguozhong Sep 26, 2022
e01b8bd
lint,and *pointer
liguozhong Sep 27, 2022
4d6ee77
add promtail reload test
liguozhong Sep 27, 2022
69489b9
lint
liguozhong Sep 27, 2022
646b120
Update clients/pkg/promtail/promtail.go
liguozhong Sep 27, 2022
ebc0e35
fix review tip
liguozhong Sep 27, 2022
df6f15d
wrap server error
liguozhong Sep 27, 2022
f838787
do not panic
liguozhong Sep 27, 2022
3074a1e
do not panic when reload config fail.
liguozhong Sep 27, 2022
9bd469f
add reload metrics and test
liguozhong Sep 27, 2022
ab2dd37
delete cnt int
liguozhong Sep 27, 2022
6bc47e3
do not panic,add more detail log for watchConfig
liguozhong Sep 27, 2022
6651e62
do not reload when config not changed
liguozhong Sep 27, 2022
2817f60
lint
liguozhong Sep 27, 2022
a2809cd
lint
liguozhong Sep 27, 2022
b980753
lint
liguozhong Sep 27, 2022
41b584f
add test for promtailServer.configLoaded field
liguozhong Sep 28, 2022
5e2dd98
Update clients/cmd/promtail/main.go
liguozhong Oct 10, 2022
489f9cf
fix review tips
liguozhong Oct 10, 2022
5582e95
Merge branch 'main' into promtail_reload
liguozhong Oct 10, 2022
dba1290
trigger ci agent
liguozhong Oct 10, 2022
3928fff
replace errors.Wrap with std fmt.Errorf
liguozhong Oct 11, 2022
3396a6f
Merge branch 'main' into promtail_reload
liguozhong Oct 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions clients/cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"reflect"
"sync"

// embed time zone data
_ "time/tzdata"
Expand All @@ -20,8 +21,7 @@ import (
"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail"
"github.com/grafana/loki/clients/pkg/promtail/client"
"github.com/grafana/loki/clients/pkg/promtail/config"

promtail_config "github.com/grafana/loki/clients/pkg/promtail/config"
"github.com/grafana/loki/pkg/util"
_ "github.com/grafana/loki/pkg/util/build"
"github.com/grafana/loki/pkg/util/cfg"
Expand All @@ -32,15 +32,19 @@ func init() {
prometheus.MustRegister(version.NewCollector("promtail"))
}

var mtx sync.Mutex

type Config struct {
config.Config `yaml:",inline"`
printVersion bool
printConfig bool
logConfig bool
dryRun bool
configFile string
configExpandEnv bool
inspect bool
promtail_config.Config `yaml:",inline"`
printVersion bool
printConfig bool
logConfig bool
dryRun bool
configFile string
configExpandEnv bool
inspect bool

cnt int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have deleted it.
My mistake, this is a debug variable left over.

}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -66,11 +70,11 @@ func (c *Config) Clone() flagext.Registerer {
func main() {
// Load config, merging config file and CLI flags
var config Config
if err := cfg.DefaultUnmarshal(&config, os.Args[1:], flag.CommandLine); err != nil {
args := os.Args[1:]
if err := cfg.DefaultUnmarshal(&config, args, flag.CommandLine); err != nil {
fmt.Println("Unable to parse config:", err)
os.Exit(1)
}

// Handle -version CLI flag
if config.printVersion {
fmt.Println(version.Print("promtail"))
Expand Down Expand Up @@ -112,7 +116,17 @@ func main() {
}

clientMetrics := client.NewMetrics(prometheus.DefaultRegisterer, config.Config.Options.StreamLagLabels)
p, err := promtail.New(config.Config, clientMetrics, config.dryRun)
newConfigFunc := func() promtail_config.Config {
mtx.Lock()
defer mtx.Unlock()
var config Config
if err := cfg.DefaultUnmarshal(&config, args, flag.NewFlagSet(os.Args[0], flag.ExitOnError)); err != nil {
fmt.Println("Unable to parse config:", err)
os.Exit(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I certainly don't think this behaviour is desired; if a runtime config cannot be reloaded, it should not kill the process.
I think we need to distinguish between the initial load and subsequent (runtime) reloads.

This will also better match how Loki works with its overrides.

}
return config.Config
}
p, err := promtail.New(config.Config, newConfigFunc, clientMetrics, config.dryRun)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error creating promtail", "error", err)
os.Exit(1)
Expand Down
105 changes: 86 additions & 19 deletions clients/pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package promtail

import (
"os"
"os/signal"
"sync"
"syscall"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/clients/pkg/logentry/stages"
Expand Down Expand Up @@ -42,17 +46,24 @@ type Promtail struct {
logger log.Logger
reg prometheus.Registerer

stopped bool
mtx sync.Mutex
stopped bool
mtx sync.Mutex
configFile string
newConfig func() config.Config
metrics *client.Metrics
dryRun bool
}

// New makes a new Promtail.
func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) {
func New(cfg config.Config, newConfig func() config.Config, metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) {
// Initialize promtail with some defaults and allow the options to override
// them.

promtail := &Promtail{
logger: util_log.Logger,
reg: prometheus.DefaultRegisterer,
logger: util_log.Logger,
reg: prometheus.DefaultRegisterer,
metrics: metrics,
dryRun: dryRun,
}
for _, o := range opts {
// todo (callum) I don't understand why I needed to add this check
Expand All @@ -61,37 +72,62 @@ func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option
}
o(promtail)
}
err := promtail.reloadConfig(cfg)
if err != nil {
return nil, err
}
server, err := server.New(cfg.ServerConfig, promtail.logger, promtail.targetManagers, cfg.String())
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wrap these errors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
promtail.server = server
promtail.newConfig = newConfig

cfg.Setup(promtail.logger)
return promtail, nil
}

func (p *Promtail) reloadConfig(cfg config.Config) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this not a pointer to config.Config? It seems like we're mutating it within this function (ie. cfg.PositionsConfig.ReadOnly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for such fast feedback,done.

level.Info(p.logger).Log("msg", "Loading configuration file")
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
p.mtx.Lock()
defer p.mtx.Unlock()
if p.targetManagers != nil {
p.targetManagers.Stop()
}
if p.client != nil {
p.client.Stop()
}

cfg.Setup(p.logger)
if cfg.LimitsConfig.ReadlineRateEnabled {
stages.SetReadLineRateLimiter(cfg.LimitsConfig.ReadlineRate, cfg.LimitsConfig.ReadlineBurst, cfg.LimitsConfig.ReadlineRateDrop)
}
var err error
if dryRun {
promtail.client, err = client.NewLogger(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.ClientConfigs...)
if p.dryRun {
p.client, err = client.NewLogger(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.ClientConfigs...)
if err != nil {
return nil, err
return err
}
cfg.PositionsConfig.ReadOnly = true
} else {
promtail.client, err = client.NewMulti(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...)
p.client, err = client.NewMulti(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...)
if err != nil {
return nil, err
return err
}
}

tms, err := targets.NewTargetManagers(promtail, promtail.reg, promtail.logger, cfg.PositionsConfig, promtail.client, cfg.ScrapeConfig, &cfg.TargetConfig)
tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.client, cfg.ScrapeConfig, &cfg.TargetConfig)
if err != nil {
return nil, err
return err
}
promtail.targetManagers = tms
server, err := server.New(cfg.ServerConfig, promtail.logger, tms, cfg.String())
if err != nil {
return nil, err
p.targetManagers = tms

promServer := p.server
if promServer != nil {
promtailServer := promServer.(*server.PromtailServer)
promtailServer.ReloadTms(p.targetManagers)
}
promtail.server = server
return promtail, nil

return nil
}

// Run the promtail; will block until a signal is received.
Expand All @@ -103,6 +139,7 @@ func (p *Promtail) Run() error {
return nil
}
p.mtx.Unlock() // unlock before blocking
go p.watchConfig()
return p.server.Run()
}

Expand Down Expand Up @@ -133,3 +170,33 @@ func (p *Promtail) Shutdown() {
func (p *Promtail) ActiveTargets() map[string][]target.Target {
return p.targetManagers.ActiveTargets()
}

func (p *Promtail) watchConfig() {
// Reload handler.
// Make sure that sighup handler is registered with a redirect to the channel before the potentially
if p.newConfig == nil {
level.Warn(p.logger).Log("msg", "disable watchConfig")
return
}
level.Warn(p.logger).Log("msg", "enable watchConfig")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These log messages should be more clear, please.

hup := make(chan os.Signal, 1)
signal.Notify(hup, syscall.SIGHUP)
promtailServer := p.server.(*server.PromtailServer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, please add a check if the type is not expected

for {
select {
case <-hup:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, not super familiar with these live reload patterns, can you explain the SIGHUP flow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refer to the reload documentation of prometheus, we should also provide this reload way.

https://prometheus.io/docs/prometheus/latest/configuration/configuration/
A configuration reload is triggered by sending a SIGHUP.
image

cfg := p.newConfig()
if err := p.reloadConfig(cfg); err != nil {
level.Error(p.logger).Log("msg", "Error reloading config", "err", err)
}
case rc := <-promtailServer.Reload():
cfg := p.newConfig()
if err := p.reloadConfig(cfg); err != nil {
level.Error(p.logger).Log("msg", "Error reloading config", "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer if we centralised this logic rather than repeating it

rc <- err
} else {
rc <- nil
}
}
}
}
8 changes: 4 additions & 4 deletions clients/pkg/promtail/promtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestPromtail(t *testing.T) {
_ = server.Shutdown(context.Background())
}()

p, err := New(buildTestConfig(t, positionsFileName, testDir), clientMetrics, false, nil)
p, err := New(buildTestConfig(t, positionsFileName, testDir), nil, clientMetrics, false, nil)
if err != nil {
t.Error("error creating promtail", err)
return
Expand Down Expand Up @@ -659,7 +659,7 @@ func Test_DryRun(t *testing.T) {
require.NoError(t, err)
defer os.Remove(f.Name())

_, err = New(config.Config{}, clientMetrics, true, nil)
_, err = New(config.Config{}, nil, clientMetrics, true, nil)
require.Error(t, err)

// Set the minimum config needed to start a server. We need to do this since we
Expand All @@ -681,7 +681,7 @@ func Test_DryRun(t *testing.T) {
PositionsFile: f.Name(),
SyncPeriod: time.Second,
},
}, clientMetrics, true, nil)
}, nil, clientMetrics, true, nil)
require.NoError(t, err)

prometheus.DefaultRegisterer = prometheus.NewRegistry()
Expand All @@ -693,7 +693,7 @@ func Test_DryRun(t *testing.T) {
PositionsFile: f.Name(),
SyncPeriod: time.Second,
},
}, clientMetrics, false, nil)
}, nil, clientMetrics, false, nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want a test that this new configReload function is called? Not sure if we need to go so far as the whole config gets reloaded, but maybe at least the function is called, and test the flow where watch config is disabled when the function is not provided?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.
hi, your review suggestion is great, through this test I found a bug that "PromtailServer.promtailCfg" was not updated correctly.

require.NoError(t, err)
require.IsType(t, &client.MultiClient{}, p.client)
}
35 changes: 35 additions & 0 deletions clients/pkg/promtail/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"sort"
"strings"
"sync"
"syscall"
"text/template"

Expand Down Expand Up @@ -39,8 +40,10 @@ type Server interface {
type PromtailServer struct {
*serverww.Server
log log.Logger
mtx sync.Mutex
tms *targets.TargetManagers
externalURL *url.URL
reloadCh chan chan error
healthCheckTarget bool
promtailCfg string
}
Expand All @@ -51,6 +54,8 @@ type Config struct {
ExternalURL string `yaml:"external_url"`
HealthCheckTarget *bool `yaml:"health_check_target"`
Disable bool `yaml:"disable"`
Reload bool `yaml:"reload"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider naming this more clearly.

I would suggest enable_runtime_reload

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

NewByReload bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant, invalid variables left over from my development process

}

// RegisterFlags with prefix registers flags where every name is prefixed by
Expand All @@ -60,6 +65,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Config.RegisterFlags(f)

f.BoolVar(&cfg.Disable, prefix+"server.disable", false, "Disable the http and grpc server.")
f.BoolVar(&cfg.Reload, prefix+"server.reload", false, "Enable reload via HTTP request.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can also be reloaded via SIGHUP, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I know, the SIGHUP way in the reload of prometheus cannot be disable.
I suggest we should keep the same behavior as prometheus.

}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -91,6 +97,7 @@ func New(cfg Config, log log.Logger, tms *targets.TargetManagers, promtailCfg st
serv := &PromtailServer{
Server: wws,
log: log,
reloadCh: make(chan chan error),
tms: tms,
externalURL: externalURL,
healthCheckTarget: healthCheckTargetFlag,
Expand All @@ -103,12 +110,17 @@ func New(cfg Config, log log.Logger, tms *targets.TargetManagers, promtailCfg st
serv.HTTP.Path("/service-discovery").Handler(http.HandlerFunc(serv.serviceDiscovery))
serv.HTTP.Path("/targets").Handler(http.HandlerFunc(serv.targets))
serv.HTTP.Path("/config").Handler(http.HandlerFunc(serv.config))
if cfg.Reload {
serv.HTTP.Path("/reload").Handler(http.HandlerFunc(serv.reload))
}
serv.HTTP.Path("/debug/fgprof").Handler(fgprof.Handler())
return serv, nil
}

// serviceDiscovery serves the service discovery page.
func (s *PromtailServer) serviceDiscovery(rw http.ResponseWriter, req *http.Request) {
s.mtx.Lock()
defer s.mtx.Unlock()
var index []string
allTarget := s.tms.AllTargets()
for job := range allTarget {
Expand Down Expand Up @@ -187,6 +199,8 @@ func (s *PromtailServer) config(rw http.ResponseWriter, req *http.Request) {

// targets serves the targets page.
func (s *PromtailServer) targets(rw http.ResponseWriter, req *http.Request) {
s.mtx.Lock()
defer s.mtx.Unlock()
executeTemplate(req.Context(), rw, templateOptions{
Data: struct {
TargetPools map[string][]target.Target
Expand Down Expand Up @@ -218,8 +232,29 @@ func (s *PromtailServer) targets(rw http.ResponseWriter, req *http.Request) {
})
}

func (s *PromtailServer) reload(rw http.ResponseWriter, req *http.Request) {
rc := make(chan error)
s.reloadCh <- rc
if err := <-rc; err != nil {
http.Error(rw, fmt.Sprintf("failed to reload config: %s", err), http.StatusInternalServerError)
}

}

// Reload returns the receive-only channel that signals configuration reload requests.
func (s *PromtailServer) Reload() <-chan chan error {
return s.reloadCh
}
func (s *PromtailServer) ReloadTms(tms *targets.TargetManagers) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.tms = tms
}

// ready serves the ready endpoint
func (s *PromtailServer) ready(rw http.ResponseWriter, _ *http.Request) {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.healthCheckTarget && !s.tms.Ready() {
http.Error(rw, readinessProbeFailure, http.StatusInternalServerError)
return
Expand Down
Loading