diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index e22634a41e8..62e0d2681ec 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -65,7 +65,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node."). Default("20").Int() - replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter."). + queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter."). Strings() instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden()) @@ -76,6 +76,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). PlaceHolder("").Strings() + rules := cmd.Flag("rule", "Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). + PlaceHolder("").Strings() + fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). PlaceHolder("").Strings() @@ -107,13 +110,12 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { return errors.Wrap(err, "parse federation labels") } - lookupStores := map[string]struct{}{} - for _, s := range *stores { - if _, ok := lookupStores[s]; ok { - return errors.Errorf("Address %s is duplicated for --store flag.", s) - } + if dup := duplicate(*stores); dup != "" { + return errors.Errorf("Address %s is duplicated for --store flag.", dup) + } - lookupStores[s] = struct{}{} + if dup := duplicate(*rules); dup != "" { + return errors.Errorf("Address %s is duplicated for --rule flag.", dup) } var fileSD *file.Discovery @@ -150,9 +152,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { *maxConcurrentQueries, time.Duration(*queryTimeout), time.Duration(*storeResponseTimeout), - *replicaLabels, + *queryReplicaLabels, selectorLset, - *stores, + *stores, *rules, *enableAutodownsampling, *enablePartialResponse, fileSD, @@ -190,9 +192,9 @@ func runQuery( maxConcurrentQueries int, queryTimeout time.Duration, storeResponseTimeout time.Duration, - replicaLabels []string, + queryReplicaLabels []string, selectorLset labels.Labels, - storeAddrs []string, + storeAddrs []string, ruleAddrs []string, enableAutodownsampling bool, enablePartialResponse bool, fileSD *file.Discovery, @@ -215,19 +217,25 @@ func runQuery( } fileSDCache := cache.New() - dnsProvider := dns.NewProvider( + dnsStoreProvider := dns.NewProvider( logger, extprom.WrapRegistererWithPrefix("thanos_querier_store_apis_", reg), dns.ResolverType(dnsSDResolver), ) + dnsRuleProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_querier_rule_apis_", reg), + dns.ResolverType(dnsSDResolver), + ) + var ( stores = query.NewStoreSet( logger, reg, func() (specs []query.StoreSpec) { // Add DNS resolved addresses from static flags and file SD. - for _, addr := range dnsProvider.Addresses() { + for _, addr := range dnsStoreProvider.Addresses() { specs = append(specs, query.NewGRPCStoreSpec(addr)) } @@ -235,10 +243,20 @@ func runQuery( return specs }, + func() (specs []query.RuleSpec) { + for _, addr := range dnsRuleProvider.Addresses() { + specs = append(specs, query.NewGRPCStoreSpec(addr)) + } + + // NOTE(s-urbaniak): No need to remove duplicates, as rule apis are a subset of store apis. + // hence, any duplicates will be tracked in the store api set. + + return specs + }, dialOpts, unhealthyStoreTimeout, ) - proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout) + proxy = store.NewProxyStore(logger, reg, stores.Get, stores.GetRulesClients, component.Query, selectorLset, storeResponseTimeout) queryableCreator = query.NewQueryableCreator(logger, proxy) engine = promql.NewEngine( promql.EngineOpts{ @@ -289,7 +307,8 @@ func runQuery( } fileSDCache.Update(update) stores.Update(ctxUpdate) - dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)) + dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)) + // Rules apis do not support file service discovery as of now. case <-ctxUpdate.Done(): return nil } @@ -304,7 +323,8 @@ func runQuery( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error { - dnsProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...)) + dnsStoreProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...)) + dnsRuleProvider.Resolve(ctx, ruleAddrs) return nil }) }, func(error) { @@ -340,7 +360,7 @@ func runQuery( ins := extpromhttp.NewInstrumentationMiddleware(reg) ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins) - api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution) + api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, queryReplicaLabels, instantDefaultMaxSourceResolution, nil) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) @@ -369,7 +389,7 @@ func runQuery( } // TODO: Add rules API implementation when ready. - s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, nil, + s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, proxy, grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), @@ -404,3 +424,19 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co } return deduplicated } + +// Duplicate returns a duplicate string in the given string slice +// or empty string if none was found. +func duplicate(ss []string) string { + set := map[string]struct{}{} + + for _, s := range []string(ss) { + if _, ok := set[s]; ok { + return s + } + + set[s] = struct{}{} + } + + return "" +} diff --git a/docs/components/query.md b/docs/components/query.md index 96f12e3217e..412170e329c 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -327,6 +327,11 @@ Flags: prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups. + --rule= ... Addresses of statically configured rules API + servers (repeatable). The scheme may be + prefixed with 'dns+' or 'dnssrv+' to detect + store API servers through respective DNS + lookups. --store.sd-files= ... Path to files that contain addresses of store API servers. The path can be a glob pattern diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index cc6a20a0a7c..fd1b0333c7f 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -28,6 +28,8 @@ import ( "strconv" "time" + "github.com/prometheus/prometheus/rules" + "github.com/NYTimes/gziphandler" "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" @@ -96,12 +98,17 @@ func SetCORS(w http.ResponseWriter) { type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError) +type rulesRetriever interface { + RuleGroups() ([]*rules.Group, error) +} + // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { logger log.Logger queryableCreate query.QueryableCreator queryEngine *promql.Engine + rulesRetriever rulesRetriever enableAutodownsampling bool enablePartialResponse bool @@ -122,6 +129,7 @@ func NewAPI( enablePartialResponse bool, replicaLabels []string, defaultInstantQueryMaxSourceResolution time.Duration, + rr rulesRetriever, ) *API { return &API{ logger: logger, @@ -132,6 +140,7 @@ func NewAPI( replicaLabels: replicaLabels, reg: reg, defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution, + rulesRetriever: rr, now: time.Now, } @@ -168,6 +177,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. r.Get("/labels", instr("label_names", api.labelNames)) r.Post("/labels", instr("label_names", api.labelNames)) + + r.Get("/rules", instr("rules", api.rules)) } type queryData struct { @@ -626,3 +637,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { return names, warnings, nil } + +func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) { + panic("implement me") +} diff --git a/pkg/query/rules.go b/pkg/query/rules.go new file mode 100644 index 00000000000..d92d77bcfbb --- /dev/null +++ b/pkg/query/rules.go @@ -0,0 +1,23 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package query + +import ( + "github.com/prometheus/prometheus/rules" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +func NewRulesRetriever(rs storepb.RulesServer) *rulesRetriever { + return &rulesRetriever{ + rulesServer: rs, + } +} + +type rulesRetriever struct { + rulesServer storepb.RulesServer +} + +func (rr *rulesRetriever) RuleGroups() ([]*rules.Group, error) { + return nil, nil +} diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index 978a3485ed5..b21acc6c307 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -35,7 +35,12 @@ type StoreSpec interface { // If metadata call fails we assume that store is no longer accessible and we should not use it. // NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage // given store connection. - Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error) + Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, Type component.StoreAPI, err error) +} + +type RuleSpec interface { + // Addr returns RulesAPI Address for the rules spec. It is used as its ID. + Addr() string } type StoreStatus struct { @@ -54,7 +59,7 @@ type grpcStoreSpec struct { // NewGRPCStoreSpec creates store pure gRPC spec. // It uses Info gRPC call to get Metadata. -func NewGRPCStoreSpec(addr string) StoreSpec { +func NewGRPCStoreSpec(addr string) *grpcStoreSpec { return &grpcStoreSpec{addr: addr} } @@ -65,7 +70,7 @@ func (s *grpcStoreSpec) Addr() string { // Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after // that time, we assume that the host is unhealthy and return error. -func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error) { +func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, Type component.StoreAPI, err error) { resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true)) if err != nil { return nil, 0, 0, nil, errors.Wrapf(err, "fetching store info from %s", s.addr) @@ -155,6 +160,7 @@ type StoreSet struct { // Store specifications can change dynamically. If some store is missing from the list, we assuming it is no longer // accessible and we close gRPC client for it. storeSpecs func() []StoreSpec + ruleSpecs func() []RuleSpec dialOpts []grpc.DialOption gRPCInfoCallTimeout time.Duration @@ -176,6 +182,7 @@ func NewStoreSet( logger log.Logger, reg *prometheus.Registry, storeSpecs func() []StoreSpec, + ruleSpecs func() []RuleSpec, dialOpts []grpc.DialOption, unhealthyStoreTimeout time.Duration, ) *StoreSet { @@ -190,10 +197,14 @@ func NewStoreSet( if storeSpecs == nil { storeSpecs = func() []StoreSpec { return nil } } + if ruleSpecs == nil { + ruleSpecs = func() []RuleSpec { return nil } + } ss := &StoreSet{ logger: log.With(logger, "component", "storeset"), storeSpecs: storeSpecs, + ruleSpecs: ruleSpecs, dialOpts: dialOpts, storesMetric: storesMetric, gRPCInfoCallTimeout: 5 * time.Second, @@ -207,6 +218,9 @@ func NewStoreSet( type storeRef struct { storepb.StoreClient + // if rule is not nil, then this store also supports rules API. + rule storepb.RulesClient + mtx sync.RWMutex cc *grpc.ClientConn addr string @@ -386,19 +400,24 @@ func (s *StoreSet) Update(ctx context.Context) { func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef { var ( - unique = make(map[string]struct{}) + storeAddrSet = make(map[string]struct{}) + ruleAddrSet = make(map[string]struct{}) healthyStores = make(map[string]*storeRef, len(stores)) mtx sync.Mutex wg sync.WaitGroup ) + for _, ruleSpec := range s.ruleSpecs() { + ruleAddrSet[ruleSpec.Addr()] = struct{}{} + } + // Gather healthy stores map concurrently. Build new store if does not exist already. for _, storeSpec := range s.storeSpecs() { - if _, ok := unique[storeSpec.Addr()]; ok { + if _, ok := storeAddrSet[storeSpec.Addr()]; ok { level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr()) continue } - unique[storeSpec.Addr()] = struct{}{} + storeAddrSet[storeSpec.Addr()] = struct{}{} wg.Add(1) go func(spec StoreSpec) { @@ -418,11 +437,18 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr) return } - st = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger} + + var rule storepb.RulesClient + + if _, ok := ruleAddrSet[addr]; ok { + rule = storepb.NewRulesClient(conn) + } + + st = &storeRef{StoreClient: storepb.NewStoreClient(conn), rule: rule, cc: conn, addr: addr, logger: s.logger} } // Check existing or new store. Is it healthy? What are current metadata? - labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient) + labelSets, minTime, maxTime, Type, err := spec.Metadata(ctx, st.StoreClient) if err != nil { if !seenAlready { // Close only if new. Unhealthy `s.stores` will be closed later on. @@ -433,7 +459,7 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor return } s.updateStoreStatus(st, nil) - st.Update(labelSets, minTime, maxTime, storeType) + st.Update(labelSets, minTime, maxTime, Type) mtx.Lock() defer mtx.Unlock() @@ -443,6 +469,12 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor } wg.Wait() + for ruleAddr := range ruleAddrSet { + if _, ok := storeAddrSet[ruleAddr]; !ok { + level.Warn(s.logger).Log("msg", "ignored rule store", "address", ruleAddr) + } + } + return healthyStores } @@ -498,6 +530,21 @@ func (s *StoreSet) Get() []store.Client { return stores } +// GetRulesClients returns a list of all active rules clients. +func (s *StoreSet) GetRulesClients() []storepb.RulesClient { + s.storesMtx.RLock() + defer s.storesMtx.RUnlock() + + rules := make([]storepb.RulesClient, 0, len(s.stores)) + for _, st := range s.stores { + if st.rule == nil { + continue + } + rules = append(rules, st.rule) + } + return rules +} + func (s *StoreSet) Close() { s.storesMtx.Lock() defer s.storesMtx.Unlock() diff --git a/pkg/query/storeset_test.go b/pkg/query/storeset_test.go index b3dd853dd52..8ec7eb6b455 100644 --- a/pkg/query/storeset_test.go +++ b/pkg/query/storeset_test.go @@ -179,12 +179,17 @@ func TestStoreSet_Update(t *testing.T) { // Testing if duplicates can cause weird results. discoveredStoreAddr = append(discoveredStoreAddr, discoveredStoreAddr[0]) - storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { - for _, addr := range discoveredStoreAddr { - specs = append(specs, NewGRPCStoreSpec(addr)) - } - return specs - }, testGRPCOpts, time.Minute) + storeSet := NewStoreSet(nil, nil, + func() (specs []StoreSpec) { + for _, addr := range discoveredStoreAddr { + specs = append(specs, NewGRPCStoreSpec(addr)) + } + return specs + }, + func() (specs []RuleSpec) { + return nil + }, + testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second defer storeSet.Close() @@ -521,12 +526,15 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) { st.CloseOne(initialStoreAddr[0]) st.CloseOne(initialStoreAddr[1]) - storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { - for _, addr := range initialStoreAddr { - specs = append(specs, NewGRPCStoreSpec(addr)) - } - return specs - }, testGRPCOpts, time.Minute) + storeSet := NewStoreSet(nil, nil, + func() (specs []StoreSpec) { + for _, addr := range initialStoreAddr { + specs = append(specs, NewGRPCStoreSpec(addr)) + } + return specs + }, + func() (specs []RuleSpec) { return nil }, + testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second // Should not matter how many of these we run. @@ -539,3 +547,119 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) { expected := newStoreAPIStats() testutil.Equals(t, expected, storeSet.storesMetric.storeNodes) } + +func TestStoreSet_Update_Rules(t *testing.T) { + stores, err := startTestStores([]testStoreMeta{ + { + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{} + }, + storeType: component.Sidecar, + }, + { + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{} + }, + storeType: component.Rule, + }, + }) + testutil.Ok(t, err) + defer stores.Close() + + for _, tc := range []struct { + name string + storeSpecs func() []StoreSpec + ruleSpecs func() []RuleSpec + expectedStores int + expectedRules int + }{ + { + name: "stores, no rules", + storeSpecs: func() []StoreSpec { + return []StoreSpec{ + NewGRPCStoreSpec(stores.orderAddrs[0]), + NewGRPCStoreSpec(stores.orderAddrs[1]), + } + }, + expectedStores: 2, + expectedRules: 0, + }, + { + name: "rules, no stores", + ruleSpecs: func() []RuleSpec { + return []RuleSpec{ + NewGRPCStoreSpec(stores.orderAddrs[0]), + } + }, + expectedStores: 0, + expectedRules: 0, + }, + { + name: "one store, different rule", + storeSpecs: func() []StoreSpec { + return []StoreSpec{ + NewGRPCStoreSpec(stores.orderAddrs[0]), + } + }, + ruleSpecs: func() []RuleSpec { + return []RuleSpec{ + NewGRPCStoreSpec(stores.orderAddrs[1]), + } + }, + expectedStores: 1, + expectedRules: 0, + }, + { + name: "two stores, one rule", + storeSpecs: func() []StoreSpec { + return []StoreSpec{ + NewGRPCStoreSpec(stores.orderAddrs[0]), + NewGRPCStoreSpec(stores.orderAddrs[1]), + } + }, + ruleSpecs: func() []RuleSpec { + return []RuleSpec{ + NewGRPCStoreSpec(stores.orderAddrs[0]), + } + }, + expectedStores: 2, + expectedRules: 1, + }, + { + name: "two stores, two rules", + storeSpecs: func() []StoreSpec { + return []StoreSpec{ + NewGRPCStoreSpec(stores.orderAddrs[0]), + NewGRPCStoreSpec(stores.orderAddrs[1]), + } + }, + ruleSpecs: func() []RuleSpec { + return []RuleSpec{ + NewGRPCStoreSpec(stores.orderAddrs[0]), + NewGRPCStoreSpec(stores.orderAddrs[1]), + } + }, + expectedStores: 2, + expectedRules: 2, + }, + } { + storeSet := NewStoreSet(nil, nil, + tc.storeSpecs, + tc.ruleSpecs, + testGRPCOpts, time.Minute) + + t.Run(tc.name, func(t *testing.T) { + storeSet.Update(context.Background()) + testutil.Equals(t, tc.expectedStores, len(storeSet.stores)) + + gotRules := 0 + for _, ref := range storeSet.stores { + if ref.rule != nil { + gotRules += 1 + } + } + + testutil.Equals(t, tc.expectedRules, gotRules) + }) + } +} diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go index b7111583115..bc7fd6c176a 100644 --- a/pkg/server/grpc/grpc.go +++ b/pkg/server/grpc/grpc.go @@ -39,7 +39,8 @@ type Server struct { opts options } -// New creates a new gRPC Store API or Store API + Rules API server based on what storeSrv and rulesSrv. +// New creates a new gRPC Store API. +// If rulesSrv is not nil, it also registers Rules API to the returned server. func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, probe *prober.GRPCProbe, storeSrv storepb.StoreServer, rulesSrv storepb.RulesServer, opts ...Option) *Server { logger = log.With(logger, "service", "gRPC/server", "component", comp.String()) options := options{} diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index a8a77c3804b..2ed17e036de 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -50,6 +50,7 @@ type Client interface { type ProxyStore struct { logger log.Logger stores func() []Client + rules func() []storepb.RulesClient component component.StoreAPI selectorLabels labels.Labels @@ -57,6 +58,120 @@ type ProxyStore struct { metrics *proxyStoreMetrics } +func (s *ProxyStore) Rules(req *storepb.RulesRequest, srv storepb.Rules_RulesServer) error { + var ( + g, gctx = errgroup.WithContext(srv.Context()) + respChan = make(chan *storepb.RuleGroup, 10) + groups []*storepb.RuleGroup + ) + + defer func() { close(respChan) }() + + for _, rulesClient := range s.rules() { + rs := &rulesStream{ + client: rulesClient, + request: req, + channel: respChan, + server: srv, + } + g.Go(func() error { return rs.receive(gctx) }) + } + + for resp := range respChan { + groups = append(groups, resp) + } + + if err := g.Wait(); err != nil { + level.Error(s.logger).Log("err", err) + return err + } + + groups = dedupGroups(groups) + + // send groups via server + for _, g := range groups { + if err := srv.Send(storepb.NewRuleGroupRulesResponse(g)); err != nil { + return status.Error(codes.Unknown, errors.Wrap(err, "send rules response").Error()) + } + } + + return nil +} + +func dedupGroups(groups []*storepb.RuleGroup) []*storepb.RuleGroup { + // sort groups such that they appear next to each other + sort.Slice(groups, func(i, j int) bool { return groups[i].Name < groups[j].Name }) + + // nothing to do if we have no or a single result, also no deduplication is necessary + if len(groups) < 2 { + return groups + } + + i := 0 + for _, g := range groups[1:] { + if g.Name == groups[i].Name { + groups[i].Rules = append(groups[i].Rules, g.Rules...) + } else { + i++ + groups[i] = g + } + } + + return groups[:i+1] +} + +type rulesStream struct { + client storepb.RulesClient + request *storepb.RulesRequest + channel chan *storepb.RuleGroup + server storepb.Rules_RulesServer +} + +func (stream *rulesStream) receive(ctx context.Context) error { + rules, err := stream.client.Rules(ctx, stream.request) + if err != nil { + err = errors.Wrapf(err, "fetching rules from rules client %v", stream.client) + + if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { + return err + } + + if err := stream.server.Send(storepb.NewWarningRulesResponse(err)); err != nil { + return err + } + } + + for { + rule, err := rules.Recv() + if err == io.EOF { + return nil + } + + if err != nil { + err = errors.Wrapf(err, "receiving rules from rules client %v", stream.client) + + if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { + return err + } + + if err := stream.server.Send(storepb.NewWarningRulesResponse(err)); err != nil { + return errors.Wrapf(err, "sending rules error to server %v", stream.server) + } + + continue + } + + if w := rule.GetWarning(); w != "" { + if err := stream.server.Send(storepb.NewWarningRulesResponse(errors.New(w))); err != nil { + return errors.Wrapf(err, "sending rules warning to server %v", stream.server) + } + continue + } + + stream.channel <- rule.GetGroup() + } +} + type proxyStoreMetrics struct { emptyStreamResponses prometheus.Counter } @@ -83,6 +198,7 @@ func NewProxyStore( logger log.Logger, reg prometheus.Registerer, stores func() []Client, + rules func() []storepb.RulesClient, component component.StoreAPI, selectorLabels labels.Labels, responseTimeout time.Duration, @@ -95,6 +211,7 @@ func NewProxyStore( s := &ProxyStore{ logger: logger, stores: stores, + rules: rules, component: component, selectorLabels: selectorLabels, responseTimeout: responseTimeout, diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 5f6c32e387d..a76f2089e14 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -9,6 +9,7 @@ import ( "io" "math" "os" + "reflect" "sort" "testing" "time" @@ -60,6 +61,7 @@ func TestProxyStore_Info(t *testing.T) { q := NewProxyStore(nil, nil, func() []Client { return nil }, + nil, component.Query, nil, 0*time.Second, ) @@ -420,6 +422,7 @@ func TestProxyStore_Series(t *testing.T) { q := NewProxyStore(nil, nil, func() []Client { return tc.storeAPIs }, + nil, component.Query, tc.selectorLabels, 0*time.Second, @@ -886,6 +889,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { q := NewProxyStore(nil, nil, func() []Client { return tc.storeAPIs }, + nil, component.Query, tc.selectorLabels, 4*time.Second, @@ -935,6 +939,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { q := NewProxyStore(nil, nil, func() []Client { return cls }, + nil, component.Query, nil, 0*time.Second, @@ -995,6 +1000,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { q := NewProxyStore(nil, nil, func() []Client { return cls }, + nil, component.Query, labels.FromStrings("fed", "a"), 0*time.Second, @@ -1034,6 +1040,7 @@ func TestProxyStore_LabelValues(t *testing.T) { q := NewProxyStore(nil, nil, func() []Client { return cls }, + nil, component.Query, nil, 0*time.Second, @@ -1138,6 +1145,7 @@ func TestProxyStore_LabelNames(t *testing.T) { nil, nil, func() []Client { return tc.storeAPIs }, + nil, component.Query, nil, 0*time.Second, @@ -1479,3 +1487,149 @@ func TestMergeLabels(t *testing.T) { testutil.Equals(t, expected, resLabels) } + +func TestDedupGroups(t *testing.T) { + for _, tc := range []struct { + name string + groups, want []*storepb.RuleGroup + }{ + { + name: "no groups", + groups: nil, + want: nil, + }, + { + name: "empty group", + groups: []*storepb.RuleGroup{ + {Name: "a"}, + }, + want: []*storepb.RuleGroup{ + {Name: "a"}, + }, + }, + { + name: "multiple empty groups", + groups: []*storepb.RuleGroup{ + {Name: "a"}, + {Name: "b"}, + }, + want: []*storepb.RuleGroup{ + {Name: "a"}, + {Name: "b"}, + }, + }, + { + name: "single group", + groups: []*storepb.RuleGroup{ + { + Name: "a", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a2"}), + }, + }, + }, + want: []*storepb.RuleGroup{ + { + Name: "a", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a2"}), + }, + }, + }, + }, + { + name: "separate groups", + groups: []*storepb.RuleGroup{ + { + Name: "a", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a2"}), + }, + }, + { + Name: "b", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "b1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "b2"}), + }, + }, + }, + want: []*storepb.RuleGroup{ + { + Name: "a", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a2"}), + }, + }, + { + Name: "b", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "b1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "b2"}), + }, + }, + }, + }, + { + name: "duplicate groups", + groups: []*storepb.RuleGroup{ + { + Name: "a", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a2"}), + }, + }, + { + Name: "b", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "b1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "b2"}), + }, + }, + { + Name: "c", + }, + { + Name: "a", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a2"}), + }, + }, + }, + want: []*storepb.RuleGroup{ + { + Name: "a", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a2"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "a2"}), + }, + }, + { + Name: "b", + Rules: []*storepb.Rule{ + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "b1"}), + storepb.NewRecordingRule(&storepb.RecordingRule{Name: "b2"}), + }, + }, + { + Name: "c", + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + got := dedupGroups(tc.groups) + if !reflect.DeepEqual(tc.want, got) { + t.Errorf("want groups %v, got %v", tc.want, got) + } + }) + } +} diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index eb83c5246cd..9bd0f4ab03d 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -33,6 +33,28 @@ func NewSeriesResponse(series *Series) *SeriesResponse { } } +func NewRuleGroupRulesResponse(rg *RuleGroup) *RulesResponse { + return &RulesResponse{ + Result: &RulesResponse_Group{ + Group: rg, + }, + } +} + +func NewWarningRulesResponse(warning error) *RulesResponse { + return &RulesResponse{ + Result: &RulesResponse_Warning{ + Warning: warning.Error(), + }, + } +} + +func NewRecordingRule(r *RecordingRule) *Rule { + return &Rule{ + Result: &Rule_Recording{Recording: r}, + } +} + // CompareLabels compares two sets of labels. func CompareLabels(a, b []Label) int { l := len(a) diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index 464644c98d0..0a99b102937 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -1139,6 +1139,7 @@ var _WriteableStore_serviceDesc = grpc.ServiceDesc{ // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type RulesClient interface { /// Rules has info for all rules. + /// Returned rules are expected to include external labels. Rules(ctx context.Context, in *RulesRequest, opts ...grpc.CallOption) (Rules_RulesClient, error) } @@ -1185,6 +1186,7 @@ func (x *rulesRulesClient) Recv() (*RulesResponse, error) { // RulesServer is the server API for Rules service. type RulesServer interface { /// Rules has info for all rules. + /// Returned rules are expected to include external labels. Rules(*RulesRequest, Rules_RulesServer) error } diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 3298a4f33e2..30ec66106dc 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -150,6 +150,7 @@ message LabelValuesResponse { /// Rules represents API that is responsible for gathering rules and their statuses. service Rules { /// Rules has info for all rules. + /// Returned rules are expected to include external labels. rpc Rules(RulesRequest) returns (stream RulesResponse); }