cmd/thanos/query: add initial rules support
Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>
s-urbaniak committed Mar 30, 2020
1 parent 932f2ee commit 07a96e2
Showing 12 changed files with 682 additions and 100 deletions.
64 changes: 50 additions & 14 deletions cmd/thanos/query.go
@@ -76,6 +76,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>").Strings()

rules := cmd.Flag("rule", "Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<rule>").Strings()

fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

@@ -107,13 +110,12 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
return errors.Wrap(err, "parse federation labels")
}

lookupStores := map[string]struct{}{}
for _, s := range *stores {
if _, ok := lookupStores[s]; ok {
return errors.Errorf("Address %s is duplicated for --store flag.", s)
}
if dup := duplicate(*stores); dup != "" {
return errors.Errorf("Address %s is duplicated for --store flag.", dup)
}

lookupStores[s] = struct{}{}
if dup := duplicate(*rules); dup != "" {
return errors.Errorf("Address %s is duplicated for --rule flag.", dup)
}

var fileSD *file.Discovery
@@ -152,7 +154,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
time.Duration(*storeResponseTimeout),
*replicaLabels,
selectorLset,
*stores,
*stores, *rules,
*enableAutodownsampling,
*enablePartialResponse,
fileSD,
@@ -192,7 +194,7 @@ func runQuery(
storeResponseTimeout time.Duration,
replicaLabels []string,
selectorLset labels.Labels,
storeAddrs []string,
storeAddrs []string, ruleAddrs []string,
enableAutodownsampling bool,
enablePartialResponse bool,
fileSD *file.Discovery,
@@ -215,30 +217,46 @@
}

fileSDCache := cache.New()
dnsProvider := dns.NewProvider(
dnsStoreProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_store_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

dnsRuleProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_rule_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

var (
stores = query.NewStoreSet(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add DNS resolved addresses from static flags and file SD.
for _, addr := range dnsProvider.Addresses() {
for _, addr := range dnsStoreProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
}

specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)

return specs
},
func() (specs []query.RuleSpec) {
for _, addr := range dnsRuleProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
}

// NOTE(s-urbaniak): No need to remove duplicates, as rules APIs are a subset of store APIs,
// hence any duplicates will already be tracked in the store API set.

return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, reg, stores.Get, stores.GetRulesClients, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
@@ -289,7 +307,8 @@ func runQuery(
}
fileSDCache.Update(update)
stores.Update(ctxUpdate)
dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...))
dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...))
// Rules APIs do not support file-based service discovery for now.
case <-ctxUpdate.Done():
return nil
}
@@ -304,7 +323,8 @@
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
dnsProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...))
dnsStoreProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...))
dnsRuleProvider.Resolve(ctx, ruleAddrs)
return nil
})
}, func(error) {
@@ -369,7 +389,7 @@ func runQuery(
}

// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, nil,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, proxy,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
@@ -404,3 +424,19 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co
}
return deduplicated
}

// duplicate returns a duplicated string in the given string slice,
// or an empty string if none was found.
func duplicate(ss []string) string {
set := map[string]struct{}{}

for _, s := range ss {
if _, ok := set[s]; ok {
return s
}

set[s] = struct{}{}
}

return ""
}
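For illustration only (not part of the commit): a minimal, self-contained sketch of how the new duplicate helper behaves. The addresses below are made up.

package main

import "fmt"

// duplicate mirrors the helper added above, reproduced here so the sketch compiles on its own.
func duplicate(ss []string) string {
	set := map[string]struct{}{}
	for _, s := range ss {
		if _, ok := set[s]; ok {
			return s
		}
		set[s] = struct{}{}
	}
	return ""
}

func main() {
	// Hypothetical --rule flag values; the repeated entry is reported.
	rules := []string{"dns+rules.example.org:10901", "rules-0.example.org:10901", "dns+rules.example.org:10901"}
	if dup := duplicate(rules); dup != "" {
		fmt.Printf("Address %s is duplicated for --rule flag.\n", dup) // prints the dns+ entry
	}
}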
5 changes: 5 additions & 0 deletions docs/components/query.md
@@ -327,6 +327,11 @@ Flags:
prefixed with 'dns+' or 'dnssrv+' to detect
store API servers through respective DNS
lookups.
--rule=<rule> ... Addresses of statically configured rules API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
rules API servers through respective DNS
lookups.
--store.sd-files=<path> ...
Path to files that contain addresses of store
API servers. The path can be a glob pattern
6 changes: 6 additions & 0 deletions pkg/query/api/v1.go
@@ -168,6 +168,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.

r.Get("/labels", instr("label_names", api.labelNames))
r.Post("/labels", instr("label_names", api.labelNames))

r.Get("/rules", instr("rules", api.rules))
}

type queryData struct {
@@ -626,3 +628,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {

return names, warnings, nil
}

func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
panic("implement me")
}
57 changes: 52 additions & 5 deletions pkg/query/storeset.go
@@ -38,6 +38,11 @@ type StoreSpec interface {
Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error)
}

// RuleSpec describes a rules API server specification.
type RuleSpec interface {
// Addr returns the RulesAPI address for the rule spec. It is used as its ID.
Addr() string
}

type StoreStatus struct {
Name string
LastCheck time.Time
@@ -54,7 +59,7 @@ type grpcStoreSpec struct {

// NewGRPCStoreSpec creates a pure gRPC store spec.
// It uses the Info gRPC call to get Metadata.
func NewGRPCStoreSpec(addr string) StoreSpec {
func NewGRPCStoreSpec(addr string) *grpcStoreSpec {
return &grpcStoreSpec{addr: addr}
}
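An editorial aside (not part of the diff): returning the concrete *grpcStoreSpec instead of the StoreSpec interface lets the same spec value satisfy both StoreSpec and RuleSpec, since the type already provides Addr and Metadata; that is what allows query.go to append the result to a []query.RuleSpec. A minimal compile-time check illustrating this:

var (
	// These assertions are illustrative; they simply confirm the interface relationships described above.
	_ StoreSpec = (*grpcStoreSpec)(nil)
	_ RuleSpec  = (*grpcStoreSpec)(nil)
)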

@@ -155,6 +160,7 @@ type StoreSet struct {
// Store specifications can change dynamically. If some store is missing from the list, we assume it is no longer
// accessible and we close the gRPC client for it.
storeSpecs func() []StoreSpec
ruleSpecs func() []RuleSpec
dialOpts []grpc.DialOption
gRPCInfoCallTimeout time.Duration

@@ -176,6 +182,7 @@ func NewStoreSet(
logger log.Logger,
reg *prometheus.Registry,
storeSpecs func() []StoreSpec,
ruleSpecs func() []RuleSpec,
dialOpts []grpc.DialOption,
unhealthyStoreTimeout time.Duration,
) *StoreSet {
@@ -190,10 +197,14 @@
if storeSpecs == nil {
storeSpecs = func() []StoreSpec { return nil }
}
if ruleSpecs == nil {
ruleSpecs = func() []RuleSpec { return nil }
}

ss := &StoreSet{
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
ruleSpecs: ruleSpecs,
dialOpts: dialOpts,
storesMetric: storesMetric,
gRPCInfoCallTimeout: 5 * time.Second,
@@ -207,6 +218,9 @@
type storeRef struct {
storepb.StoreClient

// If not nil, this store also supports the rules API.
rule storepb.RulesClient

mtx sync.RWMutex
cc *grpc.ClientConn
addr string
@@ -386,19 +400,24 @@ func (s *StoreSet) Update(ctx context.Context) {

func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef {
var (
unique = make(map[string]struct{})
storeAddrSet = make(map[string]struct{})
ruleAddrSet = make(map[string]struct{})
healthyStores = make(map[string]*storeRef, len(stores))
mtx sync.Mutex
wg sync.WaitGroup
)

for _, ruleSpec := range s.ruleSpecs() {
ruleAddrSet[ruleSpec.Addr()] = struct{}{}
}

// Gather the healthy stores map concurrently. Build a new store if it does not already exist.
for _, storeSpec := range s.storeSpecs() {
if _, ok := unique[storeSpec.Addr()]; ok {
if _, ok := storeAddrSet[storeSpec.Addr()]; ok {
level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr())
continue
}
unique[storeSpec.Addr()] = struct{}{}
storeAddrSet[storeSpec.Addr()] = struct{}{}

wg.Add(1)
go func(spec StoreSpec) {
@@ -418,7 +437,14 @@
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr)
return
}
st = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger}

var rule storepb.RulesClient

if _, ok := ruleAddrSet[addr]; ok {
rule = storepb.NewRulesClient(conn)
}

st = &storeRef{StoreClient: storepb.NewStoreClient(conn), rule: rule, cc: conn, addr: addr, logger: s.logger}
}

// Check existing or new store. Is it healthy? What are current metadata?
@@ -443,6 +469,12 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor
}
wg.Wait()

for ruleAddr := range ruleAddrSet {
if _, ok := storeAddrSet[ruleAddr]; !ok {
level.Warn(s.logger).Log("msg", "ignored rule store", "address", ruleAddr)
}
}

return healthyStores
}

@@ -498,6 +530,21 @@ func (s *StoreSet) Get() []store.Client {
return stores
}

// GetRulesClients returns a list of all active rules clients.
func (s *StoreSet) GetRulesClients() []storepb.RulesClient {
s.storesMtx.RLock()
defer s.storesMtx.RUnlock()

rules := make([]storepb.RulesClient, 0, len(s.stores))
for _, st := range s.stores {
if st.rule == nil {
continue
}
rules = append(rules, st.rule)
}
return rules
}
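A usage sketch, for illustration only (the storeSet and logger variables are assumed to be in scope, as in cmd/thanos/query.go): because the getter is handed to the proxy as a function value, each call reflects the current set of healthy stores that also expose the rules API.

// Count the rules-capable store nodes known right now; stores without a rules client are skipped by the getter.
getRules := storeSet.GetRulesClients // func() []storepb.RulesClient
level.Info(logger).Log("msg", "rules-capable store nodes", "count", len(getRules()))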

func (s *StoreSet) Close() {
s.storesMtx.Lock()
defer s.storesMtx.Unlock()