Skip to content

Commit

Permalink
cmd/thanos/query: add initial rules support
Browse files Browse the repository at this point in the history
Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>
  • Loading branch information
s-urbaniak committed Apr 21, 2020
1 parent 932f2ee commit 63f2a14
Show file tree
Hide file tree
Showing 12 changed files with 587 additions and 40 deletions.
72 changes: 54 additions & 18 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
Strings()

instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())
Expand All @@ -76,6 +76,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>").Strings()

rules := cmd.Flag("rule", "Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<rule>").Strings()

fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

Expand Down Expand Up @@ -107,13 +110,12 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
return errors.Wrap(err, "parse federation labels")
}

lookupStores := map[string]struct{}{}
for _, s := range *stores {
if _, ok := lookupStores[s]; ok {
return errors.Errorf("Address %s is duplicated for --store flag.", s)
}
if dup := duplicate(*stores); dup != "" {
return errors.Errorf("Address %s is duplicated for --store flag.", dup)
}

lookupStores[s] = struct{}{}
if dup := duplicate(*rules); dup != "" {
return errors.Errorf("Address %s is duplicated for --rule flag.", dup)
}

var fileSD *file.Discovery
Expand Down Expand Up @@ -150,9 +152,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*maxConcurrentQueries,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabels,
*queryReplicaLabels,
selectorLset,
*stores,
*stores, *rules,
*enableAutodownsampling,
*enablePartialResponse,
fileSD,
Expand Down Expand Up @@ -190,9 +192,9 @@ func runQuery(
maxConcurrentQueries int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabels []string,
queryReplicaLabels []string,
selectorLset labels.Labels,
storeAddrs []string,
storeAddrs []string, ruleAddrs []string,
enableAutodownsampling bool,
enablePartialResponse bool,
fileSD *file.Discovery,
Expand All @@ -215,30 +217,46 @@ func runQuery(
}

fileSDCache := cache.New()
dnsProvider := dns.NewProvider(
dnsStoreProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_store_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

dnsRuleProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_rule_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

var (
stores = query.NewStoreSet(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add DNS resolved addresses from static flags and file SD.
for _, addr := range dnsProvider.Addresses() {
for _, addr := range dnsStoreProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
}

specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)

return specs
},
func() (specs []query.RuleSpec) {
for _, addr := range dnsRuleProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
}

// NOTE(s-urbaniak): No need to remove duplicates, as rule apis are a subset of store apis.
// hence, any duplicates will be tracked in the store api set.

return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, reg, stores.Get, stores.GetRulesClients, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down Expand Up @@ -289,7 +307,8 @@ func runQuery(
}
fileSDCache.Update(update)
stores.Update(ctxUpdate)
dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...))
dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...))
// Rules apis do not support file service discovery as of now.
case <-ctxUpdate.Done():
return nil
}
Expand All @@ -304,7 +323,8 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
dnsProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...))
dnsStoreProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...))
dnsRuleProvider.Resolve(ctx, ruleAddrs)
return nil
})
}, func(error) {
Expand Down Expand Up @@ -340,7 +360,7 @@ func runQuery(
ins := extpromhttp.NewInstrumentationMiddleware(reg)
ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution)
api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, queryReplicaLabels, instantDefaultMaxSourceResolution, nil)

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

Expand Down Expand Up @@ -369,7 +389,7 @@ func runQuery(
}

// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, nil,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, proxy,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -404,3 +424,19 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co
}
return deduplicated
}

// Duplicate returns a duplicate string in the given string slice
// or empty string if none was found.
func duplicate(ss []string) string {
set := map[string]struct{}{}

for _, s := range []string(ss) {
if _, ok := set[s]; ok {
return s
}

set[s] = struct{}{}
}

return ""
}
5 changes: 5 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ Flags:
prefixed with 'dns+' or 'dnssrv+' to detect
store API servers through respective DNS
lookups.
--rule=<rule> ... Addresses of statically configured rules API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
store API servers through respective DNS
lookups.
--store.sd-files=<path> ...
Path to files that contain addresses of store
API servers. The path can be a glob pattern
Expand Down
15 changes: 15 additions & 0 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"strconv"
"time"

"github.com/prometheus/prometheus/rules"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -96,12 +98,17 @@ func SetCORS(w http.ResponseWriter) {

type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError)

type rulesRetriever interface {
RuleGroups() ([]*rules.Group, error)
}

// API can register a set of endpoints in a router and handle
// them using the provided storage and query engine.
type API struct {
logger log.Logger
queryableCreate query.QueryableCreator
queryEngine *promql.Engine
rulesRetriever rulesRetriever

enableAutodownsampling bool
enablePartialResponse bool
Expand All @@ -122,6 +129,7 @@ func NewAPI(
enablePartialResponse bool,
replicaLabels []string,
defaultInstantQueryMaxSourceResolution time.Duration,
rr rulesRetriever,
) *API {
return &API{
logger: logger,
Expand All @@ -132,6 +140,7 @@ func NewAPI(
replicaLabels: replicaLabels,
reg: reg,
defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,
rulesRetriever: rr,

now: time.Now,
}
Expand Down Expand Up @@ -168,6 +177,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.

r.Get("/labels", instr("label_names", api.labelNames))
r.Post("/labels", instr("label_names", api.labelNames))

r.Get("/rules", instr("rules", api.rules))
}

type queryData struct {
Expand Down Expand Up @@ -626,3 +637,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {

return names, warnings, nil
}

func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
panic("implement me")
}
23 changes: 23 additions & 0 deletions pkg/query/rules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package query

import (
"github.com/prometheus/prometheus/rules"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

func NewRulesRetriever(rs storepb.RulesServer) *rulesRetriever {
return &rulesRetriever{
rulesServer: rs,
}
}

type rulesRetriever struct {
rulesServer storepb.RulesServer
}

func (rr *rulesRetriever) RuleGroups() ([]*rules.Group, error) {
return nil, nil
}
Loading

0 comments on commit 63f2a14

Please sign in to comment.