From e8bc4dee444d413c8664d622b340709842b8fc9c Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Mon, 27 Apr 2020 14:58:12 +0200 Subject: [PATCH] pkg/query/api/v1: initial implementation Signed-off-by: Sergiusz Urbaniak --- cmd/thanos/query.go | 2 +- pkg/query/api/v1.go | 54 ++++++++++++++++++++++++++++++++++++++++++--- pkg/query/rules.go | 49 +++++++++++++++++++++++++++++++++++----- 3 files changed, 96 insertions(+), 9 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 89bc9c9813b..e8a15073c78 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -361,7 +361,7 @@ func runQuery( ins := extpromhttp.NewInstrumentationMiddleware(reg) ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins) - api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, queryReplicaLabels, instantDefaultMaxSourceResolution, nil) + api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, queryReplicaLabels, instantDefaultMaxSourceResolution, query.NewRulesRetriever(proxy)) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index fd73547de8e..883969b2ae5 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -26,6 +26,7 @@ import ( "math" "net/http" "strconv" + "strings" "time" "github.com/NYTimes/gziphandler" @@ -38,11 +39,11 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -98,7 +99,7 @@ func SetCORS(w http.ResponseWriter) { type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError) type rulesRetriever interface { - RuleGroups() ([]*rules.Group, error) + RuleGroups(context.Context) ([]*storepb.RuleGroup, storage.Warnings, error) } // API can register a set of endpoints in a router and handle @@ -638,5 +639,52 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { } func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) { - panic("implement me") + var ( + res = &storepb.RuleGroups{} + typeParam = strings.ToLower(r.URL.Query().Get("type")) + ) + + if typeParam != "" && typeParam != "alert" && typeParam != "record" { + return nil, nil, &ApiError{errorBadData, errors.Errorf("invalid query parameter type='%v'", typeParam)} + } + + returnAlerts := typeParam == "" || typeParam == "alert" + returnRecording := typeParam == "" || typeParam == "record" + + groups, warnings, err := api.rulesRetriever.RuleGroups(r.Context()) + if err != nil { + return nil, nil, &ApiError{ErrorInternal, fmt.Errorf("error retrieving rules: %v", err)} + } + + for _, grp := range groups { + apiRuleGroup := &storepb.RuleGroup{ + Name: grp.Name, + File: grp.File, + Interval: grp.Interval, + EvaluationDurationSeconds: grp.EvaluationDurationSeconds, + LastEvaluation: grp.LastEvaluation, + DeprecatedPartialResponseStrategy: grp.DeprecatedPartialResponseStrategy, + PartialResponseStrategy: grp.PartialResponseStrategy, + } + + for _, r := range grp.Rules { + switch { + case r.GetAlert() != nil: + if !returnAlerts { + break + } + apiRuleGroup.Rules = append(apiRuleGroup.Rules, r) + case r.GetRecording() != nil: + if !returnRecording { + break + } + apiRuleGroup.Rules = append(apiRuleGroup.Rules, r) + default: + return nil, nil, &ApiError{ErrorInternal, fmt.Errorf("rule %v: unsupported", r)} + } + } + res.Groups = append(res.Groups, apiRuleGroup) + } + + return res, warnings, nil } diff --git a/pkg/query/rules.go b/pkg/query/rules.go index d92d77bcfbb..555654aa59d 100644 --- a/pkg/query/rules.go +++ b/pkg/query/rules.go @@ -4,20 +4,59 @@ package query import ( - "github.com/prometheus/prometheus/rules" + "context" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/store/storepb" ) func NewRulesRetriever(rs storepb.RulesServer) *rulesRetriever { return &rulesRetriever{ - rulesServer: rs, + proxy: rs, } } type rulesRetriever struct { - rulesServer storepb.RulesServer + proxy storepb.RulesServer +} + +func (rr *rulesRetriever) RuleGroups(ctx context.Context) ([]*storepb.RuleGroup, storage.Warnings, error) { + resp := &rulesServer{ctx: ctx} + + if err := rr.proxy.Rules(&storepb.RulesRequest{ + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, resp); err != nil { + return nil, nil, errors.Wrap(err, "proxy RuleGroups()") + } + + return resp.groups, resp.warnings, nil +} + +type rulesServer struct { + // This field just exist to pseudo-implement the unused methods of the interface. + storepb.Rules_RulesServer + ctx context.Context + + warnings []error + groups []*storepb.RuleGroup +} + +func (srv *rulesServer) Send(res *storepb.RulesResponse) error { + if res.GetWarning() != "" { + srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) + return nil + } + + if res.GetGroup() == nil { + return errors.New("no group") + } + + srv.groups = append(srv.groups, res.GetGroup()) + return nil + } -func (rr *rulesRetriever) RuleGroups() ([]*rules.Group, error) { - return nil, nil +func (srv *rulesServer) Context() context.Context { + return srv.ctx }