Add test for ooo histogram compaction
fionaliao committed Dec 12, 2023
1 parent a560cb3 commit 395b742
Showing 2 changed files with 389 additions and 0 deletions.
371 changes: 371 additions & 0 deletions tsdb/db_test.go
@@ -5980,6 +5980,377 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
})
}

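// TestOOOHistogramCompactionWithCounterResets checks that counter reset headers are set correctly on
// histogram samples (integer and float), both before and after out-of-order compaction.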
func TestOOOHistogramCompactionWithCounterResets(t *testing.T) {
for _, floatHistogram := range []bool{false, true} {
dir := t.TempDir()
ctx := context.Background()

opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
opts.OutOfOrderTimeWindow = 500 * time.Minute.Milliseconds()
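// With these options, an OOO head chunk is cut and m-mapped once it holds 30 samples, and OOO samples are
// accepted while they're within 500 minutes of the newest sample (all OOO samples below, at 100-509 minutes,
// are within 500 minutes of the in-order samples at 520 minutes).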

db, err := Open(dir, nil, nil, opts, nil)
require.NoError(t, err)
db.DisableCompactions() // We want to call it manually.
db.EnableNativeHistograms()
t.Cleanup(func() {
require.NoError(t, db.Close())
})

series1 := labels.FromStrings("foo", "bar1")
series2 := labels.FromStrings("foo", "bar2")

var series1ExpSamplesPreCompact, series2ExpSamplesPreCompact, series1ExpSamplesPostCompact, series2ExpSamplesPostCompact []chunks.Sample

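// addSample appends a single histogram sample (integer or float, depending on the test case) with the given
// counter reset hint, and returns the sample that is expected to be read back. Timestamps are given in
// minutes and converted to milliseconds.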
addSample := func(ts int64, l labels.Labels, val int, hint histogram.CounterResetHint) sample {
app := db.Appender(context.Background())
tsMs := ts * time.Minute.Milliseconds()
if floatHistogram {
h := tsdbutil.GenerateTestFloatHistogram(val)
h.CounterResetHint = hint
_, err = app.AppendHistogram(0, l, tsMs, nil, h)
require.NoError(t, err)
require.NoError(t, app.Commit())
return sample{t: tsMs, fh: h.Copy()}
} else {
h := tsdbutil.GenerateTestHistogram(val)
h.CounterResetHint = hint
_, err = app.AppendHistogram(0, l, tsMs, h, nil)
require.NoError(t, err)
require.NoError(t, app.Commit())
return sample{t: tsMs, h: h.Copy()}
}
}

// Add an in-order sample to each series.
s := addSample(520, series1, 1000000, histogram.UnknownCounterReset)
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)

s = addSample(520, series2, 1000000, histogram.UnknownCounterReset)
series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s)
series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s)

// Verify that the in-memory ooo chunk is empty.
checkEmptyOOOChunk := func(lbls labels.Labels) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.Nil(t, ms.ooo)
}

checkEmptyOOOChunk(series1)
checkEmptyOOOChunk(series2)

// Add samples for series1. There are three head chunks that will be created:
// Chunk 1 - Samples between 100 - 440. One explicit counter reset at ts 250.
// Chunk 2 - Samples between 105 - 395. Overlaps with Chunk 1. One detected counter reset at ts 165.
// Chunk 3 - Samples between 480 - 509. All within one block boundary. One detected counter reset at 490.
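// During OOO compaction these chunks are additionally split at the block boundaries, which fall at
// multiples of the default 2h block range (120, 240, 360 and 480 in the minute-based timestamps used here).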

// Chunk 1.
// First add 10 samples.
for i := 100; i < 200; i += 10 {
s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
// Before compaction, all the samples have UnknownCounterReset even though they've been added to the same
// chunk. This is because they overlap with the samples from chunk 2, and when merging overlapping chunks
// on read, the header is set to UnknownCounterReset whenever the next sample is not in the same chunk as
// the previous one.
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
// After compaction, samples from multiple mmapped chunks will be merged, so there won't be any overlapping
// chunks. Therefore, most samples will have the NotCounterReset header.
// 100 is the first sample in the first chunk in the blocks, so is still set to UnknownCounterReset.
// 120 is a block boundary - after compaction, 120 will be the first sample in a chunk, so is still set to
// UnknownCounterReset.
if i > 100 && i != 120 {
s = copyWithCounterReset(s, histogram.NotCounterReset)
}
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
}
// Explicit counter reset - the counter reset header is set to CounterReset even though the value is higher
// than at the previous timestamp. Explicit counter reset headers are effectively ignored on read though, so
// reading the sample back yields an unknown/not counter reset header. This is because the chainSampleIterator
// ignores existing headers and sets the header to UnknownCounterReset if the next sample is not in the same
// chunk as the previous one, and counter resets always start a new chunk.
// This case has been added to document what's happening, though it might not be the ideal behavior.
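// A sketch of the effective rule on read (not the actual implementation):
//
//	if the next sample is not in the same chunk as the previous one:
//		header = UnknownCounterReset // any stored header is ignored
//	else:
//		header = the header stored in the chunk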
s = addSample(250, series1, 100000+250, histogram.CounterReset)
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, copyWithCounterReset(s, histogram.UnknownCounterReset))
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.NotCounterReset))

// Add 19 more samples to complete a chunk.
for i := 260; i < 450; i += 10 {
s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
// The samples with timestamp less than 410 overlap with the samples from chunk 2, so before compaction
// they're all UnknownCounterReset. Samples with timestamp greater than or equal to 410 don't overlap with
// other chunks, so they're detected as NotCounterReset both pre- and post-compaction.
if i >= 410 {
s = copyWithCounterReset(s, histogram.NotCounterReset)
}
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
// 360 is a block boundary, so after compaction its header is still UnknownCounterReset.
if i != 360 {
s = copyWithCounterReset(s, histogram.NotCounterReset)
}
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
}

// Chunk 2.
// Add six OOO samples.
for i := 105; i < 165; i += 10 {
s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
// Samples overlap with chunk 1 so before compaction all headers are UnknownCounterReset.
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.NotCounterReset))
}

// Add sample that will be detected as a counter reset.
s = addSample(165, series1, 100000, histogram.UnknownCounterReset)
// Before compaction, sample has an UnknownCounterReset header due to the chainSampleIterator.
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
// After compaction, the sample's counter reset is properly detected.
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.CounterReset))

// Add 23 more samples to complete a chunk.
for i := 175; i < 405; i += 10 {
s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
// Samples between 205 and 245 are not interleaved with chunk 1 samples on read, so they're already
// detected as NotCounterReset before compaction; the surrounding samples are interleaved with chunk 1
// and have the UnknownCounterReset header.
if i >= 205 && i < 255 {
s = copyWithCounterReset(s, histogram.NotCounterReset)
}
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
// 245 is the first sample >= the block boundary at 240, so it's still UnknownCounterReset after compaction.
if i != 245 {
s = copyWithCounterReset(s, histogram.NotCounterReset)
} else {
s = copyWithCounterReset(s, histogram.UnknownCounterReset)
}
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
}

// Chunk 3.
for i := 480; i < 490; i += 1 {
s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
// These samples don't overlap with other chunks, so all samples except the first will already be detected
// as NotCounterReset before compaction.
if i > 480 {
s = copyWithCounterReset(s, histogram.NotCounterReset)
}
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
// 480 is a block boundary, so its header stays UnknownCounterReset after compaction.
if i == 480 {
s = copyWithCounterReset(s, histogram.UnknownCounterReset)
}
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
}
// Counter reset.
s = addSample(int64(490), series1, 100000, histogram.UnknownCounterReset)
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
// Add some more samples after the counter reset.
for i := 491; i < 510; i += 1 {
s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset)
s = copyWithCounterReset(s, histogram.NotCounterReset)
series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s)
series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s)
}

// Add samples for series2 - one chunk with one detected counter reset at 300.
for i := 200; i < 300; i += 10 {
s = addSample(int64(i), series2, 100000+i, histogram.UnknownCounterReset)
if i > 200 {
s = copyWithCounterReset(s, histogram.NotCounterReset)
}
series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s)
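// 240 is a block boundary - after compaction, 240 will be the first sample in a chunk, so is set back to
// UnknownCounterReset.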
if i == 240 {
s = copyWithCounterReset(s, histogram.UnknownCounterReset)
}
series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s)
}
// Counter reset.
s = addSample(int64(300), series2, 100000, histogram.UnknownCounterReset)
series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s)
series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s)
// Add some more samples after the counter reset.
for i := 310; i < 500; i += 10 {
s := addSample(int64(i), series2, 100000+i, histogram.UnknownCounterReset)
s = copyWithCounterReset(s, histogram.NotCounterReset)
series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s)
// 360 and 480 are block boundaries.
if i == 360 || i == 480 {
s = copyWithCounterReset(s, histogram.UnknownCounterReset)
}
series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s)
}

// Sort samples (OOO samples were not added in time order).
sort.Slice(series1ExpSamplesPreCompact, func(i, j int) bool {
return series1ExpSamplesPreCompact[i].T() < series1ExpSamplesPreCompact[j].T()
})
sort.Slice(series1ExpSamplesPostCompact, func(i, j int) bool {
return series1ExpSamplesPostCompact[i].T() < series1ExpSamplesPostCompact[j].T()
})
sort.Slice(series2ExpSamplesPreCompact, func(i, j int) bool {
return series2ExpSamplesPreCompact[i].T() < series2ExpSamplesPreCompact[j].T()
})
sort.Slice(series2ExpSamplesPostCompact, func(i, j int) bool {
return series2ExpSamplesPostCompact[i].T() < series2ExpSamplesPostCompact[j].T()
})

verifyDBSamples := func(s1Samples []chunks.Sample, s2Samples []chunks.Sample) {
expRes := map[string][]chunks.Sample{
series1.String(): s1Samples,
series2.String(): s2Samples,
}

q, err := db.Querier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
requireEqualSamples(t, expRes, actRes, false)
}

// Verify DB samples before compaction.
verifyDBSamples(series1ExpSamplesPreCompact, series2ExpSamplesPreCompact)

// Verify that the in-memory ooo chunk is not empty.
checkNonEmptyOOOChunk := func(lbls labels.Labels) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
}

checkNonEmptyOOOChunk(series1)
checkNonEmptyOOOChunk(series2)

// No blocks before compaction.
require.Empty(t, db.Blocks())

// There is a 0th WBL file.
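// (The WBL is the write-behind log for out-of-order samples; its segment files are named with zero-padded
// sequence numbers.)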
require.NoError(t, db.head.wbl.Sync()) // Syncing to make sure the WBL is flushed to disk on Windows.
files, err := os.ReadDir(db.head.wbl.Dir())
require.NoError(t, err)
require.Len(t, files, 1)
require.Equal(t, "00000000", files[0].Name())
f, err := files[0].Info()
require.NoError(t, err)
require.Greater(t, f.Size(), int64(100))

// OOO compaction happens here.
require.NoError(t, db.CompactOOOHead(ctx))

// Check that blocks are created after compaction.
require.Len(t, db.Blocks(), 5)

// Check samples after compaction.
verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact)

// The 0th WBL file will be deleted and the 1st will be the only one present.
files, err = os.ReadDir(db.head.wbl.Dir())
require.NoError(t, err)
require.Len(t, files, 1)
require.Equal(t, "00000001", files[0].Name())
f, err = files[0].Info()
require.NoError(t, err)
require.Equal(t, int64(0), f.Size())

// No OOO data should be present in the Head now.
checkEmptyOOOChunk(series1)
checkEmptyOOOChunk(series2)

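// verifyBlockSamples checks that the given block contains exactly the expected post-compaction samples
// with timestamps between fromMins and toMins (inclusive).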
verifyBlockSamples := func(block *Block, fromMins, toMins int64) {
var series1Samples, series2Samples []chunks.Sample

for _, s := range series1ExpSamplesPostCompact {
if s.T() >= fromMins*time.Minute.Milliseconds() {
// Samples should be sorted, so break out of loop when we reach a timestamp that's too big.
if s.T() > toMins*time.Minute.Milliseconds() {
break
}
series1Samples = append(series1Samples, s)
}
}
for _, s := range series2ExpSamplesPostCompact {
if s.T() >= fromMins*time.Minute.Milliseconds() {
// Samples should be sorted, so break out of loop when we reach a timestamp that's too big.
if s.T() > toMins*time.Minute.Milliseconds() {
break
}
series2Samples = append(series2Samples, s)
}
}

expRes := map[string][]chunks.Sample{}
if len(series1Samples) != 0 {
expRes[series1.String()] = series1Samples
}
if len(series2Samples) != 0 {
expRes[series2.String()] = series2Samples
}

q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64)
require.NoError(t, err)

actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
requireEqualSamples(t, expRes, actRes, false)
}

// Checking for expected data in the blocks.
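// The OOO samples at 100-509 minutes are split across five blocks, aligned to the 2h (120 minute)
// block range.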
verifyBlockSamples(db.Blocks()[0], 100, 119)
verifyBlockSamples(db.Blocks()[1], 120, 239)
verifyBlockSamples(db.Blocks()[2], 240, 359)
verifyBlockSamples(db.Blocks()[3], 360, 479)
verifyBlockSamples(db.Blocks()[4], 480, 509)

// There should be a single m-map file.
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
files, err = os.ReadDir(mmapDir)
require.NoError(t, err)
require.Len(t, files, 1)

// Compact the in-order head and expect another block.
// Since this is a forced compaction, the block is not aligned to the 2h block boundaries.
err = db.CompactHead(NewRangeHead(db.head, 500*time.Minute.Milliseconds(), 550*time.Minute.Milliseconds()))
require.NoError(t, err)
require.Len(t, db.Blocks(), 6)
verifyBlockSamples(db.Blocks()[5], 520, 520)

// Blocks have now been created from both the normal and OOO head, but are not yet merged.
verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact)

// The compaction also clears out the old m-map files, including the file that has the OOO chunks.
files, err = os.ReadDir(mmapDir)
require.NoError(t, err)
require.Len(t, files, 1)
require.Equal(t, "000001", files[0].Name())

// This will merge the overlapping blocks.
require.NoError(t, db.Compact(ctx))

require.Len(t, db.Blocks(), 5)
verifyBlockSamples(db.Blocks()[0], 100, 119)
verifyBlockSamples(db.Blocks()[1], 120, 239)
verifyBlockSamples(db.Blocks()[2], 240, 359)
verifyBlockSamples(db.Blocks()[3], 360, 479)
verifyBlockSamples(db.Blocks()[4], 480, 520) // Merged block.

// Final state. Blocks from normal and OOO head are merged.
verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact)
}
}

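// copyWithCounterReset returns a copy of the given sample with the counter reset hint of its histogram
// (integer or float) replaced with hint.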
func copyWithCounterReset(s sample, hint histogram.CounterResetHint) sample {
if s.h != nil {
h := s.h.Copy()
h.CounterResetHint = hint
return sample{t: s.t, h: h}
} else {
h := s.fh.Copy()
h.CounterResetHint = hint
return sample{t: s.t, fh: h}
}
}

func TestOOOCompactionFailure(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {