Skip to content

Commit

Permalink
[new feature] promtail: Add config reload endpoint / signal to promtail (
Browse files Browse the repository at this point in the history
  • Loading branch information
liguozhong authored and Abuelodelanada committed Dec 1, 2022
1 parent 9822256 commit 7b2e57a
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 56 deletions.
42 changes: 28 additions & 14 deletions clients/cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"reflect"
"sync"

// embed time zone data
_ "time/tzdata"
Expand All @@ -20,28 +21,31 @@ import (
"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail"
"github.com/grafana/loki/clients/pkg/promtail/client"
"github.com/grafana/loki/clients/pkg/promtail/config"
promtail_config "github.com/grafana/loki/clients/pkg/promtail/config"

"github.com/grafana/loki/pkg/util"
_ "github.com/grafana/loki/pkg/util/build"
"github.com/grafana/loki/pkg/util/cfg"

_ "github.com/grafana/loki/pkg/util/build"
util_log "github.com/grafana/loki/pkg/util/log"
)

func init() {
prometheus.MustRegister(version.NewCollector("promtail"))
}

// mtx is held by the config-reload closure built in main for the duration of
// each re-parse, so that only one re-parse of the configuration runs at a time.
var mtx sync.Mutex

type Config struct {
config.Config `yaml:",inline"`
printVersion bool
printConfig bool
logConfig bool
dryRun bool
checkSyntax bool
configFile string
configExpandEnv bool
inspect bool
promtail_config.Config `yaml:",inline"`
printVersion bool
printConfig bool
logConfig bool
dryRun bool
checkSyntax bool
configFile string
configExpandEnv bool
inspect bool
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -68,11 +72,11 @@ func (c *Config) Clone() flagext.Registerer {
func main() {
// Load config, merging config file and CLI flags
var config Config
if err := cfg.DefaultUnmarshal(&config, os.Args[1:], flag.CommandLine); err != nil {
args := os.Args[1:]
if err := cfg.DefaultUnmarshal(&config, args, flag.CommandLine); err != nil {
fmt.Println("Unable to parse config:", err)
os.Exit(1)
}

if config.checkSyntax {
if config.configFile == "" {
fmt.Println("Invalid config file")
Expand Down Expand Up @@ -123,7 +127,17 @@ func main() {
}

clientMetrics := client.NewMetrics(prometheus.DefaultRegisterer, config.Config.Options.StreamLagLabels)
p, err := promtail.New(config.Config, clientMetrics, config.dryRun)
newConfigFunc := func() (*promtail_config.Config, error) {
mtx.Lock()
defer mtx.Unlock()
var config Config
if err := cfg.DefaultUnmarshal(&config, args, flag.NewFlagSet(os.Args[0], flag.ExitOnError)); err != nil {
fmt.Println("Unable to parse config:", err)
return nil, fmt.Errorf("unable to parse config: %w", err)
}
return &config.Config, nil
}
p, err := promtail.New(config.Config, newConfigFunc, clientMetrics, config.dryRun)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error creating promtail", "error", err)
os.Exit(1)
Expand Down
153 changes: 134 additions & 19 deletions clients/pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package promtail

import (
"errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/clients/pkg/logentry/stages"
Expand All @@ -16,6 +22,20 @@ import (
util_log "github.com/grafana/loki/pkg/util/log"
)

// Counters describing the outcome of configuration reloads. Both are
// registered on the Promtail registerer in New and updated by reload.
var (
	// reloadSuccessTotal is incremented each time a reload completes without error.
	reloadSuccessTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "config_reload_success_total",
		Help:      "Number of reload success times.",
	})

	// reloadFailTotal is incremented each time a reload returns an error.
	reloadFailTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "config_reload_fail_total",
		Help:      "Number of reload fail times.",
	})
)

// errConfigNotChange is returned by reloadConfig when the string form of the
// new configuration equals that of the configuration already loaded.
var errConfigNotChange = errors.New("config has not changed")

// Option is a function that can be passed to the New method of Promtail and
// customize the Promtail that is created.
type Option func(p *Promtail)
Expand All @@ -42,17 +62,32 @@ type Promtail struct {
logger log.Logger
reg prometheus.Registerer

stopped bool
mtx sync.Mutex
stopped bool
mtx sync.Mutex
configLoaded string
newConfig func() (*config.Config, error)
metrics *client.Metrics
dryRun bool
}

// New makes a new Promtail.
func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) {
func New(cfg config.Config, newConfig func() (*config.Config, error), metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) {
// Initialize promtail with some defaults and allow the options to override
// them.

promtail := &Promtail{
logger: util_log.Logger,
reg: prometheus.DefaultRegisterer,
logger: util_log.Logger,
reg: prometheus.DefaultRegisterer,
metrics: metrics,
dryRun: dryRun,
}
err := promtail.reg.Register(reloadSuccessTotal)
if err != nil {
return nil, fmt.Errorf("error register prometheus collector reloadSuccessTotal :%w", err)
}
err = promtail.reg.Register(reloadFailTotal)
if err != nil {
return nil, fmt.Errorf("error register prometheus collector reloadFailTotal :%w", err)
}
for _, o := range opts {
// todo (callum) I don't understand why I needed to add this check
Expand All @@ -61,37 +96,71 @@ func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option
}
o(promtail)
}
err = promtail.reloadConfig(&cfg)
if err != nil {
return nil, err
}
server, err := server.New(cfg.ServerConfig, promtail.logger, promtail.targetManagers, cfg.String())
if err != nil {
return nil, fmt.Errorf("error creating loki server: %w", err)
}
promtail.server = server
promtail.newConfig = newConfig

cfg.Setup(promtail.logger)
return promtail, nil
}

func (p *Promtail) reloadConfig(cfg *config.Config) error {
level.Debug(p.logger).Log("msg", "Reloading configuration file")
p.mtx.Lock()
defer p.mtx.Unlock()
newConfigFile := cfg.String()
if newConfigFile == p.configLoaded {
return errConfigNotChange
}
newConf := cfg.String()
level.Info(p.logger).Log("msg", "Reloading configuration file", "newConf", newConf)
if p.targetManagers != nil {
p.targetManagers.Stop()
}
if p.client != nil {
p.client.Stop()
}

cfg.Setup(p.logger)
if cfg.LimitsConfig.ReadlineRateEnabled {
stages.SetReadLineRateLimiter(cfg.LimitsConfig.ReadlineRate, cfg.LimitsConfig.ReadlineBurst, cfg.LimitsConfig.ReadlineRateDrop)
}
var err error
if dryRun {
promtail.client, err = client.NewLogger(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.ClientConfigs...)
if p.dryRun {
p.client, err = client.NewLogger(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.ClientConfigs...)
if err != nil {
return nil, err
return err
}
cfg.PositionsConfig.ReadOnly = true
} else {
promtail.client, err = client.NewMulti(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...)
p.client, err = client.NewMulti(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...)
if err != nil {
return nil, err
return err
}
}

tms, err := targets.NewTargetManagers(promtail, promtail.reg, promtail.logger, cfg.PositionsConfig, promtail.client, cfg.ScrapeConfig, &cfg.TargetConfig)
tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.client, cfg.ScrapeConfig, &cfg.TargetConfig)
if err != nil {
return nil, err
return err
}
promtail.targetManagers = tms
server, err := server.New(cfg.ServerConfig, promtail.logger, tms, cfg.String())
if err != nil {
return nil, err
p.targetManagers = tms

promServer := p.server
if promServer != nil {
promtailServer, ok := promServer.(*server.PromtailServer)
if !ok {
return errors.New("promtailServer cast fail")
}
promtailServer.ReloadServer(p.targetManagers, cfg.String())
}
promtail.server = server
return promtail, nil
p.configLoaded = newConf
return nil
}

// Run the promtail; will block until a signal is received.
Expand All @@ -103,6 +172,7 @@ func (p *Promtail) Run() error {
return nil
}
p.mtx.Unlock() // unlock before blocking
go p.watchConfig()
return p.server.Run()
}

Expand Down Expand Up @@ -133,3 +203,48 @@ func (p *Promtail) Shutdown() {
// ActiveTargets returns the active targets reported by the target managers
// currently installed on p.
func (p *Promtail) ActiveTargets() map[string][]target.Target {
	managers := p.targetManagers
	return managers.ActiveTargets()
}

// watchConfig loops forever, triggering a configuration reload whenever the
// process receives SIGHUP or a request arrives on the server's Reload channel.
//
// It returns immediately, after logging a warning, when no newConfig function
// was supplied or when p.server is not a *server.PromtailServer.
func (p *Promtail) watchConfig() {
	if p.newConfig == nil {
		level.Warn(p.logger).Log("msg", "disable watchConfig", "reason", "Promtail newConfig func is Empty")
		return
	}

	srv, isPromtailServer := p.server.(*server.PromtailServer)
	if !isPromtailServer {
		level.Warn(p.logger).Log("msg", "disable watchConfig", "reason", "promtailServer cast fail")
		return
	}
	level.Warn(p.logger).Log("msg", "enable watchConfig")

	// Register the SIGHUP handler, redirected to a buffered channel, before
	// entering the loop that services reload requests.
	sighup := make(chan os.Signal, 1)
	signal.Notify(sighup, syscall.SIGHUP)

	for {
		select {
		case <-sighup:
			// A signal has no caller to report to; reload already counts
			// failures in reloadFailTotal, so the result is dropped here.
			_ = p.reload()
		case reply := <-srv.Reload():
			// Hand the outcome (nil on success) back to whoever requested
			// the reload through the server.
			reply <- p.reload()
		}
	}
}

// reload builds a fresh configuration with p.newConfig and applies it through
// reloadConfig, recording the outcome in reloadSuccessTotal / reloadFailTotal.
//
// Both failure paths are logged through p.logger: the SIGHUP path in
// watchConfig discards the returned error, so a failure that is only returned
// would otherwise never appear in promtail's logs.
//
// It returns nil on success, the wrapped newConfig error when the new
// configuration cannot be built, or the reloadConfig error unchanged
// (including errConfigNotChange, which is counted as a failed reload).
func (p *Promtail) reload() error {
	cfg, err := p.newConfig()
	if err != nil {
		reloadFailTotal.Inc()
		level.Error(p.logger).Log("msg", "Error creating new config", "err", err)
		// Error strings are lowercase by Go convention; %w keeps the cause
		// available to errors.Is / errors.As.
		return fmt.Errorf("creating new config: %w", err)
	}

	if err := p.reloadConfig(cfg); err != nil {
		reloadFailTotal.Inc()
		level.Error(p.logger).Log("msg", "Error reloading config", "err", err)
		return err
	}

	reloadSuccessTotal.Inc()
	return nil
}
Loading

0 comments on commit 7b2e57a

Please sign in to comment.