From cf744ab6f8832f8a859a5ad1f898d2c62a10eb99 Mon Sep 17 00:00:00 2001
From: Fiona Liao
Date: Fri, 29 Dec 2023 18:36:33 +0000
Subject: [PATCH 1/2] Add multiple encoding test

---
 tsdb/db_test.go  | 189 ++++++++++++++++++++++++++++++++++++++++++-----
 tsdb/testutil.go |   2 +-
 2 files changed, 171 insertions(+), 20 deletions(-)

diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 88f56a9d93..0eaa9b8e75 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -4600,6 +4600,171 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
 	require.Equal(t, *reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4)
 }
 
+// TestMultipleEncodingsCommitOrder mainly serves to demonstrate what happens when committing a batch of samples for the
+// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This
+// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed.
+// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a
+// histogram in a single write request.
+func TestMultipleEncodingsCommitOrder(t *testing.T) {
+	opts := DefaultOptions()
+	opts.OutOfOrderCapMax = 30
+	opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
+	opts.AllowOverlappingCompaction = false
+
+	series1 := labels.FromStrings("foo", "bar1")
+
+	db := openTestDB(t, opts, nil)
+	db.DisableCompactions()
+	db.EnableNativeHistograms()
+	db.EnableOOONativeHistograms()
+	defer func() {
+		require.NoError(t, db.Close())
+	}()
+
+	addSample := func(app storage.Appender, ts int64, valType chunkenc.ValueType) chunks.Sample {
+		if valType == chunkenc.ValFloat {
+			_, err := app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts))
+			require.NoError(t, err)
+			return sample{t: ts, f: float64(ts)}
+		}
+		if valType == chunkenc.ValHistogram {
+			h := tsdbutil.GenerateTestHistogram(int(ts))
+			_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil)
+			require.NoError(t, err)
+			return sample{t: ts, h: h}
+		}
+		fh := tsdbutil.GenerateTestFloatHistogram(int(ts))
+		_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh)
+		require.NoError(t, err)
+		return sample{t: ts, fh: fh}
+	}
+
+	verifySamples := func(minT, maxT int64, expSamples []chunks.Sample, oooCount int) {
+		require.Equal(t, float64(oooCount), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
+
+		// Verify samples querier.
+		querier, err := db.Querier(minT, maxT)
+		require.NoError(t, err)
+		defer querier.Close()
+
+		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
+		require.Equal(t, 1, len(seriesSet))
+		gotSamples := seriesSet[series1.String()]
+		compareSamples(t, series1.String(), expSamples, gotSamples, true)
+
+		// Verify chunks querier.
+		chunkQuerier, err := db.ChunkQuerier(minT, maxT)
+		require.NoError(t, err)
+		defer chunkQuerier.Close()
+
+		chks := queryChunks(t, chunkQuerier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
+		require.NotNil(t, chks[series1.String()])
+		require.Equal(t, 1, len(chks))
+		var gotChunkSamples []chunks.Sample
+		for _, chunk := range chks[series1.String()] {
+			it := chunk.Chunk.Iterator(nil)
+			for t := it.Next(); t != chunkenc.ValNone; t = it.Next() {
+				switch t {
+				case chunkenc.ValFloat:
+					t, v := it.At()
+					gotChunkSamples = append(gotChunkSamples, sample{t: t, f: v})
+				case chunkenc.ValHistogram:
+					t, v := it.AtHistogram()
+					gotChunkSamples = append(gotChunkSamples, sample{t: t, h: v})
+				case chunkenc.ValFloatHistogram:
+					t, v := it.AtFloatHistogram()
+					gotChunkSamples = append(gotChunkSamples, sample{t: t, fh: v})
+				}
+			}
+			require.NoError(t, it.Err())
+		}
+		compareSamples(t, series1.String(), expSamples, gotChunkSamples, true)
+	}
+
+	var expSamples []chunks.Sample
+
+	// Append samples with different encoding types and then commit them at once.
+	app := db.Appender(context.Background())
+
+	for i := 100; i < 105; i++ {
+		s := addSample(app, int64(i), chunkenc.ValFloat)
+		expSamples = append(expSamples, s)
+	}
+	// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
+	// same batch.
+	for i := 110; i < 120; i++ {
+		s := addSample(app, int64(i), chunkenc.ValHistogram)
+		expSamples = append(expSamples, s)
+	}
+	// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
+	// same batch.
+	for i := 120; i < 130; i++ {
+		s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
+		expSamples = append(expSamples, s)
+	}
+	// These samples will be marked as in-order as their timestamps are greater than the max timestamp for float
+	// samples in the same batch.
+	for i := 140; i < 150; i++ {
+		s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
+		expSamples = append(expSamples, s)
+	}
+	// These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150,
+	// because float samples are processed first and these samples are in-order wrt the float samples in the batch.
+	for i := 130; i < 135; i++ {
+		s := addSample(app, int64(i), chunkenc.ValFloat)
+		expSamples = append(expSamples, s)
+	}
+
+	require.NoError(t, app.Commit())
+
+	sort.Slice(expSamples, func(i, j int) bool {
+		return expSamples[i].T() < expSamples[j].T()
+	})
+
+	// oooCount = 20 because the histograms from 110 - 120 and float histograms from 120 - 130 are detected as OOO.
+	verifySamples(100, 150, expSamples, 20)
+
+	// Append and commit some in-order histograms by themselves.
+	app = db.Appender(context.Background())
+	for i := 150; i < 160; i++ {
+		s := addSample(app, int64(i), chunkenc.ValHistogram)
+		expSamples = append(expSamples, s)
+	}
+	require.NoError(t, app.Commit())
+
+	// oooCount remains at 20 as no new OOO samples have been added.
+	verifySamples(100, 160, expSamples, 20)
+
+	// Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples
+	// with newer timestamps have already been committed.
+	app = db.Appender(context.Background())
+	for i := 50; i < 55; i++ {
+		s := addSample(app, int64(i), chunkenc.ValFloat)
+		expSamples = append(expSamples, s)
+	}
+	for i := 60; i < 70; i++ {
+		s := addSample(app, int64(i), chunkenc.ValHistogram)
+		expSamples = append(expSamples, s)
+	}
+	for i := 70; i < 75; i++ {
+		s := addSample(app, int64(i), chunkenc.ValFloat)
+		expSamples = append(expSamples, s)
+	}
+	for i := 80; i < 90; i++ {
+		s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
+		expSamples = append(expSamples, s)
+	}
+	require.NoError(t, app.Commit())
+
+	// Sort samples again because OOO samples have been added.
+	sort.Slice(expSamples, func(i, j int) bool {
+		return expSamples[i].T() < expSamples[j].T()
+	})
+
+	// oooCount = 50 as we've added 30 more OOO samples.
+	verifySamples(50, 160, expSamples, 50)
+}
+
 // TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start
 //
 // are not included in this compaction.
@@ -5326,22 +5491,10 @@ func testQuerierOOOQuery(t *testing.T,
 		defer querier.Close()
 
 		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
-		require.NotNil(t, seriesSet[series1.String()])
-		require.Len(t, seriesSet, 1)
-		var gotSamples []chunks.Sample
-		for _, sample := range seriesSet[series1.String()] {
-			switch sample.Type() {
-			case chunkenc.ValFloat:
-				gotSamples = append(gotSamples, sample)
-			case chunkenc.ValHistogram:
-				sample.H().CounterResetHint = histogram.UnknownCounterReset
-				gotSamples = append(gotSamples, sample)
-			case chunkenc.ValFloatHistogram:
-				sample.FH().CounterResetHint = histogram.UnknownCounterReset
-				gotSamples = append(gotSamples, sample)
-			}
-		}
-		require.Equal(t, expSamples, gotSamples)
+		gotSamples := seriesSet[series1.String()]
+		require.NotNil(t, gotSamples)
+		require.Equal(t, 1, len(seriesSet))
+		compareSamples(t, series1.String(), expSamples, gotSamples, true)
 		require.GreaterOrEqual(t, float64(oooSamples), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
 	})
 }
@@ -5444,11 +5597,9 @@ func testChunkQuerierOOOQuery(t *testing.T,
 				samples = append(samples, sample{t: t, f: v})
 			case chunkenc.ValHistogram:
 				t, v := it.AtHistogram()
-				v.CounterResetHint = histogram.UnknownCounterReset
 				samples = append(samples, sample{t: t, h: v})
 			case chunkenc.ValFloatHistogram:
 				t, v := it.AtFloatHistogram()
-				v.CounterResetHint = histogram.UnknownCounterReset
 				samples = append(samples, sample{t: t, fh: v})
 			}
 		}
@@ -5568,7 +5719,7 @@ func testChunkQuerierOOOQuery(t *testing.T,
 				gotSamples = append(gotSamples, extractSamples(it)...)
 				require.NoError(t, it.Err())
 			}
-			require.Equal(t, expSamples, gotSamples)
+			compareSamples(t, series1.String(), expSamples, gotSamples, true)
 		})
 	}
 }
diff --git a/tsdb/testutil.go b/tsdb/testutil.go
index 4aa4ef537d..e8a2ad2fbe 100644
--- a/tsdb/testutil.go
+++ b/tsdb/testutil.go
@@ -95,7 +95,6 @@ func requireEqualSamples(t *testing.T, expected, actual map[string][]chunks.Samp
 	for name, expectedItem := range expected {
 		actualItem, ok := actual[name]
 		require.True(t, ok, "Expected series %s not found", name)
-		require.Equal(t, len(expectedItem), len(actualItem), "Length not expected for %s", name)
 		compareSamples(t, name, expectedItem, actualItem, ignoreCounterResets)
 	}
 	for name := range actual {
@@ -105,6 +104,7 @@ func requireEqualSamples(t *testing.T, expected, actual map[string][]chunks.Samp
 }
 
 func compareSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) {
+	require.Equal(t, len(expected), len(actual), "Length not expected for %s", name)
 	for i, s := range expected {
 		expectedSample := s
 		actualSample := actual[i]

From ee095862d2681a557138eb007b5e5fd459a92ded Mon Sep 17 00:00:00 2001
From: Fiona Liao
Date: Thu, 11 Jan 2024 18:03:36 +0000
Subject: [PATCH 2/2] Lint

---
 tsdb/db_test.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 0eaa9b8e75..f0a29de690 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -4648,7 +4648,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
 		defer querier.Close()
 
 		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
-		require.Equal(t, 1, len(seriesSet))
+		require.Len(t, seriesSet, 1)
 		gotSamples := seriesSet[series1.String()]
 		compareSamples(t, series1.String(), expSamples, gotSamples, true)
 
@@ -4659,7 +4659,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
 
 		chks := queryChunks(t, chunkQuerier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
 		require.NotNil(t, chks[series1.String()])
-		require.Equal(t, 1, len(chks))
+		require.Len(t, chks, 1)
 		var gotChunkSamples []chunks.Sample
 		for _, chunk := range chks[series1.String()] {
 			it := chunk.Chunk.Iterator(nil)
@@ -5493,7 +5493,7 @@ func testQuerierOOOQuery(t *testing.T,
 		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
 		gotSamples := seriesSet[series1.String()]
 		require.NotNil(t, gotSamples)
-		require.Equal(t, 1, len(seriesSet))
+		require.Len(t, seriesSet, 1)
 		compareSamples(t, series1.String(), expSamples, gotSamples, true)
 		require.GreaterOrEqual(t, float64(oooSamples), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
 	})