-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement stateless mode for Thanos Ruler (continue #4250) #4731
Changes from 25 commits
a25e730
6a14fc5
021cb36
fd19217
5814523
7acfa2e
2d7d6d9
c2f999c
31a75f2
30b081c
d567faf
6fb7d3b
62ef47c
2054971
3d5413d
27f4984
1c82a78
760be92
45e9f99
1eb0fe3
3b65c42
69bb9b8
9ec5690
d0bdcd5
fb0dfba
46acaee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ import ( | |
"strings" | ||
"time" | ||
|
||
extflag "github.com/efficientgo/tools/extkingpin" | ||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/log/level" | ||
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" | ||
|
@@ -26,17 +27,21 @@ import ( | |
"github.com/prometheus/client_golang/prometheus/promauto" | ||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/common/route" | ||
"github.com/prometheus/prometheus/config" | ||
"github.com/prometheus/prometheus/pkg/labels" | ||
"github.com/prometheus/prometheus/pkg/relabel" | ||
"github.com/prometheus/prometheus/promql" | ||
"github.com/prometheus/prometheus/rules" | ||
"github.com/prometheus/prometheus/storage" | ||
"github.com/prometheus/prometheus/storage/remote" | ||
"github.com/prometheus/prometheus/tsdb" | ||
"github.com/prometheus/prometheus/tsdb/agent" | ||
"github.com/prometheus/prometheus/util/strutil" | ||
"github.com/thanos-io/thanos/pkg/errutil" | ||
"github.com/thanos-io/thanos/pkg/extkingpin" | ||
"github.com/thanos-io/thanos/pkg/httpconfig" | ||
"gopkg.in/yaml.v2" | ||
|
||
extflag "github.com/efficientgo/tools/extkingpin" | ||
"github.com/thanos-io/thanos/pkg/alert" | ||
v1 "github.com/thanos-io/thanos/pkg/api/rule" | ||
"github.com/thanos-io/thanos/pkg/block/metadata" | ||
|
@@ -74,6 +79,8 @@ type ruleConfig struct { | |
alertQueryURL *url.URL | ||
alertRelabelConfigYAML []byte | ||
|
||
rwConfig *extflag.PathOrContent | ||
|
||
resendDelay time.Duration | ||
evalInterval time.Duration | ||
ruleFiles []string | ||
|
@@ -116,6 +123,8 @@ func registerRule(app *extkingpin.App) { | |
cmd.Flag("eval-interval", "The default evaluation interval to use."). | ||
Default("30s").DurationVar(&conf.evalInterval) | ||
|
||
conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution()) | ||
|
||
reqLogDecision := cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall: Logs the finish call of the requests. LogStartAndFinishCall: Logs the start and finish call of the requests. NoLogCall: Disable request logging.").Default("").Enum("NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "") | ||
|
||
conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) | ||
|
@@ -142,6 +151,10 @@ func registerRule(app *extkingpin.App) { | |
WALCompression: *walCompression, | ||
} | ||
|
||
agentOpts := &agent.Options{ | ||
WALCompression: *walCompression, | ||
} | ||
|
||
// Parse and check query configuration. | ||
lookupQueries := map[string]struct{}{} | ||
for _, q := range conf.query.addrs { | ||
|
@@ -199,6 +212,7 @@ func registerRule(app *extkingpin.App) { | |
grpcLogOpts, | ||
tagOpts, | ||
tsdbOpts, | ||
agentOpts, | ||
) | ||
}) | ||
} | ||
|
@@ -262,6 +276,7 @@ func runRule( | |
grpcLogOpts []grpc_logging.Option, | ||
tagOpts []tags.Option, | ||
tsdbOpts *tsdb.Options, | ||
agentOpts *agent.Options, | ||
) error { | ||
metrics := newRuleMetrics(reg) | ||
|
||
|
@@ -318,25 +333,63 @@ func runRule( | |
// Discover and resolve query addresses. | ||
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval) | ||
} | ||
var ( | ||
appendable storage.Appendable | ||
queryable storage.Queryable | ||
db *tsdb.DB | ||
) | ||
|
||
db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) | ||
rwCfgYAML, err := conf.rwConfig.Content() | ||
if err != nil { | ||
return errors.Wrap(err, "open TSDB") | ||
return err | ||
} | ||
|
||
level.Debug(logger).Log("msg", "removing storage lock file if any") | ||
if err := removeLockfileIfAny(logger, conf.dataDir); err != nil { | ||
return errors.Wrap(err, "remove storage lock files") | ||
} | ||
if len(rwCfgYAML) > 0 { | ||
var rwCfg config.RemoteWriteConfig | ||
if err := yaml.Unmarshal(rwCfgYAML, &rwCfg); err != nil { | ||
return err | ||
} | ||
walDir := filepath.Join(conf.dataDir, rwCfg.Name) | ||
// flushDeadline is set to 1m, but it is for metadata watcher only so not used here. | ||
remoteStore := remote.NewStorage(logger, reg, func() (int64, error) { | ||
return 0, nil | ||
}, walDir, 1*time.Minute, nil) | ||
if err := remoteStore.ApplyConfig(&config.Config{ | ||
GlobalConfig: config.DefaultGlobalConfig, | ||
RemoteWriteConfigs: []*config.RemoteWriteConfig{&rwCfg}, | ||
}); err != nil { | ||
return errors.Wrap(err, "applying config to remote storage") | ||
} | ||
|
||
{ | ||
done := make(chan struct{}) | ||
g.Add(func() error { | ||
<-done | ||
return db.Close() | ||
}, func(error) { | ||
close(done) | ||
}) | ||
db, err := agent.Open(logger, reg, remoteStore, walDir, agentOpts) | ||
if err != nil { | ||
return errors.Wrap(err, "start remote write agent db") | ||
} | ||
fanoutStore := storage.NewFanout(logger, db, remoteStore) | ||
appendable = fanoutStore | ||
queryable = fanoutStore | ||
} else { | ||
db, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) | ||
if err != nil { | ||
return errors.Wrap(err, "open TSDB") | ||
} | ||
|
||
level.Debug(logger).Log("msg", "removing storage lock file if any") | ||
if err := removeLockfileIfAny(logger, conf.dataDir); err != nil { | ||
return errors.Wrap(err, "remove storage lock files") | ||
} | ||
|
||
{ | ||
done := make(chan struct{}) | ||
g.Add(func() error { | ||
<-done | ||
return db.Close() | ||
}, func(error) { | ||
close(done) | ||
}) | ||
} | ||
appendable = db | ||
queryable = db | ||
} | ||
|
||
// Build the Alertmanager clients. | ||
|
@@ -434,9 +487,9 @@ func runRule( | |
rules.ManagerOptions{ | ||
NotifyFunc: notifyFunc, | ||
Logger: logger, | ||
Appendable: db, | ||
Appendable: appendable, | ||
ExternalURL: nil, | ||
Queryable: db, | ||
Queryable: queryable, | ||
ResendDelay: conf.resendDelay, | ||
}, | ||
queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod), | ||
|
@@ -521,31 +574,32 @@ func runRule( | |
) | ||
|
||
// Start gRPC server. | ||
{ | ||
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset) | ||
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA) | ||
if err != nil { | ||
return errors.Wrap(err, "setup gRPC server") | ||
} | ||
|
||
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA) | ||
if err != nil { | ||
return errors.Wrap(err, "setup gRPC server") | ||
} | ||
options := []grpcserver.Option{ | ||
grpcserver.WithServer(thanosrules.RegisterRulesServer(ruleMgr)), | ||
grpcserver.WithListen(conf.grpc.bindAddress), | ||
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)), | ||
grpcserver.WithTLSConfig(tlsCfg), | ||
} | ||
if db != nil { | ||
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In the future maybe we could also answer StoreAPI queries by using the data in the WAL? Perhaps worth adding a TODO here 😄 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am not sure about this. If the agent is able to query head data, then what's the difference between the agent and a Prometheus with very low retention? I am not sure whether this is in their roadmap. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fair point 👍 |
||
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore))) | ||
} | ||
// TODO: Add rules API implementation when ready. | ||
yeya24 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, options...) | ||
|
||
// TODO: Add rules API implementation when ready. | ||
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, | ||
grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)), | ||
grpcserver.WithServer(thanosrules.RegisterRulesServer(ruleMgr)), | ||
grpcserver.WithListen(conf.grpc.bindAddress), | ||
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)), | ||
grpcserver.WithTLSConfig(tlsCfg), | ||
) | ||
g.Add(func() error { | ||
statusProber.Ready() | ||
return s.ListenAndServe() | ||
}, func(err error) { | ||
statusProber.NotReady(err) | ||
s.Shutdown(err) | ||
}) | ||
|
||
g.Add(func() error { | ||
statusProber.Ready() | ||
return s.ListenAndServe() | ||
}, func(err error) { | ||
statusProber.NotReady(err) | ||
s.Shutdown(err) | ||
}) | ||
} | ||
// Start UI & metrics HTTP server. | ||
{ | ||
router := route.New() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we please call this something other than `db` to avoid shadowing the outer variable? This is confusing and I had to double check this place 😂 I suggest `agentDB` or something like that, and to rename the outer `db` variable into `tsdbDB`.