From d5fc5054086bda034d061e5e528dd89c5dea7741 Mon Sep 17 00:00:00 2001 From: Fiona Liao Date: Thu, 21 Dec 2023 15:15:29 +0000 Subject: [PATCH] Add test for counter resets when compacting ooo native histograms (#574) * Add test for ooo histogram compaction Signed-off-by: Fiona Liao --- tsdb/db_test.go | 371 ++++++++++++++++++++++++++++++++++++++++++++++ tsdb/head_test.go | 2 +- tsdb/testutil.go | 18 +++ 3 files changed, 390 insertions(+), 1 deletion(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 4dfcdc0184..3d293ecb89 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -5977,6 +5977,377 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) { }) } +func TestOOOHistogramCompactionWithCounterResets(t *testing.T) { + for _, floatHistogram := range []bool{false, true} { + dir := t.TempDir() + ctx := context.Background() + + opts := DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 500 * time.Minute.Milliseconds() + + db, err := Open(dir, nil, nil, opts, nil) + require.NoError(t, err) + db.DisableCompactions() // We want to manually call it. + db.EnableNativeHistograms() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + series1 := labels.FromStrings("foo", "bar1") + series2 := labels.FromStrings("foo", "bar2") + + var series1ExpSamplesPreCompact, series2ExpSamplesPreCompact, series1ExpSamplesPostCompact, series2ExpSamplesPostCompact []chunks.Sample + + addSample := func(ts int64, l labels.Labels, val int, hint histogram.CounterResetHint) sample { + app := db.Appender(context.Background()) + tsMs := ts * time.Minute.Milliseconds() + if floatHistogram { + h := tsdbutil.GenerateTestFloatHistogram(val) + h.CounterResetHint = hint + _, err = app.AppendHistogram(0, l, tsMs, nil, h) + require.NoError(t, err) + require.NoError(t, app.Commit()) + return sample{t: tsMs, fh: h.Copy()} + } + + h := tsdbutil.GenerateTestHistogram(val) + h.CounterResetHint = hint + _, err = app.AppendHistogram(0, l, tsMs, h, nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + return sample{t: tsMs, h: h.Copy()} + } + + // Add an in-order sample to each series. + s := addSample(520, series1, 1000000, histogram.UnknownCounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + + s = addSample(520, series2, 1000000, histogram.UnknownCounterReset) + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + + // Verify that the in-memory ooo chunk is empty. + checkEmptyOOOChunk := func(lbls labels.Labels) { + ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) + require.NoError(t, err) + require.False(t, created) + require.Nil(t, ms.ooo) + } + + checkEmptyOOOChunk(series1) + checkEmptyOOOChunk(series2) + + // Add samples for series1. There are three head chunks that will be created: + // Chunk 1 - Samples between 100 - 440. One explicit counter reset at ts 250. + // Chunk 2 - Samples between 105 - 395. Overlaps with Chunk 1. One detected counter reset at ts 165. + // Chunk 3 - Samples between 480 - 509. All within one block boundary. One detected counter reset at 490. + + // Chunk 1. + // First add 10 samples. + for i := 100; i < 200; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // Before compaction, all the samples have UnknownCounterReset even though they've been added to the same + // chunk. 
This is because they overlap with the samples from chunk 2 and, when merging two chunks on read,
+ // the header is set to unknown when the next sample is not from the same chunk as the previous one.
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
+ // After compaction, samples from multiple mmapped chunks will be merged, so there won't be any overlapping
+ // chunks. Therefore, most samples will have the NotCounterReset header.
+ // 100 is the first sample in the first chunk in the blocks, so is still set to UnknownCounterReset.
+ // 120 is a block boundary - after compaction, 120 will be the first sample in a chunk, so is still set to
+ // UnknownCounterReset.
+ if i > 100 && i != 120 {
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ }
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
+ }
+ // Explicit counter reset - the counter reset header is set to CounterReset but the value is higher
+ // than for the previous timestamp. Explicit counter reset headers are actually ignored though, so when reading
+ // the sample back you actually get unknown/not counter reset. This is because the chainSampleIterator ignores
+ // existing headers and sets the header to UnknownCounterReset if the next sample is not from the same chunk as
+ // the previous one, and counter resets always create a new chunk.
+ // This case has been added to document what's happening, though it might not be the ideal behavior.
+ s = addSample(250, series1, 100000+250, histogram.CounterReset)
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, copyWithCounterReset(s, histogram.UnknownCounterReset))
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.NotCounterReset))
+
+ // Add 19 more samples to complete a chunk.
+ for i := 260; i < 450; i += 10 {
+ s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
+ // The samples with timestamp less than 410 overlap with the samples from chunk 2, so before compaction,
+ // they're all UnknownCounterReset. Samples greater than or equal to 410 don't overlap with other chunks,
+ // so they're always detected as NotCounterReset pre and post compaction.
+ if i >= 410 {
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ }
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
+ // 360 is a block boundary - after compaction, 360 will be the first sample in a chunk, so is still set to
+ // UnknownCounterReset.
+ if i != 360 {
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ }
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
+ }
+
+ // Chunk 2.
+ // Add six OOO samples.
+ for i := 105; i < 165; i += 10 {
+ s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
+ // Samples overlap with chunk 1, so before compaction all headers are UnknownCounterReset.
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.NotCounterReset))
+ }
+
+ // Add a sample that will be detected as a counter reset.
+ s = addSample(165, series1, 100000, histogram.UnknownCounterReset)
+ // Before compaction, the sample has an UnknownCounterReset header due to the chainSampleIterator.
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
+ // After compaction, the sample's counter reset is properly detected.
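+ // (The histogram value drops from 100000+160 at ts 160 to 100000 at ts 165, so once compaction has merged
+ // the overlapping chunks into one, the reset is detected from the decrease.)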
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.CounterReset))
+
+ // Add 23 more samples to complete a chunk.
+ for i := 175; i < 405; i += 10 {
+ s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
+ // Samples between 205-245 don't interleave with chunk 1 samples (chunk 1 has a gap between 190 and 250),
+ // so they keep the NotCounterReset header before compaction. The other samples overlap with chunk 1, so
+ // before compaction they have the UnknownCounterReset header.
+ if i >= 205 && i < 255 {
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ }
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
+ // 245 is the first sample >= the block boundary at 240, so it's still UnknownCounterReset after compaction.
+ if i == 245 {
+ s = copyWithCounterReset(s, histogram.UnknownCounterReset)
+ } else {
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ }
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
+ }
+
+ // Chunk 3.
+ for i := 480; i < 490; i++ {
+ s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
+ // No overlapping samples in other chunks, so apart from the first sample in the chunk, all samples are
+ // already detected as NotCounterReset before compaction.
+ if i > 480 {
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ }
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
+ // 480 is a block boundary, so after compaction it's the first sample in a chunk and stays UnknownCounterReset.
+ if i == 480 {
+ s = copyWithCounterReset(s, histogram.UnknownCounterReset)
+ }
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
+ }
+ // Counter reset.
+ s = addSample(int64(490), series1, 100000, histogram.UnknownCounterReset)
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
+ // Add some more samples after the counter reset.
+ for i := 491; i < 510; i++ {
+ s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
+ series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
+ }
+
+ // Add samples for series2 - one chunk with one detected counter reset at 300.
+ for i := 200; i < 300; i += 10 {
+ s = addSample(int64(i), series2, 100000+i, histogram.UnknownCounterReset)
+ // No overlap with other chunks, so apart from the first sample in the chunk, all samples are detected as
+ // NotCounterReset even before compaction.
+ if i > 200 {
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ }
+ series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s)
+ // 240 is a block boundary - after compaction, 240 will be the first sample in a chunk, so is set back to
+ // UnknownCounterReset.
+ if i == 240 {
+ s = copyWithCounterReset(s, histogram.UnknownCounterReset)
+ }
+ series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s)
+ }
+ // Counter reset.
+ s = addSample(int64(300), series2, 100000, histogram.UnknownCounterReset)
+ series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s)
+ series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s)
+ // Add some more samples after the counter reset.
+ for i := 310; i < 500; i += 10 {
+ s := addSample(int64(i), series2, 100000+i, histogram.UnknownCounterReset)
+ s = copyWithCounterReset(s, histogram.NotCounterReset)
+ series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s)
+ // 360 and 480 are block boundaries, so after compaction they're set back to UnknownCounterReset.
+ if i == 360 || i == 480 {
+ s = copyWithCounterReset(s, histogram.UnknownCounterReset)
+ }
+ series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s)
+ }
+
+ // Sort the expected samples, as the OOO samples were not added in time order.
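+ // (The in-order samples at ts 520 were appended to the expected slices before any of the OOO samples,
+ // while queriers return samples sorted by timestamp.)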
+ sort.Slice(series1ExpSamplesPreCompact, func(i, j int) bool {
+ return series1ExpSamplesPreCompact[i].T() < series1ExpSamplesPreCompact[j].T()
+ })
+ sort.Slice(series1ExpSamplesPostCompact, func(i, j int) bool {
+ return series1ExpSamplesPostCompact[i].T() < series1ExpSamplesPostCompact[j].T()
+ })
+ sort.Slice(series2ExpSamplesPreCompact, func(i, j int) bool {
+ return series2ExpSamplesPreCompact[i].T() < series2ExpSamplesPreCompact[j].T()
+ })
+ sort.Slice(series2ExpSamplesPostCompact, func(i, j int) bool {
+ return series2ExpSamplesPostCompact[i].T() < series2ExpSamplesPostCompact[j].T()
+ })
+
+ verifyDBSamples := func(s1Samples, s2Samples []chunks.Sample) {
+ expRes := map[string][]chunks.Sample{
+ series1.String(): s1Samples,
+ series2.String(): s2Samples,
+ }
+
+ q, err := db.Querier(math.MinInt64, math.MaxInt64)
+ require.NoError(t, err)
+ actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
+ requireEqualSamples(t, expRes, actRes, false)
+ }
+
+ // Verify DB samples before compaction.
+ verifyDBSamples(series1ExpSamplesPreCompact, series2ExpSamplesPreCompact)
+
+ // Verify that the in-memory ooo chunk is not empty.
+ checkNonEmptyOOOChunk := func(lbls labels.Labels) {
+ ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+ require.NoError(t, err)
+ require.False(t, created)
+ require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
+ }
+
+ checkNonEmptyOOOChunk(series1)
+ checkNonEmptyOOOChunk(series2)
+
+ // No blocks before compaction.
+ require.Empty(t, db.Blocks())
+
+ // There is a 0th WBL file.
+ require.NoError(t, db.head.wbl.Sync()) // Syncing to make sure the WBL is flushed to disk on Windows.
+ files, err := os.ReadDir(db.head.wbl.Dir())
+ require.NoError(t, err)
+ require.Len(t, files, 1)
+ require.Equal(t, "00000000", files[0].Name())
+ f, err := files[0].Info()
+ require.NoError(t, err)
+ require.Greater(t, f.Size(), int64(100))
+
+ // OOO compaction happens here.
+ require.NoError(t, db.CompactOOOHead(ctx))
+
+ // Check that blocks are created after compaction.
+ require.Len(t, db.Blocks(), 5)
+
+ // Check samples after compaction.
+ verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact)
+
+ // The 0th WBL file will be deleted, so the 1st will be the only one present.
+ files, err = os.ReadDir(db.head.wbl.Dir())
+ require.NoError(t, err)
+ require.Len(t, files, 1)
+ require.Equal(t, "00000001", files[0].Name())
+ f, err = files[0].Info()
+ require.NoError(t, err)
+ require.Equal(t, int64(0), f.Size())
+
+ // OOO data should not be present in the Head now.
+ checkEmptyOOOChunk(series1)
+ checkEmptyOOOChunk(series2)
+
+ verifyBlockSamples := func(block *Block, fromMins, toMins int64) {
+ var series1Samples, series2Samples []chunks.Sample
+
+ for _, s := range series1ExpSamplesPostCompact {
+ if s.T() >= fromMins*time.Minute.Milliseconds() {
+ // Samples should be sorted, so break out of the loop when we reach a timestamp that's too big.
+ if s.T() > toMins*time.Minute.Milliseconds() {
+ break
+ }
+ series1Samples = append(series1Samples, s)
+ }
+ }
+ for _, s := range series2ExpSamplesPostCompact {
+ if s.T() >= fromMins*time.Minute.Milliseconds() {
+ // Samples should be sorted, so break out of the loop when we reach a timestamp that's too big.
+ if s.T() > toMins*time.Minute.Milliseconds() {
+ break
+ }
+ series2Samples = append(series2Samples, s)
+ }
+ }
+
+ expRes := map[string][]chunks.Sample{}
+ if len(series1Samples) != 0 {
+ expRes[series1.String()] = series1Samples
+ }
+ if len(series2Samples) != 0 {
+ expRes[series2.String()] = series2Samples
+ }
+
+ q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64)
+ require.NoError(t, err)
+
+ actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
+ requireEqualSamples(t, expRes, actRes, false)
+ }
+
+ // Check for the expected data in the blocks.
+ verifyBlockSamples(db.Blocks()[0], 100, 119)
+ verifyBlockSamples(db.Blocks()[1], 120, 239)
+ verifyBlockSamples(db.Blocks()[2], 240, 359)
+ verifyBlockSamples(db.Blocks()[3], 360, 479)
+ verifyBlockSamples(db.Blocks()[4], 480, 509)
+
+ // There should be a single m-map file.
+ mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
+ files, err = os.ReadDir(mmapDir)
+ require.NoError(t, err)
+ require.Len(t, files, 1)
+
+ // Compact the in-order head and expect another block.
+ // Since this is a forced compaction, this block is not aligned with 2h.
+ err = db.CompactHead(NewRangeHead(db.head, 500*time.Minute.Milliseconds(), 550*time.Minute.Milliseconds()))
+ require.NoError(t, err)
+ require.Len(t, db.Blocks(), 6)
+ verifyBlockSamples(db.Blocks()[5], 520, 520)
+
+ // Blocks have now been created from both the normal and OOO head, but they're not yet merged.
+ verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact)
+
+ // The compaction also clears out the old m-map files, including
+ // the file that has the ooo chunks.
+ files, err = os.ReadDir(mmapDir)
+ require.NoError(t, err)
+ require.Len(t, files, 1)
+ require.Equal(t, "000001", files[0].Name())
+
+ // This will merge the overlapping blocks.
+ require.NoError(t, db.Compact(ctx))
+
+ require.Len(t, db.Blocks(), 5)
+ verifyBlockSamples(db.Blocks()[0], 100, 119)
+ verifyBlockSamples(db.Blocks()[1], 120, 239)
+ verifyBlockSamples(db.Blocks()[2], 240, 359)
+ verifyBlockSamples(db.Blocks()[3], 360, 479)
+ verifyBlockSamples(db.Blocks()[4], 480, 520) // Merged block.
+
+ // Final state. Blocks from the normal and OOO head are merged.
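+ // The merged blocks should still return exactly the same post-compaction samples.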
+ verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact) + } +} + +func copyWithCounterReset(s sample, hint histogram.CounterResetHint) sample { + if s.h != nil { + h := s.h.Copy() + h.CounterResetHint = hint + return sample{t: s.t, h: h} + } else { + h := s.fh.Copy() + h.CounterResetHint = hint + return sample{t: s.t, fh: h} + } +} + func TestOOOCompactionFailure(t *testing.T) { for name, scenario := range sampleTypeScenarios { t.Run(name, func(t *testing.T) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 48537024fd..7fc4c2080e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -4368,7 +4368,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) { app := head.Appender(context.Background()) var err error if floatHisto { - _, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat()) + _, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat(nil)) } else { _, err = app.AppendHistogram(0, l, ts, h.Copy(), nil) } diff --git a/tsdb/testutil.go b/tsdb/testutil.go index 7299f8a04b..928d155d9c 100644 --- a/tsdb/testutil.go +++ b/tsdb/testutil.go @@ -110,6 +110,8 @@ func compareSamples(t *testing.T, name string, expected, actual []chunks.Sample, if ignoreCounterResets && expectedHist.CounterResetHint != histogram.GaugeType { expectedHist.CounterResetHint = histogram.UnknownCounterReset actualHist.CounterResetHint = histogram.UnknownCounterReset + } else { + require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint)) } require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T()) } @@ -120,6 +122,8 @@ func compareSamples(t *testing.T, name string, expected, actual []chunks.Sample, if ignoreCounterResets { expectedHist.CounterResetHint = histogram.UnknownCounterReset actualHist.CounterResetHint = histogram.UnknownCounterReset + } else { + require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint)) } require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T()) } @@ -128,3 +132,17 @@ func compareSamples(t *testing.T, name string, expected, actual []chunks.Sample, } } } + +func counterResetAsString(h histogram.CounterResetHint) string { + switch h { + case histogram.UnknownCounterReset: + return "UnknownCounterReset" + case histogram.CounterReset: + return "CounterReset" + case histogram.NotCounterReset: + return "NotCounterReset" + case histogram.GaugeType: + return "GaugeType" + } + panic("Unexpected counter reset type") +}