Skip to content

Commit

Permalink
Add counter reset test for mmapped OOO chunks (#544)
Browse files Browse the repository at this point in the history
* Add counter reset test for mmapped OOO chunks

Fixed a couple of bugs that were discovered while writing the test:
- mint for ooo chunk needed to be set when a new chunk was created
- first chunk was using the wrong previous appender

Signed-off-by: Fiona Liao <fiona.y.liao@gmail.com>

* nolint:staticcheck

---------

Signed-off-by: Fiona Liao <fiona.y.liao@gmail.com>
  • Loading branch information
fionaliao committed Jan 19, 2024
1 parent a2cdcaa commit 914c46c
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 2 deletions.
184 changes: 184 additions & 0 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4346,6 +4346,190 @@ func TestHistogramCounterResetHeader(t *testing.T) {
}
}

type expOOOMmappedChunks struct {
header chunkenc.CounterResetHeader
mint, maxt int64
numSamples uint16
}

func TestOOOHistogramCounterResetHeaders(t *testing.T) {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
head.opts.OutOfOrderCapMax.Store(5)

t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))

appendHistogram := func(ts int64, h *histogram.Histogram) {
app := head.Appender(context.Background())
var err error
if floatHisto {
_, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat())
} else {
_, err = app.AppendHistogram(0, l, ts, h.Copy(), nil)
}
require.NoError(t, err)
require.NoError(t, app.Commit())
}

var expChunks []expOOOMmappedChunks
checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) {
expChunks = append(expChunks, newChunks...)

ms, _, err := head.getOrCreate(l.Hash(), l)
require.NoError(t, err)

require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks))

for i, mmapChunk := range ms.ooo.oooMmappedChunks {
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
require.NoError(t, err)
if floatHisto {
require.Equal(t, expChunks[i].header, chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
} else {
require.Equal(t, expChunks[i].header, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
require.Equal(t, expChunks[i].mint, mmapChunk.minTime)
require.Equal(t, expChunks[i].maxt, mmapChunk.maxTime)
require.Equal(t, expChunks[i].numSamples, mmapChunk.numSamples)
}
}

h := tsdbutil.GenerateTestHistograms(1)[0]
h.PositiveBuckets = []int64{100, 1, 1, 1}
h.NegativeBuckets = []int64{100, 1, 1, 1}
h.Count = 1000

// Append an in-order histogram, so the rest of the samples can be detected as OOO.
appendHistogram(1000, h)

// OOO histogram
for i := 1; i <= 5; i++ {
h.Count = 1000 + uint64(i)
appendHistogram(100+int64(i), h)
}
// Nothing mmapped yet.
checkOOOExpCounterResetHeader()

// 6th observation (which triggers a head chunk mmapping).
h.Count = 1002
appendHistogram(int64(112), h)

// One mmapped chunk with (ts, val) [(101, 1001), (102, 1002), (103, 1003), (104, 1004), (105, 1005)].
checkOOOExpCounterResetHeader(expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 101,
maxt: 105,
numSamples: 5,
})

// Add more samples, there's a counter reset at ts 122.
h.Count = 1001
appendHistogram(int64(110), h)
h.Count = 904
appendHistogram(int64(124), h)
h.Count = 903
appendHistogram(int64(123), h)
h.Count = 902
appendHistogram(int64(122), h)

// New samples not mmapped yet.
checkOOOExpCounterResetHeader()

// 11th observation (which triggers another head chunk mmapping).
h.Count = 2000
appendHistogram(int64(200), h)

// Two new mmapped chunks [(110, 1001), (112, 1002)], [(122, 902), (123, 903), (124, 904)].
checkOOOExpCounterResetHeader(
expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 110,
maxt: 112,
numSamples: 2,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 122,
maxt: 124,
numSamples: 3,
},
)

// Count is lower than previous sample at ts 200, so the NotCounterReset is ignored.
h.Count = 1000
h.CounterResetHint = histogram.NotCounterReset
appendHistogram(int64(205), h)

h.Count = 2010
h.CounterResetHint = histogram.CounterReset
appendHistogram(int64(210), h)

h.Count = 2020
h.CounterResetHint = histogram.UnknownCounterReset
appendHistogram(int64(220), h)

h.Count = 2005
appendHistogram(int64(215), h)

// 16th observation (which triggers another head chunk mmapping).
h.Count = 4000
appendHistogram(int64(350), h)

// Four new mmapped chunks: [(200, 2000)] [(205, 1000)], [(210, 2010)], [(215, 2015), (220, 2020)]
checkOOOExpCounterResetHeader(
expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 200,
maxt: 200,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 205,
maxt: 205,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 210,
maxt: 210,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 215,
maxt: 220,
numSamples: 2,
},
)

// Adding five more samples (21 in total), so another mmapped chunk is created.
h.CounterResetHint = histogram.CounterReset
h.Count = 3000
appendHistogram(300, h)

h.CounterResetHint = histogram.UnknownCounterReset
for i := 1; i <= 4; i++ {
h.Count = 3000 + uint64(i)
appendHistogram(300+int64(i), h)
}

// One mmapped chunk with (ts, val) [(300, 3000), (301, 3001), (302, 3002), (303, 3003), (350, 4000)].
checkOOOExpCounterResetHeader(expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 300,
maxt: 350,
numSamples: 5,
})
})
}
}

func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
dir := t.TempDir()
opts := DefaultOptions()
Expand Down
12 changes: 10 additions & 2 deletions tsdb/ooo_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
} else if s.fh != nil {
encoding = chunkenc.EncFloatHistogram
}

// prevApp is the appender for the previous sample.
prevApp := app

if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram
if prevEncoding != chunkenc.EncNone {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
Expand All @@ -126,7 +130,8 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
case chunkenc.EncXOR:
app.Append(s.t, s.f)
case chunkenc.EncHistogram:
prevHApp, _ := app.(*chunkenc.HistogramAppender)
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
Expand All @@ -137,9 +142,11 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
case chunkenc.EncFloatHistogram:
prevHApp, _ := app.(*chunkenc.FloatHistogramAppender)
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
Expand All @@ -150,6 +157,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
}
cmaxt = s.t
Expand Down

0 comments on commit 914c46c

Please sign in to comment.