From 8d0fab8f8ad1f41f21322cc05e6c64f605a52270 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Tue, 22 Sep 2020 15:01:22 +0530 Subject: [PATCH 1/2] fix store query bug when running loki in single binary mode with boltdb-shipper --- pkg/querier/querier.go | 181 +++++++++++++++++++++++------------- pkg/querier/querier_test.go | 169 +++++++++++++++++++++++++++++++++ 2 files changed, 283 insertions(+), 67 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f98896bd25268..f6986e3526821 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -29,6 +29,10 @@ const ( tailerWaitEntryThrottle = time.Second / 2 ) +type interval struct { + start, end time.Time +} + // Config for a querier. type Config struct { QueryTimeout time.Duration `yaml:"query_timeout"` @@ -80,51 +84,40 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) return nil, err } - var chunkStoreIter iter.EntryIterator + ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End) - if q.cfg.IngesterQueryStoreMaxLookback == 0 { - // IngesterQueryStoreMaxLookback is zero, the default state, query the store normally - chunkStoreIter, err = q.store.SelectLogs(ctx, params) + iters := []iter.EntryIterator{} + if ingesterQueryInterval != nil { + // Make a copy of the request before modifying + // because the initial request is used below to query stores + queryRequestCopy := *params.QueryRequest + newParams := logql.SelectLogParams{ + QueryRequest: &queryRequestCopy, + } + newParams.Start = ingesterQueryInterval.start + newParams.End = ingesterQueryInterval.end + + ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams) if err != nil { return nil, err } - } else if q.cfg.IngesterQueryStoreMaxLookback > 0 { - // IngesterQueryStoreMaxLookback is greater than zero - // Adjust the store query range to only query for data ingesters are not already querying for - adjustedEnd := params.End.Add(-q.cfg.IngesterQueryStoreMaxLookback) - if params.Start.After(adjustedEnd) { - chunkStoreIter = iter.NoopIterator - } else { - // Make a copy of the request before modifying - // because the initial request is used below to query ingesters - queryRequestCopy := *params.QueryRequest - newParams := logql.SelectLogParams{ - QueryRequest: &queryRequestCopy, - } - newParams.End = adjustedEnd - chunkStoreIter, err = q.store.SelectLogs(ctx, newParams) - if err != nil { - return nil, err - } - } - } else { - // IngesterQueryStoreMaxLookback is less than zero - // ingesters will be querying all the way back in time so there is no reason to query the store - chunkStoreIter = iter.NoopIterator - } - // skip ingester queries only when QueryIngestersWithin is enabled (not the zero value) and - // the end of the query is earlier than the lookback - if !shouldQueryIngester(q.cfg, params) { - return chunkStoreIter, nil + iters = append(iters, ingesterIters...) } - iters, err := q.ingesterQuerier.SelectLogs(ctx, params) - if err != nil { - return nil, err + if storeQueryInterval != nil { + params.Start = storeQueryInterval.start + params.End = storeQueryInterval.end + + storeIter, err := q.store.SelectLogs(ctx, params) + if err != nil { + return nil, err + } + + iters = append(iters, storeIter) } - return iter.NewHeapIterator(ctx, append(iters, chunkStoreIter), params.Direction), nil + return iter.NewHeapIterator(ctx, iters, params.Direction), nil } func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { @@ -133,52 +126,106 @@ func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa return nil, err } - var chunkStoreIter iter.SampleIterator + ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End) - switch { - case q.cfg.IngesterQueryStoreMaxLookback == 0: - // IngesterQueryStoreMaxLookback is zero, the default state, query the store normally - chunkStoreIter, err = q.store.SelectSamples(ctx, params) - if err != nil { - return nil, err - } - case q.cfg.IngesterQueryStoreMaxLookback > 0: - adjustedEnd := params.End.Add(-q.cfg.IngesterQueryStoreMaxLookback) - if params.Start.After(adjustedEnd) { - chunkStoreIter = iter.NoopIterator - break - } + iters := []iter.SampleIterator{} + if ingesterQueryInterval != nil { // Make a copy of the request before modifying - // because the initial request is used below to query ingesters + // because the initial request is used below to query stores queryRequestCopy := *params.SampleQueryRequest newParams := logql.SelectSampleParams{ SampleQueryRequest: &queryRequestCopy, } - newParams.End = adjustedEnd - chunkStoreIter, err = q.store.SelectSamples(ctx, newParams) + newParams.Start = ingesterQueryInterval.start + newParams.End = ingesterQueryInterval.end + + ingesterIters, err := q.ingesterQuerier.SelectSample(ctx, newParams) if err != nil { return nil, err } - default: - chunkStoreIter = iter.NoopIterator - } - // skip ingester queries only when QueryIngestersWithin is enabled (not the zero value) and - // the end of the query is earlier than the lookback - if !shouldQueryIngester(q.cfg, params) { - return chunkStoreIter, nil + iters = append(iters, ingesterIters...) } - iters, err := q.ingesterQuerier.SelectSample(ctx, params) - if err != nil { - return nil, err + if storeQueryInterval != nil { + params.Start = storeQueryInterval.start + params.End = storeQueryInterval.end + + storeIter, err := q.store.SelectSamples(ctx, params) + if err != nil { + return nil, err + } + + iters = append(iters, storeIter) } - return iter.NewHeapSampleIterator(ctx, append(iters, chunkStoreIter)), nil + return iter.NewHeapSampleIterator(ctx, iters), nil } -func shouldQueryIngester(cfg Config, params logql.QueryParams) bool { - lookback := time.Now().Add(-cfg.QueryIngestersWithin) - return !(cfg.QueryIngestersWithin != 0 && params.GetEnd().Before(lookback)) +func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) { + // limitQueryInterval is a flag for whether queries should be limited to start time of ingester queries. + limitQueryInterval := false + // ingesterMLB having -1 means query ingester for whole duration. + ingesterMLB := time.Duration(-1) + if q.cfg.IngesterQueryStoreMaxLookback != 0 { + // IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range. + limitQueryInterval = true + ingesterMLB = q.cfg.IngesterQueryStoreMaxLookback + } else if q.cfg.QueryIngestersWithin != 0 { + ingesterMLB = q.cfg.QueryIngestersWithin + } + + // query ingester for whole duration. + if ingesterMLB == -1 { + i := &interval{ + start: queryStart, + end: queryEnd, + } + + if limitQueryInterval { + // query only ingesters. + return i, nil + } + + // query both stores and ingesters without limiting the query interval. + return i, i + } + + // see if there is an overlap between ingester query interval and actual query interval, if not just do the store query. + ingesterOldestStartTime := time.Now().Add(-ingesterMLB) + if queryEnd.Before(ingesterOldestStartTime) { + return nil, &interval{ + start: queryStart, + end: queryEnd, + } + } + + // if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval. + if !limitQueryInterval { + i := &interval{ + start: queryStart, + end: queryEnd, + } + return i, i + } + + // limit the start of ingester query interval to ingesterOldestStartTime. + ingesterQueryInterval := &interval{ + start: ingesterOldestStartTime, + end: queryEnd, + } + + // limit the end of ingester query interval to ingesterOldestStartTime. + storeQueryInterval := &interval{ + start: queryStart, + end: ingesterOldestStartTime, + } + + // query touches only ingester query interval so do not do store query. + if storeQueryInterval.start.After(storeQueryInterval.end) { + storeQueryInterval = nil + } + + return ingesterQueryInterval, storeQueryInterval } // Label does the heavy lifting for a Label query. diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index c37f2bd8e8f65..196991f957b27 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -524,3 +524,172 @@ func TestQuerier_concurrentTailLimits(t *testing.T) { }) } } + +func TestQuerier_buildQueryIntervals(t *testing.T) { + // For simplicity it is always assumed that ingesterQueryStoreMaxLookback and queryIngestersWithin both would be set upto 11 hours so + // overlappingQuery has range of last 11 hours while nonOverlappingQuery has range older than last 11 hours. + // We would test the cases below with both the queries. + overlappingQuery := interval{ + start: time.Now().Add(-6 * time.Hour), + end: time.Now(), + } + + nonOverlappingQuery := interval{ + start: time.Now().Add(-24 * time.Hour), + end: time.Now().Add(-12 * time.Hour), + } + + type response struct { + ingesterQueryInterval *interval + storeQueryInterval *interval + } + + compareResponse := func(t *testing.T, expectedResponse, actualResponse response) { + if expectedResponse.ingesterQueryInterval == nil { + require.Nil(t, actualResponse.ingesterQueryInterval) + } else { + require.InDelta(t, expectedResponse.ingesterQueryInterval.start.Second(), actualResponse.ingesterQueryInterval.start.Second(), 1) + require.InDelta(t, expectedResponse.ingesterQueryInterval.end.Second(), expectedResponse.ingesterQueryInterval.end.Second(), 1) + } + + if expectedResponse.storeQueryInterval == nil { + require.Nil(t, actualResponse.storeQueryInterval) + } else { + require.InDelta(t, expectedResponse.storeQueryInterval.start.Second(), actualResponse.storeQueryInterval.start.Second(), 1) + require.InDelta(t, expectedResponse.storeQueryInterval.end.Second(), expectedResponse.storeQueryInterval.end.Second(), 1) + } + } + + for _, tc := range []struct { + name string + ingesterQueryStoreMaxLookback time.Duration + queryIngestersWithin time.Duration + overlappingQueryExpectedResponse response + nonOverlappingQueryExpectedResponse response + }{ + { + name: "default values, query ingesters and store for whole duration", + overlappingQueryExpectedResponse: response{ // query both store and ingesters + ingesterQueryInterval: &overlappingQuery, + storeQueryInterval: &overlappingQuery, + }, + nonOverlappingQueryExpectedResponse: response{ // query both store and ingesters + ingesterQueryInterval: &overlappingQuery, + storeQueryInterval: &overlappingQuery, + }, + }, + { + name: "ingesterQueryStoreMaxLookback set to 1h", + ingesterQueryStoreMaxLookback: time.Hour, + overlappingQueryExpectedResponse: response{ // query ingesters for last 1h and store until last 1h. + ingesterQueryInterval: &interval{ + start: time.Now().Add(-time.Hour), + end: overlappingQuery.end, + }, + storeQueryInterval: &interval{ + start: overlappingQuery.start, + end: time.Now().Add(-time.Hour), + }, + }, + nonOverlappingQueryExpectedResponse: response{ // query just the store + storeQueryInterval: &nonOverlappingQuery, + }, + }, + { + name: "ingesterQueryStoreMaxLookback set to 10h", + ingesterQueryStoreMaxLookback: 10 * time.Hour, + overlappingQueryExpectedResponse: response{ // query just the ingesters. + ingesterQueryInterval: &overlappingQuery, + }, + nonOverlappingQueryExpectedResponse: response{ // query just the store + storeQueryInterval: &nonOverlappingQuery, + }, + }, + { + name: "ingesterQueryStoreMaxLookback set to 1h and queryIngestersWithin set to 2h, ingesterQueryStoreMaxLookback takes precedence", + ingesterQueryStoreMaxLookback: time.Hour, + overlappingQueryExpectedResponse: response{ // query ingesters for last 1h and store until last 1h. + ingesterQueryInterval: &interval{ + start: time.Now().Add(-time.Hour), + end: overlappingQuery.end, + }, + storeQueryInterval: &interval{ + start: overlappingQuery.start, + end: time.Now().Add(-time.Hour), + }, + }, + nonOverlappingQueryExpectedResponse: response{ // query just the store + storeQueryInterval: &nonOverlappingQuery, + }, + }, + { + name: "ingesterQueryStoreMaxLookback set to 2h and queryIngestersWithin set to 1h, ingesterQueryStoreMaxLookback takes precedence", + ingesterQueryStoreMaxLookback: time.Hour, + overlappingQueryExpectedResponse: response{ // query ingesters for last 2h and store until last 2h. + ingesterQueryInterval: &interval{ + start: time.Now().Add(-2 * time.Hour), + end: overlappingQuery.end, + }, + storeQueryInterval: &interval{ + start: overlappingQuery.start, + end: time.Now().Add(-2 * time.Hour), + }, + }, + nonOverlappingQueryExpectedResponse: response{ // query just the store + storeQueryInterval: &nonOverlappingQuery, + }, + }, + { + name: "ingesterQueryStoreMaxLookback set to -1, query just ingesters", + ingesterQueryStoreMaxLookback: -1, + overlappingQueryExpectedResponse: response{ + ingesterQueryInterval: &overlappingQuery, + }, + nonOverlappingQueryExpectedResponse: response{ + ingesterQueryInterval: &nonOverlappingQuery, + }, + }, + { + name: "queryIngestersWithin set to 1h", + queryIngestersWithin: time.Hour, + overlappingQueryExpectedResponse: response{ // query both store and ingesters since query overlaps queryIngestersWithin + ingesterQueryInterval: &overlappingQuery, + storeQueryInterval: &overlappingQuery, + }, + nonOverlappingQueryExpectedResponse: response{ // query just the store since query doesn't overlap queryIngestersWithin + storeQueryInterval: &nonOverlappingQuery, + }, + }, + { + name: "queryIngestersWithin set to 10h", + queryIngestersWithin: time.Hour, + overlappingQueryExpectedResponse: response{ // query both store and ingesters since query overlaps queryIngestersWithin + ingesterQueryInterval: &overlappingQuery, + storeQueryInterval: &overlappingQuery, + }, + nonOverlappingQueryExpectedResponse: response{ // query just the store since query doesn't overlap queryIngestersWithin + storeQueryInterval: &nonOverlappingQuery, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + querier := Querier{cfg: Config{ + IngesterQueryStoreMaxLookback: tc.ingesterQueryStoreMaxLookback, + QueryIngestersWithin: tc.queryIngestersWithin, + }} + + ingesterQueryInterval, storeQueryInterval := querier.buildQueryIntervals(overlappingQuery.start, overlappingQuery.end) + compareResponse(t, tc.overlappingQueryExpectedResponse, response{ + ingesterQueryInterval: ingesterQueryInterval, + storeQueryInterval: storeQueryInterval, + }) + + ingesterQueryInterval, storeQueryInterval = querier.buildQueryIntervals(nonOverlappingQuery.start, nonOverlappingQuery.end) + compareResponse(t, tc.nonOverlappingQueryExpectedResponse, response{ + ingesterQueryInterval: ingesterQueryInterval, + storeQueryInterval: storeQueryInterval, + }) + + }) + } +} From 46e6454189b280e35a0c1b8624d03ca7ad5a4b0d Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 25 Sep 2020 11:43:53 +0530 Subject: [PATCH 2/2] update comment --- pkg/querier/querier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f6986e3526821..a77f6ff93c47c 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -162,7 +162,7 @@ func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa } func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) { - // limitQueryInterval is a flag for whether queries should be limited to start time of ingester queries. + // limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries. limitQueryInterval := false // ingesterMLB having -1 means query ingester for whole duration. ingesterMLB := time.Duration(-1)