Skip to content

Commit

Permalink
Merge pull request #15025 from prometheus/notreset-bug
Browse files Browse the repository at this point in the history
Fix bug in rate vs float and histogram mixup
# Conflicts:
#	promql/engine_test.go
  • Loading branch information
beorn7 authored and charleskorn committed Oct 3, 2024
1 parent 4022e74 commit 598939f
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 53 deletions.
115 changes: 115 additions & 0 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sort"
"strconv"
"sync"
Expand All @@ -28,11 +29,13 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/teststorage"
Expand Down Expand Up @@ -3478,3 +3481,115 @@ histogram {{sum:4 count:4 buckets:[2 2]}} {{sum:6 count:6 buckets:[3 3]}} {{sum:
},
})
}

func TestHistogramRateWithFloatStaleness(t *testing.T) {
// Make a chunk with two normal histograms of the same value.
h1 := histogram.Histogram{
Schema: 2,
Count: 10,
Sum: 100,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{100},
}

c1 := chunkenc.NewHistogramChunk()
app, err := c1.Appender()
require.NoError(t, err)
var (
newc chunkenc.Chunk
recoded bool
)

newc, recoded, app, err = app.AppendHistogram(nil, 0, h1.Copy(), false)
require.NoError(t, err)
require.False(t, recoded)
require.Nil(t, newc)

newc, recoded, _, err = app.AppendHistogram(nil, 10, h1.Copy(), false)
require.NoError(t, err)
require.False(t, recoded)
require.Nil(t, newc)

// Make a chunk with a single float stale marker.
c2 := chunkenc.NewXORChunk()
app, err = c2.Appender()
require.NoError(t, err)

app.Append(20, math.Float64frombits(value.StaleNaN))

// Make a chunk with two normal histograms that have zero value.
h2 := histogram.Histogram{
Schema: 2,
}

c3 := chunkenc.NewHistogramChunk()
app, err = c3.Appender()
require.NoError(t, err)

newc, recoded, app, err = app.AppendHistogram(nil, 30, h2.Copy(), false)
require.NoError(t, err)
require.False(t, recoded)
require.Nil(t, newc)

newc, recoded, _, err = app.AppendHistogram(nil, 40, h2.Copy(), false)
require.NoError(t, err)
require.False(t, recoded)
require.Nil(t, newc)

querier := storage.MockQuerier{
SelectMockFunction: func(_ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet {
return &singleSeriesSet{
series: mockSeries{chunks: []chunkenc.Chunk{c1, c2, c3}, labelSet: []string{"__name__", "foo"}},
}
},
}

queriable := storage.MockQueryable{MockQuerier: &querier}

engine := promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery)

q, err := engine.NewInstantQuery(context.Background(), &queriable, nil, "rate(foo[40s])", timestamp.Time(45))
require.NoError(t, err)
defer q.Close()

res := q.Exec(context.Background())
require.NoError(t, res.Err)

vec, err := res.Vector()
require.NoError(t, err)

// Single sample result.
require.Len(t, vec, 1)
// The result is a histogram.
require.NotNil(t, vec[0].H)
// The result should be zero as the histogram has not increased, so the rate is zero.
require.Equal(t, 0.0, vec[0].H.Count)
require.Equal(t, 0.0, vec[0].H.Sum)
}

type singleSeriesSet struct {
series storage.Series
consumed bool
}

func (s *singleSeriesSet) Next() bool { c := s.consumed; s.consumed = true; return !c }
func (s singleSeriesSet) At() storage.Series { return s.series }
func (s singleSeriesSet) Err() error { return nil }
func (s singleSeriesSet) Warnings() annotations.Annotations { return nil }

type mockSeries struct {
chunks []chunkenc.Chunk
labelSet []string
}

func (s mockSeries) Labels() labels.Labels {
return labels.FromStrings(s.labelSet...)
}

func (s mockSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
iterables := []chunkenc.Iterator{}
for _, c := range s.chunks {
iterables = append(iterables, c.Iterator(nil))
}
return storage.ChainSampleIteratorFromIterators(it, iterables)
}
71 changes: 18 additions & 53 deletions storage/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ func (s fSample) Type() chunkenc.ValueType {
return chunkenc.ValFloat
}

func (s fSample) Copy() chunks.Sample {
return s
}

type hSample struct {
t int64
h *histogram.Histogram
Expand All @@ -212,6 +216,10 @@ func (s hSample) Type() chunkenc.ValueType {
return chunkenc.ValHistogram
}

func (s hSample) Copy() chunks.Sample {
return hSample{t: s.t, h: s.h.Copy()}
}

type fhSample struct {
t int64
fh *histogram.FloatHistogram
Expand All @@ -237,6 +245,10 @@ func (s fhSample) Type() chunkenc.ValueType {
return chunkenc.ValFloatHistogram
}

func (s fhSample) Copy() chunks.Sample {
return fhSample{t: s.t, fh: s.fh.Copy()}
}

type sampleRing struct {
delta int64

Expand Down Expand Up @@ -535,55 +547,8 @@ func (r *sampleRing) addFH(s fhSample) {
}
}

// genericAdd is a generic implementation of adding a chunks.Sample
// implementation to a buffer of a sample ring. However, the Go compiler
// currently (go1.20) decides to not expand the code during compile time, but
// creates dynamic code to handle the different types. That has a significant
// overhead during runtime, noticeable in PromQL benchmarks. For example, the
// "RangeQuery/expr=rate(a_hundred[1d]),steps=.*" benchmarks show about 7%
// longer runtime, 9% higher allocation size, and 10% more allocations.
// Therefore, genericAdd has been manually implemented for all the types
// (addSample, addF, addH, addFH) below.
//
// func genericAdd[T chunks.Sample](s T, buf []T, r *sampleRing) []T {
// l := len(buf)
// // Grow the ring buffer if it fits no more elements.
// if l == 0 {
// buf = make([]T, 16)
// l = 16
// }
// if l == r.l {
// newBuf := make([]T, 2*l)
// copy(newBuf[l+r.f:], buf[r.f:])
// copy(newBuf, buf[:r.f])
//
// buf = newBuf
// r.i = r.f
// r.f += l
// l = 2 * l
// } else {
// r.i++
// if r.i >= l {
// r.i -= l
// }
// }
//
// buf[r.i] = s
// r.l++
//
// // Free head of the buffer of samples that just fell out of the range.
// tmin := s.T() - r.delta
// for buf[r.f].T() < tmin {
// r.f++
// if r.f >= l {
// r.f -= l
// }
// r.l--
// }
// return buf
// }

// addSample is a handcoded specialization of genericAdd (see above).
// addSample adds a sample to a buffer of chunks.Sample, i.e. the general case
// using an interface as the type.
func addSample(s chunks.Sample, buf []chunks.Sample, r *sampleRing) []chunks.Sample {
l := len(buf)
// Grow the ring buffer if it fits no more elements.
Expand All @@ -607,7 +572,7 @@ func addSample(s chunks.Sample, buf []chunks.Sample, r *sampleRing) []chunks.Sam
}
}

buf[r.i] = s
buf[r.i] = s.Copy()
r.l++

// Free head of the buffer of samples that just fell out of the range.
Expand All @@ -622,7 +587,7 @@ func addSample(s chunks.Sample, buf []chunks.Sample, r *sampleRing) []chunks.Sam
return buf
}

// addF is a handcoded specialization of genericAdd (see above).
// addF adds an fSample to a (specialized) fSample buffer.
func addF(s fSample, buf []fSample, r *sampleRing) []fSample {
l := len(buf)
// Grow the ring buffer if it fits no more elements.
Expand Down Expand Up @@ -661,7 +626,7 @@ func addF(s fSample, buf []fSample, r *sampleRing) []fSample {
return buf
}

// addH is a handcoded specialization of genericAdd (see above).
// addF adds an hSample to a (specialized) hSample buffer.
func addH(s hSample, buf []hSample, r *sampleRing) []hSample {
l := len(buf)
// Grow the ring buffer if it fits no more elements.
Expand Down Expand Up @@ -705,7 +670,7 @@ func addH(s hSample, buf []hSample, r *sampleRing) []hSample {
return buf
}

// addFH is a handcoded specialization of genericAdd (see above).
// addFH adds an fhSample to a (specialized) fhSample buffer.
func addFH(s fhSample, buf []fhSample, r *sampleRing) []fhSample {
l := len(buf)
// Grow the ring buffer if it fits no more elements.
Expand Down
50 changes: 50 additions & 0 deletions storage/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,56 @@ func TestBufferedSeriesIteratorMixedHistograms(t *testing.T) {
require.Equal(t, histograms[1].ToFloat(nil), fh)
}

func TestBufferedSeriesIteratorMixedFloatsAndHistograms(t *testing.T) {
histograms := tsdbutil.GenerateTestHistograms(5)

it := NewBufferIterator(NewListSeriesIteratorWithCopy(samples{
hSample{t: 1, h: histograms[0].Copy()},
fSample{t: 2, f: 2},
hSample{t: 3, h: histograms[1].Copy()},
hSample{t: 4, h: histograms[2].Copy()},
fhSample{t: 3, fh: histograms[3].ToFloat(nil)},
fhSample{t: 4, fh: histograms[4].ToFloat(nil)},
}), 6)

require.Equal(t, chunkenc.ValNone, it.Seek(7))
require.NoError(t, it.Err())

buf := it.Buffer()

require.Equal(t, chunkenc.ValHistogram, buf.Next())
_, h0 := buf.AtHistogram()
require.Equal(t, histograms[0], h0)

require.Equal(t, chunkenc.ValFloat, buf.Next())
_, v := buf.At()
require.Equal(t, 2.0, v)

require.Equal(t, chunkenc.ValHistogram, buf.Next())
_, h1 := buf.AtHistogram()
require.Equal(t, histograms[1], h1)

require.Equal(t, chunkenc.ValHistogram, buf.Next())
_, h2 := buf.AtHistogram()
require.Equal(t, histograms[2], h2)

require.Equal(t, chunkenc.ValFloatHistogram, buf.Next())
_, h3 := buf.AtFloatHistogram(nil)
require.Equal(t, histograms[3].ToFloat(nil), h3)

require.Equal(t, chunkenc.ValFloatHistogram, buf.Next())
_, h4 := buf.AtFloatHistogram(nil)
require.Equal(t, histograms[4].ToFloat(nil), h4)

// Test for overwrite bug where the buffered histogram was reused
// between items in the buffer.
require.Equal(t, histograms[0], h0)
require.Equal(t, histograms[1], h1)
require.Equal(t, histograms[2], h2)
require.Equal(t, histograms[3].ToFloat(nil), h3)
require.Equal(t, histograms[4].ToFloat(nil), h4)
}

func BenchmarkBufferedSeriesIterator(b *testing.B) {
// Simulate a 5 minute rate.
it := NewBufferIterator(newFakeSeriesIterator(int64(b.N), 30), 5*60)
Expand Down
28 changes: 28 additions & 0 deletions storage/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,34 @@ func (it *listSeriesIterator) Seek(t int64) chunkenc.ValueType {

func (it *listSeriesIterator) Err() error { return nil }

type listSeriesIteratorWithCopy struct {
*listSeriesIterator
}

func NewListSeriesIteratorWithCopy(samples Samples) chunkenc.Iterator {
return &listSeriesIteratorWithCopy{
listSeriesIterator: &listSeriesIterator{samples: samples, idx: -1},
}
}

func (it *listSeriesIteratorWithCopy) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
t, ih := it.listSeriesIterator.AtHistogram(nil)
if h == nil || ih == nil {
return t, ih
}
ih.CopyTo(h)
return t, h
}

func (it *listSeriesIteratorWithCopy) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
t, ih := it.listSeriesIterator.AtFloatHistogram(nil)
if fh == nil || ih == nil {
return t, ih
}
ih.CopyTo(fh)
return t, fh
}

type listChunkSeriesIterator struct {
chks []chunks.Meta
idx int
Expand Down
12 changes: 12 additions & 0 deletions tsdb/chunks/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Sample interface {
H() *histogram.Histogram
FH() *histogram.FloatHistogram
Type() chunkenc.ValueType
Copy() Sample // Returns a deep copy.
}

type SampleSlice []Sample
Expand Down Expand Up @@ -70,6 +71,17 @@ func (s sample) Type() chunkenc.ValueType {
}
}

func (s sample) Copy() Sample {
c := sample{t: s.t, f: s.f}
if s.h != nil {
c.h = s.h.Copy()
}
if s.fh != nil {
c.fh = s.fh.Copy()
}
return c
}

// GenerateSamples starting at start and counting up numSamples.
func GenerateSamples(start, numSamples int) []Sample {
return generateSamples(start, numSamples, func(i int) Sample {
Expand Down
Loading

0 comments on commit 598939f

Please sign in to comment.