Add multiple encoding test #579

Merged

187 changes: 169 additions & 18 deletions tsdb/db_test.go
@@ -4600,6 +4600,171 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
require.Equal(t, *reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4)
}

// TestMultipleEncodingsCommitOrder mainly serves to demonstrate what happens when committing a batch of samples for the
// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This
// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed.
// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a
// histogram in a single write request. A minimal sketch of this ordering follows the test function.
func TestMultipleEncodingsCommitOrder(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
opts.AllowOverlappingCompaction = false

series1 := labels.FromStrings("foo", "bar1")

db := openTestDB(t, opts, nil)
db.DisableCompactions()
db.EnableNativeHistograms()
db.EnableOOONativeHistograms()
defer func() {
require.NoError(t, db.Close())
}()

addSample := func(app storage.Appender, ts int64, valType chunkenc.ValueType) chunks.Sample {
if valType == chunkenc.ValFloat {
_, err := app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts))
require.NoError(t, err)
return sample{t: ts, f: float64(ts)}
}
if valType == chunkenc.ValHistogram {
h := tsdbutil.GenerateTestHistogram(int(ts))
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil)
require.NoError(t, err)
return sample{t: ts, h: h}
}
fh := tsdbutil.GenerateTestFloatHistogram(int(ts))
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh)
require.NoError(t, err)
return sample{t: ts, fh: fh}
}

verifySamples := func(minT, maxT int64, expSamples []chunks.Sample, oooCount int) {
require.Equal(t, float64(oooCount), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")

// Verify samples querier.
querier, err := db.Querier(minT, maxT)
require.NoError(t, err)
defer querier.Close()

seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
require.Len(t, seriesSet, 1)
gotSamples := seriesSet[series1.String()]
compareSamples(t, series1.String(), expSamples, gotSamples, true)

// Verify chunks querier.
chunkQuerier, err := db.ChunkQuerier(minT, maxT)
require.NoError(t, err)
defer chunkQuerier.Close()

chks := queryChunks(t, chunkQuerier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
require.NotNil(t, chks[series1.String()])
require.Len(t, chks, 1)
var gotChunkSamples []chunks.Sample
for _, chunk := range chks[series1.String()] {
it := chunk.Chunk.Iterator(nil)
for t := it.Next(); t != chunkenc.ValNone; t = it.Next() {
switch t {
case chunkenc.ValFloat:
t, v := it.At()
gotChunkSamples = append(gotChunkSamples, sample{t: t, f: v})
case chunkenc.ValHistogram:
t, v := it.AtHistogram()
gotChunkSamples = append(gotChunkSamples, sample{t: t, h: v})
case chunkenc.ValFloatHistogram:
t, v := it.AtFloatHistogram()
gotChunkSamples = append(gotChunkSamples, sample{t: t, fh: v})
}
}
require.NoError(t, it.Err())
}
compareSamples(t, series1.String(), expSamples, gotChunkSamples, true)
}

var expSamples []chunks.Sample

// Append samples with different encoding types and then commit them at once.
app := db.Appender(context.Background())

for i := 100; i < 105; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
// same batch.
for i := 110; i < 120; i++ {
s := addSample(app, int64(i), chunkenc.ValHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
// same batch.
for i := 120; i < 130; i++ {
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as in-order as their timestamps are greater than the max timestamp for float
// samples in the same batch.
for i := 140; i < 150; i++ {
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150,
// because float samples are processed first and these samples are in order with respect to the float samples in the batch.
for i := 130; i < 135; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}

require.NoError(t, app.Commit())

sort.Slice(expSamples, func(i, j int) bool {
return expSamples[i].T() < expSamples[j].T()
})

// oooCount = 20 because the histograms from 110 - 120 and the float histograms from 120 - 130 are detected as OOO.
verifySamples(100, 150, expSamples, 20)

// Append and commit some in-order histograms by themselves.
app = db.Appender(context.Background())
for i := 150; i < 160; i++ {
s := addSample(app, int64(i), chunkenc.ValHistogram)
expSamples = append(expSamples, s)
}
require.NoError(t, app.Commit())

// oooCount remains at 20 as no new OOO samples have been added.
verifySamples(100, 160, expSamples, 20)

// Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples
// with newer timestamps have already been committed.
app = db.Appender(context.Background())
for i := 50; i < 55; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}
for i := 60; i < 70; i++ {
s := addSample(app, int64(i), chunkenc.ValHistogram)
expSamples = append(expSamples, s)
}
for i := 70; i < 75; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}
for i := 80; i < 90; i++ {
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
expSamples = append(expSamples, s)
}
require.NoError(t, app.Commit())

// Sort samples again because OOO samples have been added.
sort.Slice(expSamples, func(i, j int) bool {
return expSamples[i].T() < expSamples[j].T()
})

// oooCount = 50 as we've added 30 more OOO samples.
verifySamples(50, 160, expSamples, 50)
}
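
A minimal sketch of the commit ordering described in the test's doc comment, reusing the in-package helpers visible above (openTestDB, tsdbutil.GenerateTestHistogram, prom_testutil); the variable names and timestamps below are illustrative only and not part of this diff:

// Illustrative sketch: a histogram and a float for the same series in one batch.
db := openTestDB(t, opts, nil) // opts configured as in the test above
db.EnableNativeHistograms()
db.EnableOOONativeHistograms()
defer func() { require.NoError(t, db.Close()) }()

lbls := labels.FromStrings("foo", "bar1")
app := db.Appender(context.Background())

// The histogram at ts=10 is appended first in the batch...
_, err := app.AppendHistogram(0, lbls, 10, tsdbutil.GenerateTestHistogram(10), nil)
require.NoError(t, err)
// ...but Commit() processes the float at ts=20 first, so the series' max time is
// already 20 when the histogram is handled.
_, err = app.Append(0, lbls, 20, 20)
require.NoError(t, err)

// The histogram at ts=10 is therefore counted as an out-of-order append.
require.NoError(t, app.Commit())
require.Equal(t, float64(1), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended))

In the full test above, this is why the first verification expects oooCount = 20: the 10 histograms at ts 110-119 and the 10 float histograms at ts 120-129 all sit below the float maximum of 134 committed in the same batch.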

// TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start
//
// are not included in this compaction.
@@ -5326,22 +5491,10 @@ func testQuerierOOOQuery(t *testing.T,
defer querier.Close()

seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
require.NotNil(t, seriesSet[series1.String()])
gotSamples := seriesSet[series1.String()]
require.NotNil(t, gotSamples)
require.Len(t, seriesSet, 1)
var gotSamples []chunks.Sample
for _, sample := range seriesSet[series1.String()] {
switch sample.Type() {
case chunkenc.ValFloat:
gotSamples = append(gotSamples, sample)
case chunkenc.ValHistogram:
sample.H().CounterResetHint = histogram.UnknownCounterReset
gotSamples = append(gotSamples, sample)
case chunkenc.ValFloatHistogram:
sample.FH().CounterResetHint = histogram.UnknownCounterReset
gotSamples = append(gotSamples, sample)
}
}
require.Equal(t, expSamples, gotSamples)
compareSamples(t, series1.String(), expSamples, gotSamples, true)
require.GreaterOrEqual(t, float64(oooSamples), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
})
}
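
The loop replaced here normalised CounterResetHint to UnknownCounterReset in place before an exact require.Equal; calling compareSamples with ignoreCounterResets=true makes that intent explicit. A rough sketch of what ignoring the hint amounts to, written against the local sample type and the chunkenc/histogram packages already used in this file (whether compareSamples does exactly this internally is an assumption; the diff only shows its call sites):

// normaliseHint is illustrative only: it clears the counter-reset hint on a copy
// (the removed loop mutated samples in place) so two otherwise-equal histogram
// samples compare equal regardless of the hint.
func normaliseHint(s chunks.Sample) chunks.Sample {
	switch s.Type() {
	case chunkenc.ValHistogram:
		h := s.H().Copy()
		h.CounterResetHint = histogram.UnknownCounterReset
		return sample{t: s.T(), h: h}
	case chunkenc.ValFloatHistogram:
		fh := s.FH().Copy()
		fh.CounterResetHint = histogram.UnknownCounterReset
		return sample{t: s.T(), fh: fh}
	default:
		return s
	}
}

With a helper like this, require.Equal(t, normaliseHint(exp), normaliseHint(got)) would behave like the removed loop for a single sample.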
@@ -5444,11 +5597,9 @@ func testChunkQuerierOOOQuery(t *testing.T,
samples = append(samples, sample{t: t, f: v})
case chunkenc.ValHistogram:
t, v := it.AtHistogram()
v.CounterResetHint = histogram.UnknownCounterReset
samples = append(samples, sample{t: t, h: v})
case chunkenc.ValFloatHistogram:
t, v := it.AtFloatHistogram()
v.CounterResetHint = histogram.UnknownCounterReset
samples = append(samples, sample{t: t, fh: v})
}
}
@@ -5568,7 +5719,7 @@ func testChunkQuerierOOOQuery(t *testing.T,
gotSamples = append(gotSamples, extractSamples(it)...)
require.NoError(t, it.Err())
}
require.Equal(t, expSamples, gotSamples)
compareSamples(t, series1.String(), expSamples, gotSamples, true)
})
}
}
2 changes: 1 addition & 1 deletion tsdb/testutil.go
@@ -95,7 +95,6 @@ func requireEqualSamples(t *testing.T, expected, actual map[string][]chunks.Samp
for name, expectedItem := range expected {
actualItem, ok := actual[name]
require.True(t, ok, "Expected series %s not found", name)
require.Equal(t, len(expectedItem), len(actualItem), "Length not expected for %s", name)
compareSamples(t, name, expectedItem, actualItem, ignoreCounterResets)
}
for name := range actual {
@@ -105,6 +104,7 @@ func requireEqualSamples(t *testing.T, expected, actual map[string][]chunks.Samp
}

func compareSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) {
require.Equal(t, len(expected), len(actual), "Length not expected for %s", name)
for i, s := range expected {
expectedSample := s
actualSample := actual[i]
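
Moving the length assertion into compareSamples means a direct call now reports a count mismatch itself instead of relying on requireEqualSamples having checked it first, which also covers the new test's verifySamples helper. A hypothetical call (the slice names and series label are placeholders):

// If gotSamples and expSamples differ in length, this now fails on the length
// assertion inside compareSamples rather than panicking on an index out of range
// or silently ignoring extra actual samples.
compareSamples(t, `{foo="bar1"}`, expSamples, gotSamples, true)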