Skip to content

Commit

Permalink
Add multiple encoding test (#579)
Browse files Browse the repository at this point in the history
  • Loading branch information
fionaliao committed Jan 19, 2024
1 parent 22b741a commit eafc2be
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 19 deletions.
187 changes: 169 additions & 18 deletions tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4600,6 +4600,171 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
require.Equal(t, *reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4)
}

// TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the
// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This
// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed.
// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a
// histogram in a single write request.
func TestMultipleEncodingsCommitOrder(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
opts.AllowOverlappingCompaction = false

Check failure on line 4612 in tsdb/db_test.go

View workflow job for this annotation

GitHub Actions / lint

opts.AllowOverlappingCompaction undefined (type *Options has no field or method AllowOverlappingCompaction) (typecheck)

Check failure on line 4612 in tsdb/db_test.go

View workflow job for this annotation

GitHub Actions / lint

opts.AllowOverlappingCompaction undefined (type *Options has no field or method AllowOverlappingCompaction) (typecheck)

series1 := labels.FromStrings("foo", "bar1")

db := openTestDB(t, opts, nil)
db.DisableCompactions()
db.EnableNativeHistograms()
db.EnableOOONativeHistograms()
defer func() {
require.NoError(t, db.Close())
}()

addSample := func(app storage.Appender, ts int64, valType chunkenc.ValueType) chunks.Sample {
if valType == chunkenc.ValFloat {
_, err := app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts))
require.NoError(t, err)
return sample{t: ts, f: float64(ts)}
}
if valType == chunkenc.ValHistogram {
h := tsdbutil.GenerateTestHistogram(int(ts))
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil)
require.NoError(t, err)
return sample{t: ts, h: h}
}
fh := tsdbutil.GenerateTestFloatHistogram(int(ts))
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh)
require.NoError(t, err)
return sample{t: ts, fh: fh}
}

verifySamples := func(minT, maxT int64, expSamples []chunks.Sample, oooCount int) {
require.Equal(t, float64(oooCount), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")

// Verify samples querier.
querier, err := db.Querier(minT, maxT)
require.NoError(t, err)
defer querier.Close()

seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
require.Len(t, seriesSet, 1)
gotSamples := seriesSet[series1.String()]
compareSamples(t, series1.String(), expSamples, gotSamples, true)

// Verify chunks querier.
chunkQuerier, err := db.ChunkQuerier(minT, maxT)
require.NoError(t, err)
defer chunkQuerier.Close()

chks := queryChunks(t, chunkQuerier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
require.NotNil(t, chks[series1.String()])
require.Len(t, chks, 1)
var gotChunkSamples []chunks.Sample
for _, chunk := range chks[series1.String()] {
it := chunk.Chunk.Iterator(nil)
for t := it.Next(); t != chunkenc.ValNone; t = it.Next() {
switch t {
case chunkenc.ValFloat:
t, v := it.At()
gotChunkSamples = append(gotChunkSamples, sample{t: t, f: v})
case chunkenc.ValHistogram:
t, v := it.AtHistogram()
gotChunkSamples = append(gotChunkSamples, sample{t: t, h: v})
case chunkenc.ValFloatHistogram:
t, v := it.AtFloatHistogram()
gotChunkSamples = append(gotChunkSamples, sample{t: t, fh: v})
}
}
require.NoError(t, it.Err())
}
compareSamples(t, series1.String(), expSamples, gotChunkSamples, true)
}

var expSamples []chunks.Sample

// Append samples with different encoding types and then commit them at once.
app := db.Appender(context.Background())

for i := 100; i < 105; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
// same batch.
for i := 110; i < 120; i++ {
s := addSample(app, int64(i), chunkenc.ValHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
// same batch.
for i := 120; i < 130; i++ {
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as in-order as their timestamps are greater than the max timestamp for float
// samples in the same batch.
for i := 140; i < 150; i++ {
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150
// because float samples are processed first and these samples are in-order wrt to the float samples in the batch.
for i := 130; i < 135; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}

require.NoError(t, app.Commit())

sort.Slice(expSamples, func(i, j int) bool {
return expSamples[i].T() < expSamples[j].T()
})

// oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO.
verifySamples(100, 150, expSamples, 20)

// Append and commit some in-order histograms by themselves.
app = db.Appender(context.Background())
for i := 150; i < 160; i++ {
s := addSample(app, int64(i), chunkenc.ValHistogram)
expSamples = append(expSamples, s)
}
require.NoError(t, app.Commit())

// oooCount remains at 20 as no new OOO samples have been added.
verifySamples(100, 160, expSamples, 20)

// Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples
// with newer timestamps have already been committed.
app = db.Appender(context.Background())
for i := 50; i < 55; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}
for i := 60; i < 70; i++ {
s := addSample(app, int64(i), chunkenc.ValHistogram)
expSamples = append(expSamples, s)
}
for i := 70; i < 75; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}
for i := 80; i < 90; i++ {
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
expSamples = append(expSamples, s)
}
require.NoError(t, app.Commit())

// Sort samples again because OOO samples have been added.
sort.Slice(expSamples, func(i, j int) bool {
return expSamples[i].T() < expSamples[j].T()
})

// oooCount = 50 as we've added 30 more OOO samples.
verifySamples(50, 160, expSamples, 50)
}

// TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start
//
// are not included in this compaction.
Expand Down Expand Up @@ -5326,22 +5491,10 @@ func testQuerierOOOQuery(t *testing.T,
defer querier.Close()

seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
require.NotNil(t, seriesSet[series1.String()])
gotSamples := seriesSet[series1.String()]
require.NotNil(t, gotSamples)
require.Len(t, seriesSet, 1)
var gotSamples []chunks.Sample
for _, sample := range seriesSet[series1.String()] {
switch sample.Type() {
case chunkenc.ValFloat:
gotSamples = append(gotSamples, sample)
case chunkenc.ValHistogram:
sample.H().CounterResetHint = histogram.UnknownCounterReset
gotSamples = append(gotSamples, sample)
case chunkenc.ValFloatHistogram:
sample.FH().CounterResetHint = histogram.UnknownCounterReset
gotSamples = append(gotSamples, sample)
}
}
require.Equal(t, expSamples, gotSamples)
compareSamples(t, series1.String(), expSamples, gotSamples, true)
require.GreaterOrEqual(t, float64(oooSamples), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
})
}
Expand Down Expand Up @@ -5444,11 +5597,9 @@ func testChunkQuerierOOOQuery(t *testing.T,
samples = append(samples, sample{t: t, f: v})
case chunkenc.ValHistogram:
t, v := it.AtHistogram()
v.CounterResetHint = histogram.UnknownCounterReset
samples = append(samples, sample{t: t, h: v})
case chunkenc.ValFloatHistogram:
t, v := it.AtFloatHistogram()
v.CounterResetHint = histogram.UnknownCounterReset
samples = append(samples, sample{t: t, fh: v})
}
}
Expand Down Expand Up @@ -5568,7 +5719,7 @@ func testChunkQuerierOOOQuery(t *testing.T,
gotSamples = append(gotSamples, extractSamples(it)...)
require.NoError(t, it.Err())
}
require.Equal(t, expSamples, gotSamples)
compareSamples(t, series1.String(), expSamples, gotSamples, true)
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion tsdb/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func requireEqualSamples(t *testing.T, expected, actual map[string][]chunks.Samp
for name, expectedItem := range expected {
actualItem, ok := actual[name]
require.True(t, ok, "Expected series %s not found", name)
require.Equal(t, len(expectedItem), len(actualItem), "Length not expected for %s", name)
compareSamples(t, name, expectedItem, actualItem, ignoreCounterResets)
}
for name := range actual {
Expand All @@ -105,6 +104,7 @@ func requireEqualSamples(t *testing.T, expected, actual map[string][]chunks.Samp
}

func compareSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) {
require.Equal(t, len(expected), len(actual), "Length not expected for %s", name)
for i, s := range expected {
expectedSample := s
actualSample := actual[i]
Expand Down

0 comments on commit eafc2be

Please sign in to comment.