Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix store query bug when running loki in single binary mode with boltdb-shipper #2655

Merged
merged 2 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 114 additions & 67 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const (
tailerWaitEntryThrottle = time.Second / 2
)

type interval struct {
start, end time.Time
}

// Config for a querier.
type Config struct {
QueryTimeout time.Duration `yaml:"query_timeout"`
Expand Down Expand Up @@ -80,51 +84,40 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams)
return nil, err
}

var chunkStoreIter iter.EntryIterator
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

if q.cfg.IngesterQueryStoreMaxLookback == 0 {
// IngesterQueryStoreMaxLookback is zero, the default state, query the store normally
chunkStoreIter, err = q.store.SelectLogs(ctx, params)
iters := []iter.EntryIterator{}
if ingesterQueryInterval != nil {
// Make a copy of the request before modifying
// because the initial request is used below to query stores
queryRequestCopy := *params.QueryRequest
newParams := logql.SelectLogParams{
QueryRequest: &queryRequestCopy,
}
newParams.Start = ingesterQueryInterval.start
newParams.End = ingesterQueryInterval.end

ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams)
if err != nil {
return nil, err
}
} else if q.cfg.IngesterQueryStoreMaxLookback > 0 {
// IngesterQueryStoreMaxLookback is greater than zero
// Adjust the store query range to only query for data ingesters are not already querying for
adjustedEnd := params.End.Add(-q.cfg.IngesterQueryStoreMaxLookback)
if params.Start.After(adjustedEnd) {
chunkStoreIter = iter.NoopIterator
} else {
// Make a copy of the request before modifying
// because the initial request is used below to query ingesters
queryRequestCopy := *params.QueryRequest
newParams := logql.SelectLogParams{
QueryRequest: &queryRequestCopy,
}
newParams.End = adjustedEnd
chunkStoreIter, err = q.store.SelectLogs(ctx, newParams)
if err != nil {
return nil, err
}
}
} else {
// IngesterQueryStoreMaxLookback is less than zero
// ingesters will be querying all the way back in time so there is no reason to query the store
chunkStoreIter = iter.NoopIterator
}

// skip ingester queries only when QueryIngestersWithin is enabled (not the zero value) and
// the end of the query is earlier than the lookback
if !shouldQueryIngester(q.cfg, params) {
return chunkStoreIter, nil
iters = append(iters, ingesterIters...)
}

iters, err := q.ingesterQuerier.SelectLogs(ctx, params)
if err != nil {
return nil, err
if storeQueryInterval != nil {
params.Start = storeQueryInterval.start
params.End = storeQueryInterval.end

storeIter, err := q.store.SelectLogs(ctx, params)
if err != nil {
return nil, err
}

iters = append(iters, storeIter)
}

return iter.NewHeapIterator(ctx, append(iters, chunkStoreIter), params.Direction), nil
return iter.NewHeapIterator(ctx, iters, params.Direction), nil
}

func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
Expand All @@ -133,52 +126,106 @@ func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa
return nil, err
}

var chunkStoreIter iter.SampleIterator
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

switch {
case q.cfg.IngesterQueryStoreMaxLookback == 0:
// IngesterQueryStoreMaxLookback is zero, the default state, query the store normally
chunkStoreIter, err = q.store.SelectSamples(ctx, params)
if err != nil {
return nil, err
}
case q.cfg.IngesterQueryStoreMaxLookback > 0:
adjustedEnd := params.End.Add(-q.cfg.IngesterQueryStoreMaxLookback)
if params.Start.After(adjustedEnd) {
chunkStoreIter = iter.NoopIterator
break
}
iters := []iter.SampleIterator{}
if ingesterQueryInterval != nil {
// Make a copy of the request before modifying
// because the initial request is used below to query ingesters
// because the initial request is used below to query stores
queryRequestCopy := *params.SampleQueryRequest
newParams := logql.SelectSampleParams{
SampleQueryRequest: &queryRequestCopy,
}
newParams.End = adjustedEnd
chunkStoreIter, err = q.store.SelectSamples(ctx, newParams)
newParams.Start = ingesterQueryInterval.start
newParams.End = ingesterQueryInterval.end

ingesterIters, err := q.ingesterQuerier.SelectSample(ctx, newParams)
if err != nil {
return nil, err
}
default:
chunkStoreIter = iter.NoopIterator

}
// skip ingester queries only when QueryIngestersWithin is enabled (not the zero value) and
// the end of the query is earlier than the lookback
if !shouldQueryIngester(q.cfg, params) {
return chunkStoreIter, nil
iters = append(iters, ingesterIters...)
}

iters, err := q.ingesterQuerier.SelectSample(ctx, params)
if err != nil {
return nil, err
if storeQueryInterval != nil {
params.Start = storeQueryInterval.start
params.End = storeQueryInterval.end

storeIter, err := q.store.SelectSamples(ctx, params)
if err != nil {
return nil, err
}

iters = append(iters, storeIter)
}
return iter.NewHeapSampleIterator(ctx, append(iters, chunkStoreIter)), nil
return iter.NewHeapSampleIterator(ctx, iters), nil
}

func shouldQueryIngester(cfg Config, params logql.QueryParams) bool {
lookback := time.Now().Add(-cfg.QueryIngestersWithin)
return !(cfg.QueryIngestersWithin != 0 && params.GetEnd().Before(lookback))
func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
limitQueryInterval := false
// ingesterMLB having -1 means query ingester for whole duration.
ingesterMLB := time.Duration(-1)
if q.cfg.IngesterQueryStoreMaxLookback != 0 {
// IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range.
limitQueryInterval = true
ingesterMLB = q.cfg.IngesterQueryStoreMaxLookback
} else if q.cfg.QueryIngestersWithin != 0 {
ingesterMLB = q.cfg.QueryIngestersWithin
}

// query ingester for whole duration.
if ingesterMLB == -1 {
i := &interval{
start: queryStart,
end: queryEnd,
}

if limitQueryInterval {
// query only ingesters.
return i, nil
}

// query both stores and ingesters without limiting the query interval.
return i, i
}
Comment on lines +177 to +191
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why query ingester for whole duration ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

The logic here is inconsistent with the documentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to support the rare usecase where you run Loki with multiple single binaries and each of them storing the data on local filesystem without any sharing.


// see if there is an overlap between ingester query interval and actual query interval, if not just do the store query.
ingesterOldestStartTime := time.Now().Add(-ingesterMLB)
if queryEnd.Before(ingesterOldestStartTime) {
return nil, &interval{
start: queryStart,
end: queryEnd,
}
}

// if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval.
if !limitQueryInterval {
i := &interval{
start: queryStart,
end: queryEnd,
}
return i, i
}
Comment on lines +203 to +209
Copy link
Contributor

@lzh-lab lzh-lab Dec 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The microservices deployment mode with object store will run here no matter if there is an overlap.

ingester:
        query_store_max_look_back_period: 0  
querier:
        query_ingesters_within: 2h

This case will also query ingester for whole duration.


// limit the start of ingester query interval to ingesterOldestStartTime.
ingesterQueryInterval := &interval{
start: ingesterOldestStartTime,
end: queryEnd,
}

// limit the end of ingester query interval to ingesterOldestStartTime.
storeQueryInterval := &interval{
start: queryStart,
end: ingesterOldestStartTime,
}

// query touches only ingester query interval so do not do store query.
if storeQueryInterval.start.After(storeQueryInterval.end) {
storeQueryInterval = nil
}

return ingesterQueryInterval, storeQueryInterval
}

// Label does the heavy lifting for a Label query.
Expand Down
169 changes: 169 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,172 @@ func TestQuerier_concurrentTailLimits(t *testing.T) {
})
}
}

func TestQuerier_buildQueryIntervals(t *testing.T) {
// For simplicity it is always assumed that ingesterQueryStoreMaxLookback and queryIngestersWithin both would be set upto 11 hours so
// overlappingQuery has range of last 11 hours while nonOverlappingQuery has range older than last 11 hours.
// We would test the cases below with both the queries.
overlappingQuery := interval{
start: time.Now().Add(-6 * time.Hour),
end: time.Now(),
}

nonOverlappingQuery := interval{
start: time.Now().Add(-24 * time.Hour),
end: time.Now().Add(-12 * time.Hour),
}

type response struct {
ingesterQueryInterval *interval
storeQueryInterval *interval
}

compareResponse := func(t *testing.T, expectedResponse, actualResponse response) {
if expectedResponse.ingesterQueryInterval == nil {
require.Nil(t, actualResponse.ingesterQueryInterval)
} else {
require.InDelta(t, expectedResponse.ingesterQueryInterval.start.Second(), actualResponse.ingesterQueryInterval.start.Second(), 1)
require.InDelta(t, expectedResponse.ingesterQueryInterval.end.Second(), expectedResponse.ingesterQueryInterval.end.Second(), 1)
}

if expectedResponse.storeQueryInterval == nil {
require.Nil(t, actualResponse.storeQueryInterval)
} else {
require.InDelta(t, expectedResponse.storeQueryInterval.start.Second(), actualResponse.storeQueryInterval.start.Second(), 1)
require.InDelta(t, expectedResponse.storeQueryInterval.end.Second(), expectedResponse.storeQueryInterval.end.Second(), 1)
}
}

for _, tc := range []struct {
name string
ingesterQueryStoreMaxLookback time.Duration
queryIngestersWithin time.Duration
overlappingQueryExpectedResponse response
nonOverlappingQueryExpectedResponse response
}{
{
name: "default values, query ingesters and store for whole duration",
overlappingQueryExpectedResponse: response{ // query both store and ingesters
ingesterQueryInterval: &overlappingQuery,
storeQueryInterval: &overlappingQuery,
},
nonOverlappingQueryExpectedResponse: response{ // query both store and ingesters
ingesterQueryInterval: &overlappingQuery,
storeQueryInterval: &overlappingQuery,
},
},
{
name: "ingesterQueryStoreMaxLookback set to 1h",
ingesterQueryStoreMaxLookback: time.Hour,
overlappingQueryExpectedResponse: response{ // query ingesters for last 1h and store until last 1h.
ingesterQueryInterval: &interval{
start: time.Now().Add(-time.Hour),
end: overlappingQuery.end,
},
storeQueryInterval: &interval{
start: overlappingQuery.start,
end: time.Now().Add(-time.Hour),
},
},
nonOverlappingQueryExpectedResponse: response{ // query just the store
storeQueryInterval: &nonOverlappingQuery,
},
},
{
name: "ingesterQueryStoreMaxLookback set to 10h",
ingesterQueryStoreMaxLookback: 10 * time.Hour,
overlappingQueryExpectedResponse: response{ // query just the ingesters.
ingesterQueryInterval: &overlappingQuery,
},
nonOverlappingQueryExpectedResponse: response{ // query just the store
storeQueryInterval: &nonOverlappingQuery,
},
},
{
name: "ingesterQueryStoreMaxLookback set to 1h and queryIngestersWithin set to 2h, ingesterQueryStoreMaxLookback takes precedence",
ingesterQueryStoreMaxLookback: time.Hour,
overlappingQueryExpectedResponse: response{ // query ingesters for last 1h and store until last 1h.
ingesterQueryInterval: &interval{
start: time.Now().Add(-time.Hour),
end: overlappingQuery.end,
},
storeQueryInterval: &interval{
start: overlappingQuery.start,
end: time.Now().Add(-time.Hour),
},
},
nonOverlappingQueryExpectedResponse: response{ // query just the store
storeQueryInterval: &nonOverlappingQuery,
},
},
{
name: "ingesterQueryStoreMaxLookback set to 2h and queryIngestersWithin set to 1h, ingesterQueryStoreMaxLookback takes precedence",
ingesterQueryStoreMaxLookback: time.Hour,
overlappingQueryExpectedResponse: response{ // query ingesters for last 2h and store until last 2h.
ingesterQueryInterval: &interval{
start: time.Now().Add(-2 * time.Hour),
end: overlappingQuery.end,
},
storeQueryInterval: &interval{
start: overlappingQuery.start,
end: time.Now().Add(-2 * time.Hour),
},
},
nonOverlappingQueryExpectedResponse: response{ // query just the store
storeQueryInterval: &nonOverlappingQuery,
},
},
{
name: "ingesterQueryStoreMaxLookback set to -1, query just ingesters",
ingesterQueryStoreMaxLookback: -1,
overlappingQueryExpectedResponse: response{
ingesterQueryInterval: &overlappingQuery,
},
nonOverlappingQueryExpectedResponse: response{
ingesterQueryInterval: &nonOverlappingQuery,
},
},
{
name: "queryIngestersWithin set to 1h",
queryIngestersWithin: time.Hour,
overlappingQueryExpectedResponse: response{ // query both store and ingesters since query overlaps queryIngestersWithin
ingesterQueryInterval: &overlappingQuery,
storeQueryInterval: &overlappingQuery,
},
nonOverlappingQueryExpectedResponse: response{ // query just the store since query doesn't overlap queryIngestersWithin
storeQueryInterval: &nonOverlappingQuery,
},
},
{
name: "queryIngestersWithin set to 10h",
queryIngestersWithin: time.Hour,
overlappingQueryExpectedResponse: response{ // query both store and ingesters since query overlaps queryIngestersWithin
ingesterQueryInterval: &overlappingQuery,
storeQueryInterval: &overlappingQuery,
},
nonOverlappingQueryExpectedResponse: response{ // query just the store since query doesn't overlap queryIngestersWithin
storeQueryInterval: &nonOverlappingQuery,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
querier := Querier{cfg: Config{
IngesterQueryStoreMaxLookback: tc.ingesterQueryStoreMaxLookback,
QueryIngestersWithin: tc.queryIngestersWithin,
}}

ingesterQueryInterval, storeQueryInterval := querier.buildQueryIntervals(overlappingQuery.start, overlappingQuery.end)
compareResponse(t, tc.overlappingQueryExpectedResponse, response{
ingesterQueryInterval: ingesterQueryInterval,
storeQueryInterval: storeQueryInterval,
})

ingesterQueryInterval, storeQueryInterval = querier.buildQueryIntervals(nonOverlappingQuery.start, nonOverlappingQuery.end)
compareResponse(t, tc.nonOverlappingQueryExpectedResponse, response{
ingesterQueryInterval: ingesterQueryInterval,
storeQueryInterval: storeQueryInterval,
})

})
}
}