Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querier: Support store matchers and time range filter on labels API #3133

Merged
merged 4 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Added

- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also supports time range metadata based store filtering.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07

Highlights:
Expand Down
21 changes: 14 additions & 7 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

const (
dedupParam = "dedup"
partialResponseParam = "partial_response"
maxSourceResolutionParam = "max_source_resolution"
replicaLabelsParam = "replicaLabels[]"
storeMatcherParam = "storeMatch[]"
)

// QueryAPI is an API used by Thanos Query.
type QueryAPI struct {
baseAPI *api.BaseAPI
Expand Down Expand Up @@ -139,7 +147,6 @@ type queryData struct {
}

func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) {
const dedupParam = "dedup"
enableDeduplication = true

if val := r.FormValue(dedupParam); val != "" {
Expand All @@ -153,7 +160,6 @@ func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplicatio
}

func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *api.ApiError) {
const replicaLabelsParam = "replicaLabels[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}
Expand All @@ -168,7 +174,6 @@ func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []
}

func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *api.ApiError) {
const storeMatcherParam = "storeMatch[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}
Expand All @@ -189,7 +194,6 @@ func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers []
}

func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second

val := r.FormValue(maxSourceResolutionParam)
Expand All @@ -212,8 +216,6 @@ func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal t
}

func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePartialResponse bool) (enablePartialResponse bool, _ *api.ApiError) {
const partialResponseParam = "partial_response"

// Overwrite the cli flag when provided as a query parameter.
if val := r.FormValue(partialResponseParam); val != "" {
var err error
Expand Down Expand Up @@ -439,7 +441,12 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).
storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down
47 changes: 46 additions & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set("max_source_resolution", test.maxSourceResolutionParam)
v.Set(maxSourceResolutionParam, test.maxSourceResolutionParam)
r := http.Request{PostForm: v}

// If no max_source_resolution is specified fit at least 5 samples between steps.
Expand All @@ -1070,6 +1070,51 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
}
}

func TestParseStoreMatchersParam(t *testing.T) {
for i, tc := range []struct {
storeMatchers string
fail bool
result [][]storepb.LabelMatcher
}{
{
storeMatchers: "123",
fail: true,
},
{
storeMatchers: "foo",
fail: false,
result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foo"}}},
},
{
storeMatchers: `{__address__="localhost:10905"}`,
fail: false,
result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"}}},
},
{
storeMatchers: `{__address__="localhost:10905", cluster="test"}`,
fail: false,
result: [][]storepb.LabelMatcher{{
storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"},
storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "cluster", Value: "test"},
}},
},
} {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set(storeMatcherParam, tc.storeMatchers)
r := &http.Request{PostForm: v}

storeMatchers, err := api.parseStoreMatchersParam(r)
if !tc.fail {
testutil.Assert(t, reflect.DeepEqual(storeMatchers, tc.result), "case %v: expected %v to be equal to %v", i, storeMatchers, tc.result)
} else {
testutil.NotOk(t, err)
}
}
}

type mockedRulesClient struct {
g map[rulespb.RulesRequest_Type][]*rulespb.RuleGroup
w storage.Warnings
Expand Down
8 changes: 7 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

aggrs := aggrsFromFunc(hints.Func)

// TODO: Pass it using the SerieRequest instead of relying on context
// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp := &seriesServer{ctx: ctx}
Expand Down Expand Up @@ -333,6 +333,9 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: name,
PartialResponseDisabled: !q.partialResponse,
Expand All @@ -356,6 +359,9 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
Expand Down
61 changes: 50 additions & 11 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
ok, _ = storeMatches(st, r.MinTime, r.MaxTime, storeMatcher, r.Matchers...)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very minor thing but why was this word capitalized?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I thought capitalized looks better.

Capitialized

level=warn ts=2020-09-08T20:59:57.731820598Z caller=proxy.go:310 err="No StoreAPIs matched for this query" stores="Store Addr: localhost:10905 LabelSets: [name:\"cluster\" value:\"test\" ] Mint: 1598450400000 Maxt: 1599408000000 filtered out;Store Addr: localhost:10903 LabelSets: [name:\"cluster\" value:\"test\" ] Mint: 1598299200000 Maxt: 9223372036854775807 filtered out"

Not Capitalized

level=warn ts=2020-09-08T21:02:28.654728738Z caller=proxy.go:310 err="No StoreAPIs matched for this query" stores="store Addr: localhost:10903 LabelSets: [name:\"cluster\" value:\"test\" ] Mint: 1598299200000 Maxt: 9223372036854775807 filtered out;store Addr: localhost:10905 LabelSets: [name:\"cluster\" value:\"test\" ] Mint: 1598450400000 Maxt: 1599537600000 filtered out"

stores="store Addr: localhost:10903

It looks weird to have store not capitalized but have Addr capitalized.

continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))

// This is used to cancel this stream when one operations takes too long.
seriesCtx, closeSeries := context.WithCancel(gctx)
Expand Down Expand Up @@ -515,7 +515,8 @@ func storeMatches(s Client, mint, maxt int64, storeMatcher [][]storepb.LabelMatc
return false, nil
}
match, err := storeMatchMetadata(s, storeMatcher)
if err != nil || !match {
// Return result here if no matchers set.
if len(matchers) == 0 || err != nil || !match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't labelSetsMatch() already return true, nil if matchers is nil? In labelSetMatches we range over nil and because it's empty, we simply jump to the end and return true, nil, right?

Copy link
Contributor Author

@yeya24 yeya24 Sep 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return labelSetsMatch(s.LabelSets(), matchers) If we don't return and then go to labelSetsMatch here, we need to first evaluate s.LabelSets() https://github.com/thanos-io/thanos/blob/master/pkg/query/storeset.go#L277.

I want to avoid this function because it needs to get the lock and alloc a slice.

return match, err
}
return labelSetsMatch(s.LabelSets(), matchers)
Expand Down Expand Up @@ -587,14 +588,32 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
*storepb.LabelNamesResponse, error,
) {
var (
warnings []string
names [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
warnings []string
names [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
storeDebugMsgs []string
)

for _, st := range s.stores() {
st := st
var ok bool
tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) {
storeMatcher := [][]storepb.LabelMatcher{}
if ctxVal := ctx.Value(StoreMatcherKey); ctxVal != nil {
if value, ok := ctxVal.([][]storepb.LabelMatcher); ok {
storeMatcher = value
}
}
// We can skip error, we already translated matchers once.
ok, _ = storeMatches(st, r.Start, r.End, storeMatcher)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st))
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))

g.Go(func() error {
resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: r.PartialResponseDisabled,
Expand Down Expand Up @@ -626,6 +645,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
return nil, err
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
return &storepb.LabelNamesResponse{
Names: strutil.MergeUnsortedSlices(names...),
Warnings: warnings,
Expand All @@ -637,14 +657,32 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
*storepb.LabelValuesResponse, error,
) {
var (
warnings []string
all [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
warnings []string
all [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
storeDebugMsgs []string
)

for _, st := range s.stores() {
store := st
var ok bool
tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) {
storeMatcher := [][]storepb.LabelMatcher{}
if ctxVal := ctx.Value(StoreMatcherKey); ctxVal != nil {
if value, ok := ctxVal.([][]storepb.LabelMatcher); ok {
storeMatcher = value
}
}
// We can skip error, we already translated matchers once.
ok, _ = storeMatches(st, r.Start, r.End, storeMatcher)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st))
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))

g.Go(func() error {
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{
Label: r.Label,
Expand Down Expand Up @@ -677,6 +715,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
return nil, err
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
return &storepb.LabelValuesResponse{
Values: strutil.MergeUnsortedSlices(all...),
Warnings: warnings,
Expand Down
77 changes: 76 additions & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,13 @@ func TestProxyStore_LabelValues(t *testing.T) {
Values: []string{"3", "4"},
},
}},
&testClient{StoreClient: &mockedStoreAPI{
RespLabelValues: &storepb.LabelValuesResponse{
Values: []string{"5", "6"},
}},
minTime: timestamp.FromTime(time.Now().Add(-1 * time.Minute)),
maxTime: timestamp.FromTime(time.Now()),
},
}
q := NewProxyStore(nil,
nil,
Expand All @@ -1107,6 +1114,20 @@ func TestProxyStore_LabelValues(t *testing.T) {
testutil.Ok(t, err)
testutil.Assert(t, proto.Equal(req, m1.LastLabelValuesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelValuesReq)

testutil.Equals(t, []string{"1", "2", "3", "4", "5", "6"}, resp.Values)
testutil.Equals(t, 1, len(resp.Warnings))

// Request outside the time range of the last store client.
req = &storepb.LabelValuesRequest{
Label: "a",
PartialResponseDisabled: true,
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(time.Now().Add(-1 * time.Hour)),
}
resp, err = q.LabelValues(ctx, req)
testutil.Ok(t, err)
testutil.Assert(t, proto.Equal(req, m1.LastLabelValuesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelValuesReq)

testutil.Equals(t, []string{"1", "2", "3", "4"}, resp.Values)
testutil.Equals(t, 1, len(resp.Warnings))
}
Expand All @@ -1118,7 +1139,8 @@ func TestProxyStore_LabelNames(t *testing.T) {
title string
storeAPIs []Client

req *storepb.LabelNamesRequest
req *storepb.LabelNamesRequest
storeMatchers [][]storepb.LabelMatcher

expectedNames []string
expectedErr error
Expand Down Expand Up @@ -1197,6 +1219,56 @@ func TestProxyStore_LabelNames(t *testing.T) {
expectedNames: []string{"a", "b"},
expectedWarningsLen: 1,
},
{
title: "stores filtered by time range",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
},
},
minTime: timestamp.FromTime(time.Now().Add(-4 * time.Hour)),
maxTime: timestamp.FromTime(time.Now().Add(-3 * time.Hour)),
},
&testClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"c", "d"},
},
},
minTime: timestamp.FromTime(time.Now().Add(-2 * time.Hour)),
maxTime: timestamp.FromTime(time.Now().Add(-1 * time.Hour)),
},
},
req: &storepb.LabelNamesRequest{
Start: timestamp.FromTime(time.Now().Add(-1 * time.Minute)),
End: timestamp.FromTime(time.Now()),
PartialResponseDisabled: false,
},
expectedNames: nil,
expectedWarningsLen: 0,
},
{
title: "store matchers specified",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
},
},
},
},
req: &storepb.LabelNamesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
PartialResponseDisabled: false,
},
storeMatchers: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "foo"}}},
expectedNames: nil,
expectedWarningsLen: 0,
},
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(
Expand All @@ -1209,6 +1281,9 @@ func TestProxyStore_LabelNames(t *testing.T) {
)

ctx := context.Background()
if len(tc.storeMatchers) > 0 {
ctx = context.WithValue(ctx, StoreMatcherKey, tc.storeMatchers)
}
resp, err := q.LabelNames(ctx, tc.req)
if tc.expectedErr != nil {
testutil.NotOk(t, err)
Expand Down