From baaf5a9203687f337c05dab3a828156601f4a553 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sat, 1 Aug 2020 11:12:15 -0400 Subject: [PATCH 1/9] sampleIter uses correct cache --- pkg/storage/lazy_chunk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/lazy_chunk.go b/pkg/storage/lazy_chunk.go index 709442e121389..9dee210c44fbe 100644 --- a/pkg/storage/lazy_chunk.go +++ b/pkg/storage/lazy_chunk.go @@ -129,7 +129,7 @@ func (c *LazyChunk) SampleIterator( continue } if nextChunk != nil { - delete(c.overlappingBlocks, b.Offset()) + delete(c.overlappingSampleBlocks, b.Offset()) } // non-overlapping block with the next chunk are not cached. its = append(its, b.SampleIterator(ctx, filter, extractor)) From fe11b16e5275609296150c404ab3edcb18994324 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 3 Aug 2020 20:41:41 -0400 Subject: [PATCH 2/9] minimal lazy chunk iterator test --- pkg/storage/lazy_chunk_test.go | 39 ++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 9495e43353a6f..226cee28dda30 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "testing" "time" @@ -15,6 +16,44 @@ import ( "github.com/grafana/loki/pkg/util" ) +func TestLazyChunkIterator(t *testing.T) { + for i, tc := range []struct { + chunk *LazyChunk + expected []logproto.Stream + }{ + { + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + }, + }), + []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + }, + }, + }, + }, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.TrueFilter, nil) + require.Nil(t, err) + streams, _, err := iter.ReadBatch(it, 1000) + require.Nil(t, err) + _ = it.Close() + require.Equal(t, tc.expected, streams.Streams) + }) + } +} + func TestIsOverlapping(t *testing.T) { tests := []struct { name string From de723865fdc1d19994d75a67895ebdb5e31ea089 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 3 Aug 2020 21:58:41 -0400 Subject: [PATCH 3/9] memchunk Blocks() inclusivity --- pkg/chunkenc/memchunk.go | 2 +- pkg/chunkenc/memchunk_test.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 407cd3add9b3c..6a96dc5f530ff 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -532,7 +532,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block { blocks := make([]Block, 0, len(c.blocks)) for _, b := range c.blocks { - if maxt > b.mint && b.maxt > mint { + if maxt >= b.mint && b.maxt >= mint { blocks = append(blocks, b) } } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index b05b47781193b..f8b93057dbc1f 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -37,6 +37,18 @@ var ( testTargetSize = 1500 * 1024 ) +func TestBlocksInclusive(t *testing.T) { + chk := NewMemChunk(EncNone, testBlockSize, testTargetSize) + err := chk.Append(logprotoEntry(1, "1")) + require.Nil(t, err) + err = chk.cut() + require.Nil(t, err) + + blocks := chk.Blocks(time.Unix(0, 1), time.Unix(0, 1)) + require.Equal(t, 1, len(blocks)) + require.Equal(t, 1, blocks[0].Entries()) +} + func TestBlock(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { From 0a3c989b6b5a82c4075ad26a2e0dedbc4833c2d6 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 3 Aug 2020 22:12:48 -0400 Subject: [PATCH 4/9] fixes a few edge cases in the batchiterator batching --- pkg/storage/batch.go | 13 ++++- pkg/storage/batch_test.go | 102 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 1578029aa99dd..5cf2016296ea3 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -140,18 +140,24 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) { batch := make([]*LazyChunk, 0, it.batchSize+len(it.lastOverlapping)) var nextChunk *LazyChunk + var includesOverlap bool + for it.chunks.Len() > 0 { + // reset nextChunk on each loop to prevent it from pointing to previous chunks + nextChunk = nil // pop the next batch of chunks and append/prepend previous overlapping chunks // so we can merge/de-dupe overlapping entries. - if it.direction == logproto.FORWARD { + if !includesOverlap && it.direction == logproto.FORWARD { batch = append(batch, it.lastOverlapping...) } batch = append(batch, it.chunks.pop(it.batchSize)...) - if it.direction == logproto.BACKWARD { + if !includesOverlap && it.direction == logproto.BACKWARD { batch = append(batch, it.lastOverlapping...) } + includesOverlap = true + if it.chunks.Len() > 0 { nextChunk = it.chunks.Peek() // we max out our iterator boundaries to the next chunks in the queue @@ -294,6 +300,7 @@ func newLogBatchIterator( filter: filter, ctx: ctx, } + batch := newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, logbatch.newChunksIterator) logbatch.batchChunkIterator = batch return logbatch, nil @@ -321,10 +328,12 @@ func (it *logBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for _, chunks := range chks { + iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk) if err != nil { return nil, err } + result = append(result, iterator) } diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 19dbd5d28435e..ac750b52dd67a 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -548,6 +548,108 @@ func Test_newLogBatchChunkIterator(t *testing.T) { logproto.BACKWARD, 2, }, + // This test is rather complex under the hood. + // It should cause three sub batches in the iterator. + // The first batch has no overlap -- it cannot as the first. It has bounds [1,2) + // The second batch has one chunk overlap, but it includes no entries in the overlap. + // It has bounds [2,4). + // The third batch finally consumes the overlap, with bounds [4,max). + // Notably it also ends up testing the code paths for increasing batch sizes past + // the default due to nextChunks with the same start timestamp. + "forward identicals": { + []*LazyChunk{ + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }), + }, + []logproto.Stream{ + { + Labels: fooLabels, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }, + }, + fooLabelsWithName, + from, from.Add(4 * time.Millisecond), + logproto.FORWARD, + 1, + }, } for name, tt := range tests { From 68ef5b99d05e893fa1424d385a2a98ffc2102fe3 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 3 Aug 2020 22:14:11 -0400 Subject: [PATCH 5/9] fixes bad metric name in test --- pkg/storage/util_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 9e2b267502773..8c27f092decd0 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -23,7 +23,7 @@ import ( "github.com/grafana/loki/pkg/util" ) -var fooLabelsWithName = "{foo=\"bar\", __name__=\"log\"}" +var fooLabelsWithName = "{foo=\"bar\", __name__=\"logs\"}" var fooLabels = "{foo=\"bar\"}" var from = time.Unix(0, time.Millisecond.Nanoseconds()) From 40988f3b87e32e287f6cce35aa4cbaf2df4b50ac Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 4 Aug 2020 10:02:54 -0400 Subject: [PATCH 6/9] due to later chunks len check, resetting nextChunk is unnecessary --- pkg/storage/batch.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 5cf2016296ea3..a072136fc2174 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -143,8 +143,6 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) { var includesOverlap bool for it.chunks.Len() > 0 { - // reset nextChunk on each loop to prevent it from pointing to previous chunks - nextChunk = nil // pop the next batch of chunks and append/prepend previous overlapping chunks // so we can merge/de-dupe overlapping entries. From 2b8ab0afeb00335bf8601d48b5533e18b9816739 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 4 Aug 2020 19:19:03 -0400 Subject: [PATCH 7/9] lazychunks pop test --- pkg/storage/lazy_chunk_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 226cee28dda30..9a1aabc3eb1d2 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -54,6 +54,36 @@ func TestLazyChunkIterator(t *testing.T) { } } +func TestLazyChunksPop(t *testing.T) { + for i, tc := range []struct { + initial int + n int + expectedLn int + rem int + }{ + {1, 1, 1, 0}, + {2, 1, 1, 1}, + {3, 4, 3, 0}, + } { + + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + lc := &lazyChunks{} + for i := 0; i < tc.initial; i++ { + lc.chunks = append(lc.chunks, &LazyChunk{}) + } + out := lc.pop(tc.n) + + for i := 0; i < tc.expectedLn; i++ { + require.NotNil(t, out[i]) + } + + for i := 0; i < tc.rem; i++ { + require.NotNil(t, lc.chunks[i]) + } + }) + } +} + func TestIsOverlapping(t *testing.T) { tests := []struct { name string From f862fa05301f04c8fe4c41564b6a75b6e0355f69 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 4 Aug 2020 20:37:44 -0400 Subject: [PATCH 8/9] safe starting of batchChunkIterator --- pkg/storage/batch.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index a072136fc2174..9c8462ecf135f 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -43,6 +43,8 @@ type batchChunkIterator struct { lastOverlapping []*LazyChunk iterFactory chunksIteratorFactory + begun bool + ctx context.Context cancel context.CancelFunc start, end time.Time direction logproto.Direction @@ -69,6 +71,7 @@ func newBatchChunkIterator( start: start, end: end, direction: direction, + ctx: ctx, cancel: cancel, iterFactory: iterFactory, chunks: lazyChunks{direction: direction, chunks: chunks}, @@ -78,10 +81,17 @@ func newBatchChunkIterator( }), } sort.Sort(res.chunks) - go res.loop(ctx) return res } +// Start is idempotent and will begin the processing thread which seeds the iterator data. +func (it *batchChunkIterator) Start() { + if !it.begun { + it.begun = true + go it.loop(it.ctx) + } +} + func (it *batchChunkIterator) loop(ctx context.Context) { for { if it.chunks.Len() == 0 { @@ -111,6 +121,8 @@ func (it *batchChunkIterator) loop(ctx context.Context) { } func (it *batchChunkIterator) Next() bool { + it.Start() // Ensure the iterator has started. + var err error // for loop to avoid recursion for { @@ -300,7 +312,10 @@ func newLogBatchIterator( } batch := newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, logbatch.newChunksIterator) + // Important: since the batchChunkIterator is bound to the LogBatchIterator, + // ensure embedded fields are present before it's started. logbatch.batchChunkIterator = batch + batch.Start() return logbatch, nil } @@ -398,7 +413,11 @@ func newSampleBatchIterator( ctx: ctx, } batch := newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, samplebatch.newChunksIterator) + + // Important: since the batchChunkIterator is bound to the SampleBatchIterator, + // ensure embedded fields are present before it's started. samplebatch.batchChunkIterator = batch + batch.Start() return samplebatch, nil } From 891a98f9e3db76320370ae745b7b7bbcbbce7470 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 5 Aug 2020 09:50:48 -0400 Subject: [PATCH 9/9] batchiter rudimentary safe start test --- pkg/storage/batch_test.go | 42 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index ac750b52dd67a..f7ae77417f72b 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -22,6 +22,48 @@ import ( "github.com/grafana/loki/pkg/logql/stats" ) +func Test_batchIterSafeStart(t *testing.T) { + stream := logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + } + chks := []*LazyChunk{ + newLazyChunk(stream), + } + + var ok bool + + batch := newBatchChunkIterator(context.Background(), chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), func(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) { + if !ok { + panic("unexpected") + } + + // we don't care about the actual data for this test, just give it an iterator. + return iter.NewStreamIterator(stream), nil + }) + + // if it was started already, we should see a panic before this + time.Sleep(time.Millisecond) + ok = true + + // ensure idempotency + batch.Start() + batch.Start() + + ok = batch.Next() + require.Equal(t, true, ok) + +} + func Test_newLogBatchChunkIterator(t *testing.T) { tests := map[string]struct {