Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache vertical shards in query frontend #5648

Merged
merged 3 commits into from
Sep 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/queryfrontend/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,19 @@ func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Re
i := 0
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
}
return fmt.Sprintf("fe:%s:%s:%d:%d:%d", userID, tr.Query, tr.Step, currentInterval, i)
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d", userID, tr.Label, tr.Matchers, currentInterval)
case *ThanosSeriesRequest:
return fmt.Sprintf("fe:%s:%s:%d", userID, tr.Matchers, currentInterval)
}
return fmt.Sprintf("fe:%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}

func generateShardInfoKey(r *ThanosQueryRangeRequest) string {
if r.ShardInfo == nil {
return "-"
}
return fmt.Sprintf("%d:%d", r.ShardInfo.TotalShards, r.ShardInfo.ShardIndex)
}
16 changes: 9 additions & 7 deletions pkg/queryfrontend/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 60 * seconds,
},
expected: "fe::up:60000:0:2",
expected: "fe::up:60000:0:2:-",
},
{
name: "10s step",
Expand All @@ -46,7 +46,7 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
},
expected: "fe::up:10000:0:2",
expected: "fe::up:10000:0:2:-",
},
{
name: "1m downsampling resolution",
Expand All @@ -56,7 +56,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 60 * seconds,
},
expected: "fe::up:10000:0:2",
expected: "fe::up:10000:0:2:-",
},
{
name: "5m downsampling resolution, different cache key",
Expand All @@ -66,7 +66,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 300 * seconds,
},
expected: "fe::up:10000:0:1",
expected: "fe::up:10000:0:1:-",
},
{
name: "1h downsampling resolution, different cache key",
Expand All @@ -76,7 +76,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: hour,
},
expected: "fe::up:10000:0:0",
expected: "fe::up:10000:0:0:-",
},
{
name: "label names, no matcher",
Expand Down Expand Up @@ -134,7 +134,9 @@ func TestGenerateCacheKey(t *testing.T) {
expected: `fe::up:[[foo="bar"] [baz="qux"]]:0`,
},
} {
key := splitter.GenerateCacheKey("", tc.req)
testutil.Equals(t, tc.expected, key)
t.Run(tc.name, func(t *testing.T) {
key := splitter.GenerateCacheKey("", tc.req)
testutil.Equals(t, tc.expected, key)
})
}
}
14 changes: 7 additions & 7 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ func newQueryRangeTripperware(
)
}

if numShards > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg),
)
}

if config.ResultsCacheConfig != nil {
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
Expand Down Expand Up @@ -222,13 +229,6 @@ func newQueryRangeTripperware(
)
}

if numShards > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg),
)
}

return func(next http.RoundTripper) http.RoundTripper {
rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, queryRangeMiddleware...)
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
Expand Down
142 changes: 141 additions & 1 deletion pkg/queryfrontend/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
},
} {
if !t.Run(tc.name, func(t *testing.T) {

ctx := user.InjectOrgID(context.Background(), "1")
httpReq, err := NewThanosQueryRangeCodec(true).EncodeRequest(ctx, tc.req)
testutil.Ok(t, err)
Expand All @@ -479,6 +478,96 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
}
}

func TestRoundTripQueryCacheWithShardingMiddleware(t *testing.T) {
testRequest := &ThanosQueryRangeRequest{
Path: "/api/v1/query_range",
Start: 0,
End: 2 * hour,
Step: 10 * seconds,
Dedup: true,
Query: "sum by (pod) (memory_usage)",
Timeout: hour,
}

cacheConf := &queryrange.ResultsCacheConfig{
CacheConfig: cortexcache.Config{
EnableFifoCache: true,
Fifocache: cortexcache.FifoCacheConfig{
MaxSizeBytes: "1MiB",
MaxSizeItems: 1000,
Validity: time.Hour,
},
},
}

tpw, err := NewTripperware(
Config{
NumShards: 2,
QueryRangeConfig: QueryRangeConfig{
Limits: defaultLimits,
ResultsCacheConfig: cacheConf,
SplitQueriesByInterval: day,
},
}, nil, log.NewNopLogger(),
)
testutil.Ok(t, err)

rt, err := newFakeRoundTripper()
testutil.Ok(t, err)
defer rt.Close()
res, handler := promqlResultsWithFailures(3)
rt.setHandler(handler)

for _, tc := range []struct {
name string
req queryrange.Request
err bool
expected int
}{
{
name: "query with vertical sharding",
req: testRequest,
err: true,
expected: 2,
},
{
name: "same query as before, both requests are executed",
req: testRequest,
err: true,
expected: 4,
},
{
name: "same query as before, one request is executed",
req: testRequest,
err: false,
expected: 5,
},
{
name: "same query as before again, no requests are executed",
req: testRequest,
err: false,
expected: 5,
},
} {
if !t.Run(tc.name, func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
httpReq, err := NewThanosQueryRangeCodec(true).EncodeRequest(ctx, tc.req)
testutil.Ok(t, err)

_, err = tpw(rt).RoundTrip(httpReq)
if tc.err {
testutil.NotOk(t, err)
} else {
testutil.Ok(t, err)
}

testutil.Equals(t, tc.expected, *res)
}) {
break
}
}
}

// TestRoundTripLabelsCacheMiddleware tests the cache middleware for labels requests.
func TestRoundTripLabelsCacheMiddleware(t *testing.T) {
testRequest := &ThanosLabelsRequest{
Expand Down Expand Up @@ -730,6 +819,57 @@ func promqlResults(fail bool) (*int, http.Handler) {
})
}

// promqlResultsWithFailures is a mock handler used to test split and cache middleware.
// it will return a failed response numFailures times.
func promqlResultsWithFailures(numFailures int) (*int, http.Handler) {
count := 0
var lock sync.Mutex
q := queryrange.PrometheusResponse{
Status: "success",
Data: queryrange.PrometheusData{
ResultType: string(parser.ValueTypeMatrix),
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{},
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
{Value: 1, TimestampMs: 1},
},
},
},
},
}

cond := sync.NewCond(&sync.Mutex{})
cond.L.Lock()
return &count, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
lock.Lock()
defer lock.Unlock()

// Set fail in the response code to test retry.
if numFailures > 0 {
numFailures--

// Wait for a successful request.
// Release the lock to allow other requests to execute.
if numFailures == 0 {
lock.Unlock()
cond.Wait()
<-time.After(500 * time.Millisecond)
lock.Lock()
}
w.WriteHeader(500)
}
if err := json.NewEncoder(w).Encode(q); err != nil {
panic(err)
}
if numFailures == 0 {
cond.Broadcast()
}
count++
})
}

// labelsResults is a mock handler used to test split and cache middleware for label names and label values requests.
func labelsResults(fail bool) (*int, http.Handler) {
count := 0
Expand Down