Skip to content

Commit

Permalink
align metric queries by step and other queries by split interval (#5181)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Jan 19, 2022
1 parent 1a7614f commit dad7fca
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 76 deletions.
43 changes: 36 additions & 7 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran

switch r := req.(type) {
case *LokiRequest:
forInterval(interval, r.StartTs, r.EndTs, func(start, end time.Time) {
forInterval(interval, r.StartTs, r.EndTs, false, func(start, end time.Time) {
reqs = append(reqs, &LokiRequest{
Query: r.Query,
Limit: r.Limit,
Expand All @@ -238,7 +238,7 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran
})
})
case *LokiSeriesRequest:
forInterval(interval, r.StartTs, r.EndTs, func(start, end time.Time) {
forInterval(interval, r.StartTs, r.EndTs, true, func(start, end time.Time) {
reqs = append(reqs, &LokiSeriesRequest{
Match: r.Match,
Path: r.Path,
Expand All @@ -248,7 +248,7 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran
})
})
case *LokiLabelNamesRequest:
forInterval(interval, r.StartTs, r.EndTs, func(start, end time.Time) {
forInterval(interval, r.StartTs, r.EndTs, true, func(start, end time.Time) {
reqs = append(reqs, &LokiLabelNamesRequest{
Path: r.Path,
StartTs: start,
Expand All @@ -261,11 +261,30 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran
return reqs, nil
}

func forInterval(interval time.Duration, start, end time.Time, callback func(start, end time.Time)) {
// forInterval splits the given start and end time into given interval.
// When endTimeInclusive is true, it would keep a gap of 1ms between the splits.
// The only queries that have both start and end time inclusive are metadata queries,
// and without keeping a gap, we would end up querying duplicate data in adjacent queries.
func forInterval(interval time.Duration, start, end time.Time, endTimeInclusive bool, callback func(start, end time.Time)) {
// align the start time by split interval for better query performance of metadata queries and
// better cache-ability of query types that are cached.
ogStart := start
startNs := start.UnixNano()
start = time.Unix(0, startNs-startNs%interval.Nanoseconds())
firstInterval := true

for start := start; start.Before(end); start = start.Add(interval) {
newEnd := start.Add(interval)
if newEnd.After(end) {
if !newEnd.Before(end) {
newEnd = end
} else if endTimeInclusive {
newEnd = newEnd.Add(-time.Millisecond)
}

if firstInterval {
callback(ogStart, newEnd)
firstInterval = false
continue
}
callback(start, newEnd)
}
Expand Down Expand Up @@ -310,7 +329,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
lokiReq := r.(*LokiRequest)
// step is >= configured split interval, let us just split the query interval by step
if lokiReq.Step >= interval.Milliseconds() {
forInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, func(start, end time.Time) {
forInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, func(start, end time.Time) {
reqs = append(reqs, &LokiRequest{
Query: lokiReq.Query,
Limit: lokiReq.Limit,
Expand All @@ -325,7 +344,12 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
return reqs, nil
}

for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
// nextIntervalBoundary always moves ahead in a multiple of steps but the time it returns would not be step aligned.
// To have step aligned intervals for better cache-ability of results, let us step align the start time which make all the split intervals step aligned.
startNs := lokiReq.StartTs.UnixNano()
start := time.Unix(0, startNs-startNs%(r.GetStep()*1e6))

for start := start; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
end := nextIntervalBoundary(start, r.GetStep(), interval)
if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs {
end = lokiReq.EndTs
Expand All @@ -340,6 +364,11 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
EndTs: end,
})
}

if len(reqs) != 0 {
// change the start time to original time
reqs[0] = reqs[0].WithStartEnd(lokiReq.GetStart(), reqs[0].GetEnd())
}
return reqs, nil
}

Expand Down
265 changes: 196 additions & 69 deletions pkg/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,87 +21,152 @@ import (
var nilMetrics = NewSplitByMetrics(nil)

func Test_splitQuery(t *testing.T) {
tests := []struct {
name string
req queryrangebase.Request
interval time.Duration
want []queryrangebase.Request
buildLokiRequest := func(start, end time.Time) queryrangebase.Request {
return &LokiRequest{
Query: "foo",
Limit: 1,
Step: 2,
StartTs: start,
EndTs: end,
Direction: logproto.BACKWARD,
Path: "/path",
}
}

buildLokiSeriesRequest := func(start, end time.Time) queryrangebase.Request {
return &LokiSeriesRequest{
Match: []string{"match1"},
StartTs: start,
EndTs: end,
Path: "/series",
Shards: []string{"shard1"},
}
}

buildLokiLabelNamesRequest := func(start, end time.Time) queryrangebase.Request {
return &LokiLabelNamesRequest{
StartTs: start,
EndTs: end,
Path: "/labels",
}
}

type interval struct {
start, end time.Time
}
for requestType, tc := range map[string]struct {
requestBuilderFunc func(start, end time.Time) queryrangebase.Request
endTimeInclusive bool
}{
{
"smaller request than interval",
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 12, 30, 0, 0, time.UTC),
},
time.Hour,
[]queryrangebase.Request{
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 12, 30, 0, 0, time.UTC),
},
},
"LokiRequest": {
buildLokiRequest,
false,
},
{
"exactly 1 interval",
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 1, 0, 0, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 1, 0, 0, time.UTC),
},
time.Hour,
[]queryrangebase.Request{
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 1, 0, 0, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 1, 0, 0, time.UTC),
},
},
"LokiSeriesRequest": {
buildLokiSeriesRequest,
true,
},
{
"2 intervals",
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 0, 0, 2, time.UTC),
"LokiLabelNamesRequest": {
buildLokiLabelNamesRequest,
true,
},
} {
expectedSplitGap := time.Duration(0)
if tc.endTimeInclusive {
expectedSplitGap = time.Millisecond
}
for name, intervals := range map[string]struct {
inp interval
expected []interval
}{
"no_change": {
inp: interval{
start: time.Unix(0, 0),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()),
},
expected: []interval{
{
start: time.Unix(0, 0),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()),
},
},
},
time.Hour,
[]queryrangebase.Request{
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 0, 0, 1, time.UTC),
"align_start": {
inp: interval{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (2 * time.Hour).Nanoseconds()),
},
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 13, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 0, 0, 2, time.UTC),
expected: []interval{
{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap),
},
{
start: time.Unix(0, (1 * time.Hour).Nanoseconds()),
end: time.Unix(0, (2 * time.Hour).Nanoseconds()),
},
},
},
},
{
"3 intervals series",
&LokiSeriesRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 16, 0, 0, 2, time.UTC),
"align_end": {
inp: interval{
start: time.Unix(0, 0),
end: time.Unix(0, (115 * time.Minute).Nanoseconds()),
},
expected: []interval{
{
start: time.Unix(0, 0),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap),
},
{
start: time.Unix(0, (1 * time.Hour).Nanoseconds()),
end: time.Unix(0, (115 * time.Minute).Nanoseconds()),
},
},
},
2 * time.Hour,
[]queryrangebase.Request{
&LokiSeriesRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 14, 0, 0, 1, time.UTC),
"align_both": {
inp: interval{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (175 * time.Minute).Nanoseconds()),
},
expected: []interval{
{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap),
},
{
start: time.Unix(0, (1 * time.Hour).Nanoseconds()),
end: time.Unix(0, (2 * time.Hour).Nanoseconds()).Add(-expectedSplitGap),
},
{
start: time.Unix(0, (2 * time.Hour).Nanoseconds()),
end: time.Unix(0, (175 * time.Minute).Nanoseconds()),
},
},
&LokiSeriesRequest{
StartTs: time.Date(2019, 12, 9, 14, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 16, 0, 0, 1, time.UTC),
},
"no_align": {
inp: interval{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (55 * time.Minute).Nanoseconds()),
},
&LokiSeriesRequest{
StartTs: time.Date(2019, 12, 9, 16, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 16, 0, 0, 2, time.UTC),
expected: []interval{
{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (55 * time.Minute).Nanoseconds()),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := splitByTime(tt.req, tt.interval)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
} {
t.Run(fmt.Sprintf("%s - %s", name, requestType), func(t *testing.T) {
inp := tc.requestBuilderFunc(intervals.inp.start, intervals.inp.end)
var want []queryrangebase.Request
for _, interval := range intervals.expected {
want = append(want, tc.requestBuilderFunc(interval.start, interval.end))
}
splits, err := splitByTime(inp, time.Hour)
require.NoError(t, err)
require.Equal(t, want, splits)
})
}
}
}

Expand Down Expand Up @@ -287,6 +352,68 @@ func Test_splitMetricQuery(t *testing.T) {
interval: 3 * time.Hour,
},

// step not a multiple of interval
// start time already step aligned
{
input: &LokiRequest{
StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s
EndTs: time.Unix(3*3*3600, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: time.Unix(2*3600-9, 0),
EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix((3*3600)+12, 0),
EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix(2*3*3600+7, 0),
EndTs: time.Unix(3*3*3600, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
},
interval: 3 * time.Hour,
},
// start time not aligned with step
{
input: &LokiRequest{
StartTs: time.Unix(2*3600, 0),
EndTs: time.Unix(3*3*3600, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: time.Unix(2*3600, 0),
EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix((3*3600)+12, 0),
EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix(2*3*3600+7, 0),
EndTs: time.Unix(3*3*3600, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
},
interval: 3 * time.Hour,
},

// step larger than split interval
{
input: &LokiRequest{
Expand Down

0 comments on commit dad7fca

Please sign in to comment.