From c6a6bd43875769d7400e8462e63730fe1a49ab11 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 14 May 2024 13:20:50 +0200 Subject: [PATCH] deltatocumulative: exponential histograms (#32030) **Description:** Implements accumulation of exponential histograms by adding bucket-per-bucket. - [x] Align bucket offset to the smaller one - [x] Merge buckets by adding up each buckets count - [x] Widen zero buckets so they are the same - [x] Adjust scale to the lowest one **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30705 **Testing:** Extensive tests have been added to the `internal/data` package **Documentation:** not needed --- .chloggen/deltatocumulative-exphist.yaml | 29 ++++ processor/deltatocumulativeprocessor/go.mod | 5 + .../internal/data/add.go | 51 ++++++- .../internal/data/data.go | 10 +- .../internal/data/expo/expo.go | 52 +++++++ .../internal/data/expo/expo_test.go | 63 +++++++++ .../internal/data/expo/expotest/bins.go | 81 +++++++++++ .../internal/data/expo/expotest/equal.go | 115 +++++++++++++++ .../internal/data/expo/expotest/equal_test.go | 73 ++++++++++ .../internal/data/expo/expotest/histogram.go | 65 +++++++++ .../internal/data/expo/merge.go | 37 +++++ .../internal/data/expo/merge_test.go | 53 +++++++ .../internal/data/expo/ord.go | 16 +++ .../internal/data/expo/ord_test.go | 40 ++++++ .../internal/data/expo/scale.go | 115 +++++++++++++++ .../internal/data/expo/scale_test.go | 90 ++++++++++++ .../internal/data/expo/zero.go | 68 +++++++++ .../internal/data/expo/zero_test.go | 125 +++++++++++++++++ .../internal/data/expo_test.go | 131 ++++++++++++++++++ .../internal/metrics/data.go | 2 +- .../internal/telemetry/metrics.go | 15 +- .../deltatocumulativeprocessor/processor.go | 59 +++++--- 22 files changed, 1263 insertions(+), 32 deletions(-) create mode 100644 .chloggen/deltatocumulative-exphist.yaml create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expo.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/merge.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/ord.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/ord_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/scale.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/zero.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo_test.go diff --git a/.chloggen/deltatocumulative-exphist.yaml b/.chloggen/deltatocumulative-exphist.yaml new file mode 100644 index 000000000000..7dfa30bf54e4 --- /dev/null +++ b/.chloggen/deltatocumulative-exphist.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulativeprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: exponential histogram accumulation + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31340] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + accumulates exponential histogram datapoints by adding respective bucket counts. + also handles downscaling, changing zero-counts, offset adaptions and optional fields + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index 859bfc1eb028..a49a75e803b6 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -4,6 +4,7 @@ go 1.21.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.100.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.100.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.100.0 go.opentelemetry.io/collector/confmap v0.100.0 @@ -58,4 +59,8 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/processor/deltatocumulativeprocessor/internal/data/add.go b/processor/deltatocumulativeprocessor/internal/data/add.go index b40bf05b916d..94a575b1bd9f 100644 --- a/processor/deltatocumulativeprocessor/internal/data/add.go +++ b/processor/deltatocumulativeprocessor/internal/data/add.go @@ -3,7 +3,13 @@ package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" -import "go.opentelemetry.io/collector/pdata/pmetric" +import ( + "math" + + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) func (dp Number) Add(in Number) Number { switch in.ValueType() { @@ -23,7 +29,46 @@ func (dp Histogram) Add(in Histogram) Histogram { panic("todo") } -// nolint func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram { - panic("todo") + type H = ExpHistogram + + if dp.Scale() != in.Scale() { + hi, lo := expo.HiLo(dp, in, H.Scale) + from, to := expo.Scale(hi.Scale()), expo.Scale(lo.Scale()) + expo.Downscale(hi.Positive(), from, to) + expo.Downscale(hi.Negative(), from, to) + hi.SetScale(lo.Scale()) + } + + if dp.ZeroThreshold() != in.ZeroThreshold() { + hi, lo := expo.HiLo(dp, in, H.ZeroThreshold) + expo.WidenZero(lo.DataPoint, hi.ZeroThreshold()) + } + + expo.Merge(dp.Positive(), in.Positive()) + expo.Merge(dp.Negative(), in.Negative()) + + dp.SetTimestamp(in.Timestamp()) + dp.SetCount(dp.Count() + in.Count()) + dp.SetZeroCount(dp.ZeroCount() + in.ZeroCount()) + + if dp.HasSum() && in.HasSum() { + dp.SetSum(dp.Sum() + in.Sum()) + } else { + dp.RemoveSum() + } + + if dp.HasMin() && in.HasMin() { + dp.SetMin(math.Min(dp.Min(), in.Min())) + } else { + dp.RemoveMin() + } + + if dp.HasMax() && in.HasMax() { + dp.SetMax(math.Max(dp.Max(), in.Max())) + } else { + dp.RemoveMax() + } + + return dp } diff --git a/processor/deltatocumulativeprocessor/internal/data/data.go b/processor/deltatocumulativeprocessor/internal/data/data.go index 941b3cff904f..eade94eadf92 100644 --- a/processor/deltatocumulativeprocessor/internal/data/data.go +++ b/processor/deltatocumulativeprocessor/internal/data/data.go @@ -6,6 +6,8 @@ package data // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" ) type Point[Self any] interface { @@ -52,19 +54,19 @@ func (dp Histogram) CopyTo(dst Histogram) { } type ExpHistogram struct { - pmetric.ExponentialHistogramDataPoint + expo.DataPoint } func (dp ExpHistogram) Clone() ExpHistogram { - clone := ExpHistogram{ExponentialHistogramDataPoint: pmetric.NewExponentialHistogramDataPoint()} - if dp.ExponentialHistogramDataPoint != (pmetric.ExponentialHistogramDataPoint{}) { + clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()} + if dp.DataPoint != (expo.DataPoint{}) { dp.CopyTo(clone) } return clone } func (dp ExpHistogram) CopyTo(dst ExpHistogram) { - dp.ExponentialHistogramDataPoint.CopyTo(dst.ExponentialHistogramDataPoint) + dp.DataPoint.CopyTo(dst.DataPoint) } type mustPoint[D Point[D]] struct{ _ D } diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expo.go b/processor/deltatocumulativeprocessor/internal/data/expo/expo.go new file mode 100644 index 000000000000..2011e3cd811e --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expo.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package expo implements various operations on exponential histograms and their bucket counts +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import "go.opentelemetry.io/collector/pdata/pmetric" + +type ( + DataPoint = pmetric.ExponentialHistogramDataPoint + Buckets = pmetric.ExponentialHistogramDataPointBuckets +) + +// Abs returns a view into the buckets using an absolute scale +func Abs(bs Buckets) Absolute { + return Absolute{buckets: bs} +} + +type buckets = Buckets + +// Absolute addresses bucket counts using an absolute scale, such that it is +// interoperable with [Scale]. +// +// It spans from [[Absolute.Lower]:[Absolute.Upper]] +// +// NOTE: The zero-value is unusable, use [Abs] to construct +type Absolute struct { + buckets +} + +// Abs returns the value at absolute index 'at' +func (a Absolute) Abs(at int) uint64 { + if i, ok := a.idx(at); ok { + return a.BucketCounts().At(i) + } + return 0 +} + +// Upper returns the minimal index outside the set, such that every i < Upper +func (a Absolute) Upper() int { + return a.BucketCounts().Len() + int(a.Offset()) +} + +// Lower returns the minimal index inside the set, such that every i >= Lower +func (a Absolute) Lower() int { + return int(a.Offset()) +} + +func (a Absolute) idx(at int) (int, bool) { + idx := at - a.Lower() + return idx, idx >= 0 && idx < a.BucketCounts().Len() +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go new file mode 100644 index 000000000000..d7eb0cb2e9b3 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "fmt" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +func TestAbsolute(t *testing.T) { + is := expotest.Is(t) + + bs := expotest.Bins{ø, 1, 2, 3, 4, 5, ø, ø}.Into() + abs := expo.Abs(bs) + + lo, up := abs.Lower(), abs.Upper() + is.Equalf(-2, lo, "lower-bound") + is.Equalf(3, up, "upper-bound") + + for i := lo; i < up; i++ { + got := abs.Abs(i) + is.Equal(bs.BucketCounts().At(i+2), got) + } +} + +func ExampleAbsolute() { + nums := []float64{0.4, 2.3, 2.4, 4.5} + + bs := expotest.Observe0(nums...) + abs := expo.Abs(bs) + + s := expo.Scale(0) + for _, n := range nums { + fmt.Printf("%.1f belongs to bucket %+d\n", n, s.Idx(n)) + } + + fmt.Printf("\n index:") + for i := 0; i < bs.BucketCounts().Len(); i++ { + fmt.Printf(" %d", i) + } + fmt.Printf("\n abs:") + for i := abs.Lower(); i < abs.Upper(); i++ { + fmt.Printf(" %+d", i) + } + fmt.Printf("\ncounts:") + for i := abs.Lower(); i < abs.Upper(); i++ { + fmt.Printf(" %d", abs.Abs(i)) + } + + // Output: + // 0.4 belongs to bucket -2 + // 2.3 belongs to bucket +1 + // 2.4 belongs to bucket +1 + // 4.5 belongs to bucket +2 + // + // index: 0 1 2 3 4 + // abs: -2 -1 +0 +1 +2 + // counts: 1 0 0 2 1 +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go new file mode 100644 index 000000000000..13b4ce74c928 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + +import ( + "fmt" + "math" + + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +const ( + Empty = math.MaxUint64 + ø = Empty +) + +// index: 0 1 2 3 4 5 6 7 +// bucket: -3 -2 -1 0 1 2 3 4 +// bounds: (0.125,0.25], (0.25,0.5], (0.5,1], (1,2], (2,4], (4,8], (8,16], (16,32] +type Bins [8]uint64 + +func (bins Bins) Into() expo.Buckets { + start := 0 + for i := 0; i < len(bins); i++ { + if bins[i] != ø { + start = i + break + } + } + + end := len(bins) + for i := start; i < len(bins); i++ { + if bins[i] == ø { + end = i + break + } + } + + counts := bins[start:end] + + buckets := pmetric.NewExponentialHistogramDataPointBuckets() + buckets.SetOffset(int32(start - 3)) + buckets.BucketCounts().FromRaw(counts) + return buckets +} + +func ObserveInto(bs expo.Buckets, scale expo.Scale, pts ...float64) { + counts := bs.BucketCounts() + + for _, pt := range pts { + pt = math.Abs(pt) + if pt <= 0.125 || pt > 32 { + panic(fmt.Sprintf("out of bounds: 0.125 < %f <= 32", pt)) + } + + idx := scale.Idx(pt) - int(bs.Offset()) + switch { + case idx < 0: + bs.SetOffset(bs.Offset() + int32(idx)) + counts.FromRaw(append(make([]uint64, -idx), counts.AsRaw()...)) + idx = 0 + case idx >= counts.Len(): + counts.Append(make([]uint64, idx-counts.Len()+1)...) + } + + counts.SetAt(idx, counts.At(idx)+1) + } +} + +func Observe(scale expo.Scale, pts ...float64) expo.Buckets { + bs := pmetric.NewExponentialHistogramDataPointBuckets() + ObserveInto(bs, scale, pts...) + return bs +} + +func Observe0(pts ...float64) expo.Buckets { + return Observe(0, pts...) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go new file mode 100644 index 000000000000..c34e7c1665bc --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + +import ( + "reflect" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +// T is the testing helper. Most notably it provides [T.Equal] +type T struct { + testing.TB +} + +func Is(t testing.TB) T { + return T{TB: t} +} + +// Equal reports whether want and got are deeply equal. +// +// Unlike [reflect.DeepEqual] it first recursively checks exported fields +// and "getters", which are defined as an exported method with: +// - exactly zero input arguments +// - exactly one return value +// - does not start with 'Append' +// +// If this yields differences, those are reported and the test fails. +// If the compared values are [pmetric.ExponentialHistogramDataPoint], then +// [pmetrictest.CompareExponentialHistogramDataPoint] is also called. +// +// If no differences are found, it falls back to [assert.Equal]. +// +// This was done to aid readability when comparing deeply nested [pmetric]/[pcommon] types, +// because in many cases [assert.Equal] output was found to be barely understandable. +func (is T) Equal(want, got any) { + is.Helper() + equal(is.TB, want, got, "") +} + +func (is T) Equalf(want, got any, name string) { + is.Helper() + equal(is.TB, want, got, name) +} + +func equal(t testing.TB, want, got any, name string) bool { + t.Helper() + require.IsType(t, want, got) + + vw := reflect.ValueOf(want) + vg := reflect.ValueOf(got) + + if vw.Kind() != reflect.Struct { + ok := reflect.DeepEqual(want, got) + if !ok { + t.Errorf("%s: %+v != %+v", name, want, got) + } + return ok + } + + ok := true + // compare all "getters" of the struct + for i := 0; i < vw.NumMethod(); i++ { + mname := vw.Type().Method(i).Name + fname := strings.TrimPrefix(name+"."+mname+"()", ".") + + mw := vw.Method(i) + mg := vg.Method(i) + + // only compare "getters" + if mw.Type().NumIn() != 0 || mw.Type().NumOut() != 1 { + continue + } + // Append(Empty) fails above heuristic, exclude it + if strings.HasPrefix(mname, "Append") { + continue + } + + rw := mw.Call(nil)[0].Interface() + rg := mg.Call(nil)[0].Interface() + + ok = equal(t, rw, rg, fname) && ok + } + + // compare all exported fields of the struct + for i := 0; i < vw.NumField(); i++ { + if !vw.Type().Field(i).IsExported() { + continue + } + fname := name + "." + vw.Type().Field(i).Name + fw := vw.Field(i).Interface() + fg := vg.Field(i).Interface() + ok = equal(t, fw, fg, fname) && ok + } + if !ok { + return false + } + + if _, ok := want.(expo.DataPoint); ok { + err := pmetrictest.CompareExponentialHistogramDataPoint(want.(expo.DataPoint), got.(expo.DataPoint)) + if err != nil { + t.Error(err) + } + } + + // fallback to a full deep-equal for rare cases (unexported fields, etc) + return assert.Equal(t, want, got) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go new file mode 100644 index 000000000000..7fb7c42b586e --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expotest + +import ( + "fmt" + "path/filepath" + "runtime" + "strconv" + "strings" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +var t testing.TB = fakeT{} + +var expotest = struct { + Is func(t testing.TB) T + Observe func(expo.Scale, ...float64) expo.Buckets +}{ + Is: Is, + Observe: Observe, +} + +func ExampleT_Equal() { + is := expotest.Is(t) + + want := Histogram{ + PosNeg: expotest.Observe(expo.Scale(0), 1, 2, 3, 4), + Scale: 0, + }.Into() + + got := Histogram{ + PosNeg: expotest.Observe(expo.Scale(1), 1, 1, 1, 1), + Scale: 1, + }.Into() + + is.Equal(want, got) + + // Output: + // equal_test.go:40: Negative().BucketCounts().AsRaw(): [1 1 2] != [4] + // equal_test.go:40: Negative().BucketCounts().Len(): 3 != 1 + // equal_test.go:40: Positive().BucketCounts().AsRaw(): [1 1 2] != [4] + // equal_test.go:40: Positive().BucketCounts().Len(): 3 != 1 + // equal_test.go:40: Scale(): 0 != 1 +} + +func TestNone(*testing.T) {} + +type fakeT struct { + testing.TB +} + +func (t fakeT) Helper() {} + +func (t fakeT) Errorf(format string, args ...any) { + var from string + for i := 0; ; i++ { + pc, file, line, ok := runtime.Caller(i) + if !ok { + break + } + fn := runtime.FuncForPC(pc) + if strings.HasSuffix(fn.Name(), ".ExampleT_Equal") { + from = filepath.Base(file) + ":" + strconv.Itoa(line) + break + } + } + + fmt.Printf("%s: %s\n", from, fmt.Sprintf(format, args...)) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go new file mode 100644 index 000000000000..141dad724d82 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +type Histogram struct { + Ts pcommon.Timestamp + + Pos, Neg expo.Buckets + PosNeg expo.Buckets + + Scale int + Count uint64 + Sum *float64 + + Min, Max *float64 + + Zt float64 + Zc uint64 +} + +func (hist Histogram) Into() expo.DataPoint { + dp := pmetric.NewExponentialHistogramDataPoint() + dp.SetTimestamp(hist.Ts) + + if !zero(hist.PosNeg) { + hist.PosNeg.CopyTo(dp.Positive()) + hist.PosNeg.CopyTo(dp.Negative()) + } + + if !zero(hist.Pos) { + hist.Pos.MoveTo(dp.Positive()) + } + if !zero(hist.Neg) { + hist.Neg.MoveTo(dp.Negative()) + } + + dp.SetCount(hist.Count) + if hist.Sum != nil { + dp.SetSum(*hist.Sum) + } + + if hist.Min != nil { + dp.SetMin(*hist.Min) + } + if hist.Max != nil { + dp.SetMax(*hist.Max) + } + + dp.SetScale(int32(hist.Scale)) + dp.SetZeroThreshold(hist.Zt) + dp.SetZeroCount(hist.Zc) + return dp +} + +func zero[T comparable](v T) bool { + return v == *new(T) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/merge.go b/processor/deltatocumulativeprocessor/internal/data/expo/merge.go new file mode 100644 index 000000000000..150e29a65819 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/merge.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// Merge combines the counts of buckets a and b into a. +// Both buckets MUST be of same scale +func Merge(arel, brel Buckets) { + if brel.BucketCounts().Len() == 0 { + return + } + if arel.BucketCounts().Len() == 0 { + brel.CopyTo(arel) + return + } + + a, b := Abs(arel), Abs(brel) + + lo := min(a.Lower(), b.Lower()) + up := max(a.Upper(), b.Upper()) + + size := up - lo + + counts := pcommon.NewUInt64Slice() + counts.Append(make([]uint64, size-counts.Len())...) + + for i := 0; i < counts.Len(); i++ { + counts.SetAt(i, a.Abs(lo+i)+b.Abs(lo+i)) + } + + a.SetOffset(int32(lo)) + counts.MoveTo(a.BucketCounts()) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go new file mode 100644 index 000000000000..4d3791721bcd --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "fmt" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +const ø = expotest.Empty + +type bins = expotest.Bins + +func TestMerge(t *testing.T) { + cases := []struct { + a, b bins + want bins + }{{ + // -3 -2 -1 0 1 2 3 4 + a: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + b: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + want: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + }, { + a: bins{ø, ø, 1, 1, 1, ø, ø, ø}, + b: bins{ø, 1, 1, ø, ø, ø, ø, ø}, + want: bins{ø, 1, 2, 1, 1, ø, ø, ø}, + }, { + a: bins{ø, ø, ø, ø, 1, 1, 1, ø}, + b: bins{ø, ø, ø, ø, 1, 1, 1, ø}, + want: bins{ø, ø, ø, ø, 2, 2, 2, ø}, + }, { + a: bins{ø, 1, 1, ø, ø, ø, ø, ø}, + b: bins{ø, ø, ø, ø, 1, 1, ø, ø}, + want: bins{ø, 1, 1, 0, 1, 1, ø, ø}, + }} + + for _, cs := range cases { + a := cs.a.Into() + b := cs.b.Into() + want := cs.want.Into() + + name := fmt.Sprintf("(%+d,%d)+(%+d,%d)=(%+d,%d)", a.Offset(), a.BucketCounts().Len(), b.Offset(), b.BucketCounts().Len(), want.Offset(), want.BucketCounts().Len()) + t.Run(name, func(t *testing.T) { + expo.Merge(a, b) + is := expotest.Is(t) + is.Equal(want, a) + }) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/ord.go b/processor/deltatocumulativeprocessor/internal/data/expo/ord.go new file mode 100644 index 000000000000..34d177be1795 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/ord.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import "cmp" + +// HiLo returns the greater of a and b by comparing the result of applying fn to +// each. If equal, returns operands as passed +func HiLo[T any, N cmp.Ordered](a, b T, fn func(T) N) (hi, lo T) { + an, bn := fn(a), fn(b) + if cmp.Less(an, bn) { + return b, a + } + return a, b +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/ord_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/ord_test.go new file mode 100644 index 000000000000..dedc60b50f27 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/ord_test.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +func TestHiLo(t *testing.T) { + type T struct { + int int + str string + } + + a := T{int: 0, str: "foo"} + b := T{int: 1, str: "bar"} + + { + hi, lo := expo.HiLo(a, b, func(v T) int { return v.int }) + assert.Equal(t, a, lo) + assert.Equal(t, b, hi) + } + + { + hi, lo := expo.HiLo(a, b, func(v T) string { return v.str }) + assert.Equal(t, b, lo) + assert.Equal(t, a, hi) + } + + { + hi, lo := expo.HiLo(a, b, func(T) int { return 0 }) + assert.Equal(t, a, hi) + assert.Equal(t, b, lo) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/scale.go b/processor/deltatocumulativeprocessor/internal/data/expo/scale.go new file mode 100644 index 000000000000..ac075158dc3c --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/scale.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "fmt" + "math" +) + +type Scale int32 + +// Idx gives the bucket index v belongs into +func (scale Scale) Idx(v float64) int { + // from: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function + + // Special case for power-of-two values. + if frac, exp := math.Frexp(v); frac == 0.5 { + return ((exp - 1) << scale) - 1 + } + + scaleFactor := math.Ldexp(math.Log2E, int(scale)) + // Note: math.Floor(value) equals math.Ceil(value)-1 when value + // is not a power of two, which is checked above. + return int(math.Floor(math.Log(v) * scaleFactor)) +} + +// Bounds returns the half-open interval (min,max] of the bucket at index. +// This means a value min < v <= max belongs to this bucket. +// +// NOTE: this is different from Go slice intervals, which are [a,b) +func (scale Scale) Bounds(index int) (min, max float64) { + // from: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function + lower := func(index int) float64 { + inverseFactor := math.Ldexp(math.Ln2, int(-scale)) + return math.Exp(float64(index) * inverseFactor) + } + + return lower(index), lower(index + 1) +} + +// Downscale collapses the buckets of bs until scale 'to' is reached +func Downscale(bs Buckets, from, to Scale) { + switch { + case from == to: + return + case from < to: + // because even distribution within the buckets cannot be assumed, it is + // not possible to correctly upscale (split) buckets. + // any attempt to do so would yield erronous data. + panic(fmt.Sprintf("cannot upscale without introducing error (%d -> %d)", from, to)) + } + + for at := from; at > to; at-- { + Collapse(bs) + } +} + +// Collapse merges adjacent buckets and zeros the remaining area: +// +// before: 1 1 1 1 1 1 1 1 1 1 1 1 +// after: 2 2 2 2 2 2 0 0 0 0 0 0 +// +// Due to the "perfect subsetting" property of exponential histograms, this +// gives the same observation as before, but recorded at scale-1. See +// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponential-scale. +// +// Because every bucket now spans twice as much range, half of the allocated +// counts slice is technically no longer required. It is zeroed but left in +// place to avoid future allocations, because observations may happen in that +// area at a later time. +func Collapse(bs Buckets) { + counts := bs.BucketCounts() + size := counts.Len() / 2 + if counts.Len()%2 != 0 { + size++ + } + + // merging needs to happen in pairs aligned to i=0. if offset is non-even, + // we need to shift the whole merging by one to make above condition true. + shift := 0 + if bs.Offset()%2 != 0 { + bs.SetOffset(bs.Offset() - 1) + shift-- + } + bs.SetOffset(bs.Offset() / 2) + + for i := 0; i < size; i++ { + // size is ~half of len. we add two buckets per iteration. + // k jumps in steps of 2, shifted if offset makes this necessary. + k := i*2 + shift + + // special case: we just started and had to shift. the left half of the + // new bucket is not actually stored, so only use counts[0]. + if i == 0 && k == -1 { + counts.SetAt(i, counts.At(k+1)) + continue + } + + // new[k] = old[k]+old[k+1] + counts.SetAt(i, counts.At(k)) + if k+1 < counts.Len() { + counts.SetAt(i, counts.At(k)+counts.At(k+1)) + } + } + + // zero the excess area. its not needed to represent the observation + // anymore, but kept for two reasons: + // 1. future observations may need it, no need to re-alloc then if kept + // 2. [pcommon.Uint64Slice] can not, in fact, be sliced, so getting rid + // of it would alloc ¯\_(ツ)_/¯ + for i := size; i < counts.Len(); i++ { + counts.SetAt(i, 0) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go new file mode 100644 index 000000000000..ceb76eb1d44d --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "fmt" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +func TestDownscale(t *testing.T) { + type Repr[T any] struct { + scale expo.Scale + bkt T + } + + cases := [][]Repr[string]{{ + {scale: 2, bkt: "1 1 1 1 1 1 1 1 1 1 1 1"}, + {scale: 1, bkt: " 2 2 2 2 2 2 "}, + {scale: 0, bkt: " 4 4 4 "}, + }, { + {scale: 2, bkt: "ø 1 1 1 1 1 1 1 1 1 1 1"}, + {scale: 1, bkt: " 1 2 2 2 2 2 "}, + {scale: 0, bkt: " 3 4 4 "}, + }, { + {scale: 2, bkt: "ø ø 1 1 1 1 1 1 1 1 1 1"}, + {scale: 1, bkt: " ø 2 2 2 2 2 "}, + {scale: 0, bkt: " 2 4 4 "}, + }, { + {scale: 2, bkt: "ø ø ø ø 1 1 1 1 1 1 1 1"}, + {scale: 1, bkt: " ø ø 2 2 2 2 "}, + {scale: 0, bkt: " ø 4 4 "}, + }, { + {scale: 2, bkt: "1 1 1 1 1 1 1 1 1 "}, + {scale: 1, bkt: " 2 2 2 2 1 "}, + {scale: 0, bkt: " 4 4 1 "}, + }, { + {scale: 2, bkt: "1 1 1 1 1 1 1 1 1 1 1 1"}, + {scale: 0, bkt: " 4 4 4 "}, + }} + + type B = expo.Buckets + for i, reprs := range cases { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + buckets := make([]Repr[B], len(reprs)) + for i, r := range reprs { + bkt := pmetric.NewExponentialHistogramDataPointBuckets() + for _, elem := range strings.Fields(r.bkt) { + if elem == "ø" { + bkt.SetOffset(bkt.Offset() + 1) + continue + } + n, err := strconv.Atoi(elem) + if err != nil { + panic(err) + } + bkt.BucketCounts().Append(uint64(n)) + } + buckets[i] = Repr[B]{scale: r.scale, bkt: bkt} + } + + is := expotest.Is(t) + for i := 0; i < len(buckets)-1; i++ { + expo.Downscale(buckets[i].bkt, buckets[i].scale, buckets[i+1].scale) + + is.Equalf(buckets[i+1].bkt.Offset(), buckets[i].bkt.Offset(), "offset") + + want := buckets[i+1].bkt.BucketCounts().AsRaw() + got := buckets[i].bkt.BucketCounts().AsRaw() + + is.Equalf(want, got[:len(want)], "counts") + is.Equalf(make([]uint64, len(got)-len(want)), got[len(want):], "extra-space") + } + }) + } + + t.Run("panics", func(t *testing.T) { + assert.PanicsWithValue(t, "cannot upscale without introducing error (8 -> 12)", func() { + expo.Downscale(bins{}.Into(), 8, 12) + }) + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/zero.go b/processor/deltatocumulativeprocessor/internal/data/expo/zero.go new file mode 100644 index 000000000000..2d5401b39f5c --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/zero.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "cmp" + "fmt" +) + +// WidenZero widens the zero-bucket to span at least [-width,width], possibly wider +// if min falls in the middle of a bucket. +// +// Both buckets counts MUST be of same scale. +func WidenZero(dp DataPoint, width float64) { + switch { + case width == dp.ZeroThreshold(): + return + case width < dp.ZeroThreshold(): + panic(fmt.Sprintf("min must be larger than current threshold (%f)", dp.ZeroThreshold())) + } + + scale := Scale(dp.Scale()) + zero := scale.Idx(width) // the largest bucket index inside the zero width + + widen := func(bs Buckets) { + abs := Abs(bs) + for i := abs.Lower(); i <= zero; i++ { + dp.SetZeroCount(dp.ZeroCount() + abs.Abs(i)) + } + + // right next to the new zero bucket, constrained to slice range + lo := clamp(zero+1, abs.Lower(), abs.Upper()) + abs.Slice(lo, abs.Upper()) + } + + widen(dp.Positive()) + widen(dp.Negative()) + + _, max := scale.Bounds(zero) + dp.SetZeroThreshold(max) +} + +// Slice drops data outside the range from <= i < to from the bucket counts. It behaves the same as Go's [a:b] +// +// Limitations: +// - due to a limitation of the pcommon package, slicing cannot happen in-place and allocates +// - in consequence, data outside the range is garbage collected +func (a Absolute) Slice(from, to int) { + lo, up := a.Lower(), a.Upper() + switch { + case from > to: + panic(fmt.Sprintf("bad bounds: must be from<=to (got %d<=%d)", from, to)) + case from < lo || to > up: + panic(fmt.Sprintf("%d:%d is out of bounds for %d:%d", from, to, lo, up)) + } + + first := from - lo + last := to - lo + + a.BucketCounts().FromRaw(a.BucketCounts().AsRaw()[first:last]) + a.SetOffset(int32(from)) +} + +// clamp constraints v to the range up..=lo +func clamp[N cmp.Ordered](v, lo, up N) N { + return max(lo, min(v, up)) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go new file mode 100644 index 000000000000..92e9d88a38d1 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +type hist = expotest.Histogram + +func TestWidenZero(t *testing.T) { + cases := []struct { + name string + hist hist + want hist + min float64 + }{{ + // -3 -2 -1 0 1 2 3 4 + // (0.125,0.25], (0.25,0.5], (0.5,1], (1,2], (2,4], (4,8], (8,16], (16,32] + // + // -3 -2 -1 0 1 2 3 4 + hist: hist{PosNeg: bins{ø, ø, ø, ø, ø, ø, ø, ø}.Into(), Zt: 0, Zc: 0}, + want: hist{PosNeg: bins{ø, ø, ø, ø, ø, ø, ø, ø}.Into(), Zt: 0, Zc: 0}, + }, { + // zt=2 is upper boundary of bucket 0. keep buckets [1:n] + hist: hist{PosNeg: bins{ø, ø, 1, 2, 3, 4, 5, ø}.Into(), Zt: 0, Zc: 2}, + want: hist{PosNeg: bins{ø, ø, ø, ø, 3, 4, 5, ø}.Into(), Zt: 2, Zc: 2 + 2*(1+2)}, + }, { + // zt=3 is within bucket 1. keep buckets [2:n] + // set zt=4 because it must cover full buckets + hist: hist{PosNeg: bins{ø, ø, 1, 2, 3, 4, 5, ø}.Into(), Zt: 0, Zc: 2}, + min: 3, + want: hist{PosNeg: bins{ø, ø, ø, ø, ø, 4, 5, ø}.Into(), Zt: 4, Zc: 2 + 2*(1+2+3)}, + }, { + // zt=2 is higher, but no change expected as no buckets in this range are populated + hist: hist{PosNeg: bins{ø, ø, ø, ø, ø, ø, 1, 1}.Into(), Zt: 1.0, Zc: 2}, + want: hist{PosNeg: bins{ø, ø, ø, ø, ø, ø, 1, 1}.Into(), Zt: 2.0, Zc: 2}, + }} + + for _, cs := range cases { + name := fmt.Sprintf("%.2f->%.2f", cs.hist.Zt, cs.want.Zt) + t.Run(name, func(t *testing.T) { + hist := cs.hist.Into() + want := cs.want.Into() + + zt := cs.min + if zt == 0 { + zt = want.ZeroThreshold() + } + expo.WidenZero(hist, zt) + + is := expotest.Is(t) + is.Equal(want, hist) + }) + } + + t.Run("panics", func(t *testing.T) { + assert.PanicsWithValue(t, "min must be larger than current threshold (1.500000)", func() { + hist := hist{Zt: 1.5}.Into() + expo.WidenZero(hist, 0.5) + }) + }) +} + +func TestSlice(t *testing.T) { + cases := []struct { + bins bins + want bins + }{{ + // -3 -2 -1 0 1 2 3 4 + bins: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + want: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + }, { + bins: bins{1, 2, 3, 4, 5, 6, 7, 8}, + want: bins{1, 2, 3, 4, 5, 6, 7, 8}, + }, { + bins: bins{ø, 2, 3, 4, 5, 6, 7, ø}, + want: bins{ø, ø, 3, 4, 5, ø, ø, ø}, + }} + + for _, cs := range cases { + from, to := 0, len(cs.want) + for i := 0; i < len(cs.want); i++ { + if cs.want[i] != ø { + from += i + break + } + } + for i := from; i < len(cs.want); i++ { + if cs.want[i] == ø { + to = i + break + } + } + from -= 3 + to -= 3 + + t.Run(fmt.Sprintf("[%d:%d]", from, to), func(t *testing.T) { + bins := cs.bins.Into() + want := cs.want.Into() + + expo.Abs(bins).Slice(from, to) + + is := expotest.Is(t) + is.Equal(want, bins) + }) + } + + t.Run("panics", func(t *testing.T) { + data := expo.Abs(bins{1, 2, 3, 4, 5, 6, 7, 8}.Into()) + assert.PanicsWithValue(t, "bad bounds: must be from<=to (got 8<=4)", func() { + data.Slice(8, 4) + }) + assert.PanicsWithValue(t, "-6:12 is out of bounds for -3:5", func() { + data.Slice(-6, 12) + }) + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo_test.go b/processor/deltatocumulativeprocessor/internal/data/expo_test.go new file mode 100644 index 000000000000..b910b409cb55 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo_test.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package data + +import ( + "math" + "testing" + + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +// represents none/absent/unset in several tests +const ø = math.MaxUint64 + +func TestAdd(t *testing.T) { + type expdp = expotest.Histogram + type bins = expotest.Bins + var obs0 = expotest.Observe0 + + cases := []struct { + name string + dp, in expdp + want expdp + flip bool + }{{ + name: "noop", + dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, + in: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, + want: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, + }, { + name: "simple", + dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, + in: expdp{PosNeg: bins{1, 2, 3, 4, 5, 6, 7, 8}.Into(), Count: 2 * (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8)}, + want: expdp{PosNeg: bins{1, 2, 3, 4, 5, 6, 7, 8}.Into(), Count: 2 * (0 + (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8))}, + }, { + name: "lower+shorter", + dp: expdp{PosNeg: bins{ø, ø, ø, ø, ø, 1, 1, 1}.Into(), Count: 2 * 3}, + in: expdp{PosNeg: bins{ø, ø, 1, 1, 1, 1, 1, ø}.Into(), Count: 2 * 5}, + want: expdp{PosNeg: bins{ø, ø, 1, 1, 1, 2, 2, 1}.Into(), Count: 2 * (3 + 5)}, + }, { + name: "longer", + dp: expdp{PosNeg: bins{1, 1, 1, 1, 1, ø, ø, ø}.Into(), Count: 2 * 5}, + in: expdp{PosNeg: bins{1, 1, 1, 1, 1, 1, 1, 1}.Into(), Count: 2 * 8}, + want: expdp{PosNeg: bins{2, 2, 2, 2, 2, 1, 1, 1}.Into(), Count: 2 * (5 + 8)}, + }, { + name: "optional/missing", flip: true, + dp: expdp{PosNeg: obs0(0.6, 2.4) /* */, Count: 2}, + in: expdp{PosNeg: obs0(1.5, 3.2, 6.3), Min: some(1.5), Max: some(6.3), Sum: some(11.0), Count: 3}, + want: expdp{PosNeg: obs0(0.6, 2.4, 1.5, 3.2, 6.3) /* */, Count: 5}, + }, { + name: "optional/min-max-sum", + dp: expdp{PosNeg: obs0(1.5, 5.3, 11.6) /* */, Min: some(1.5), Max: some(11.6), Sum: some(18.4), Count: 3}, + in: expdp{PosNeg: obs0(0.6, 3.3, 7.9) /* */, Min: some(0.6), Max: some(07.9), Sum: some(11.8), Count: 3}, + want: expdp{PosNeg: obs0(1.5, 5.3, 11.6, 0.6, 3.3, 7.9), Min: some(0.6), Max: some(11.6), Sum: some(30.2), Count: 6}, + }, { + name: "zero/count", + dp: expdp{PosNeg: bins{0, 1, 2}.Into(), Zt: 0, Zc: 3, Count: 5}, + in: expdp{PosNeg: bins{0, 1, 0}.Into(), Zt: 0, Zc: 2, Count: 3}, + want: expdp{PosNeg: bins{0, 2, 2}.Into(), Zt: 0, Zc: 5, Count: 8}, + }, { + name: "zero/diff", + dp: expdp{PosNeg: bins{ø, ø, 0, 1, 1, 1}.Into(), Zt: 0.0, Zc: 2}, + in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 1}.Into(), Zt: 2.0, Zc: 2}, + want: expdp{PosNeg: bins{ø, ø, ø, ø, 2, 2}.Into(), Zt: 2.0, Zc: 4 + 2*1}, + }, { + name: "zero/subzero", + dp: expdp{PosNeg: bins{ø, 1, 1, 1, 1, 1}.Into(), Zt: 0.2, Zc: 2}, + in: expdp{PosNeg: bins{ø, ø, 1, 1, 1, 1}.Into(), Zt: 0.3, Zc: 2}, + want: expdp{PosNeg: bins{ø, ø, 2, 2, 2, 2}.Into(), Zt: 0.5, Zc: 4 + 2*1}, + }, { + name: "negative-offset", + dp: expdp{PosNeg: rawbs([]uint64{ /* */ 1, 2}, -2)}, + in: expdp{PosNeg: rawbs([]uint64{1, 2, 3 /* */}, -5)}, + want: expdp{PosNeg: rawbs([]uint64{1, 2, 3, 1, 2}, -5)}, + }, { + name: "scale/diff", + dp: expdp{PosNeg: expotest.Observe(expo.Scale(1), 1, 2, 3, 4), Scale: 1}, + in: expdp{PosNeg: expotest.Observe(expo.Scale(0), 4, 3, 2, 1), Scale: 0}, + want: expdp{Scale: 0, PosNeg: func() expo.Buckets { + bs := pmetric.NewExponentialHistogramDataPointBuckets() + expotest.ObserveInto(bs, expo.Scale(0), 1, 2, 3, 4) + expotest.ObserveInto(bs, expo.Scale(0), 4, 3, 2, 1) + bs.BucketCounts().Append([]uint64{0, 0}...) // rescaling leaves zeroed memory. this is expected + return bs + }()}, + }} + + for _, cs := range cases { + run := func(dp, in expdp) func(t *testing.T) { + return func(t *testing.T) { + is := expotest.Is(t) + + var ( + dp = ExpHistogram{dp.Into()} + in = ExpHistogram{in.Into()} + want = ExpHistogram{cs.want.Into()} + ) + + dp.SetTimestamp(0) + in.SetTimestamp(1) + want.SetTimestamp(1) + + got := dp.Add(in) + is.Equal(want.DataPoint, got.DataPoint) + } + } + + if cs.flip { + t.Run(cs.name+"-dp", run(cs.dp, cs.in)) + t.Run(cs.name+"-in", run(cs.in, cs.dp)) + continue + } + t.Run(cs.name, run(cs.dp, cs.in)) + } + +} + +func rawbs(data []uint64, offset int32) expo.Buckets { + bs := pmetric.NewExponentialHistogramDataPointBuckets() + bs.BucketCounts().FromRaw(data) + bs.SetOffset(offset) + return bs +} + +func some[T any](v T) *T { + return &v +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go index c305c85d781e..f063475055f7 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/data.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -47,7 +47,7 @@ type ExpHistogram Metric func (s ExpHistogram) At(i int) data.ExpHistogram { dp := Metric(s).ExponentialHistogram().DataPoints().At(i) - return data.ExpHistogram{ExponentialHistogramDataPoint: dp} + return data.ExpHistogram{DataPoint: dp} } func (s ExpHistogram) Len() int { diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index f3b88ef8b96a..946ffd98d1d6 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -19,11 +19,14 @@ import ( type Telemetry struct { Metrics + + meter metric.Meter } func New(meter metric.Meter) Telemetry { return Telemetry{ Metrics: metrics(meter), + meter: meter, } } @@ -89,23 +92,23 @@ func metrics(meter metric.Meter) Metrics { } } -func (m Metrics) WithLimit(meter metric.Meter, max int64) { +func (tel Telemetry) WithLimit(max int64) { then := metric.Callback(func(_ context.Context, o metric.Observer) error { - o.ObserveInt64(m.streams.limit, max) + o.ObserveInt64(tel.streams.limit, max) return nil }) - _, err := meter.RegisterCallback(then, m.streams.limit) + _, err := tel.meter.RegisterCallback(then, tel.streams.limit) if err != nil { panic(err) } } -func (m Metrics) WithStale(meter metric.Meter, max time.Duration) { +func (tel Telemetry) WithStale(max time.Duration) { then := metric.Callback(func(_ context.Context, o metric.Observer) error { - o.ObserveInt64(m.streams.stale, int64(max.Seconds())) + o.ObserveInt64(tel.streams.stale, int64(max.Seconds())) return nil }) - _, err := meter.RegisterCallback(then, m.streams.stale) + _, err := tel.meter.RegisterCallback(then, tel.streams.stale) if err != nil { panic(err) } diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 59fe2c7c4c0c..01e1cef4f916 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -34,8 +34,8 @@ type Processor struct { ctx context.Context cancel context.CancelFunc - aggr streams.Aggregator[data.Number] - stale maybe.Ptr[staleness.Staleness[data.Number]] + sums Pipeline[data.Number] + expo Pipeline[data.ExpHistogram] mtx sync.Mutex } @@ -43,29 +43,43 @@ type Processor struct { func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consumer.Metrics) *Processor { ctx, cancel := context.WithCancel(context.Background()) + tel := telemetry.New(meter) + proc := Processor{ log: log, ctx: ctx, cancel: cancel, next: next, + + sums: pipeline[data.Number](cfg, &tel), + expo: pipeline[data.ExpHistogram](cfg, &tel), } - tel := telemetry.New(meter) + return &proc +} + +type Pipeline[D data.Point[D]] struct { + aggr streams.Aggregator[D] + stale maybe.Ptr[staleness.Staleness[D]] +} + +func pipeline[D data.Point[D]](cfg *Config, tel *telemetry.Telemetry) Pipeline[D] { + var pipe Pipeline[D] - var dps streams.Map[data.Number] - dps = delta.New[data.Number]() + var dps streams.Map[D] + dps = delta.New[D]() dps = telemetry.ObserveItems(dps, &tel.Metrics) if cfg.MaxStale > 0 { - tel.WithStale(meter, cfg.MaxStale) + tel.WithStale(cfg.MaxStale) stale := maybe.Some(staleness.NewStaleness(cfg.MaxStale, dps)) - proc.stale = stale + pipe.stale = stale dps, _ = stale.Try() } if cfg.MaxStreams > 0 { - tel.WithLimit(meter, int64(cfg.MaxStreams)) + tel.WithLimit(int64(cfg.MaxStreams)) lim := streams.Limit(dps, cfg.MaxStreams) - if stale, ok := proc.stale.Try(); ok { + if stale, ok := pipe.stale.Try(); ok { lim.Evictor = stale } dps = lim @@ -73,13 +87,14 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume dps = telemetry.ObserveNonFatal(dps, &tel.Metrics) - proc.aggr = streams.IntoAggregator(dps) - return &proc + pipe.aggr = streams.IntoAggregator(dps) + return pipe } func (p *Processor) Start(_ context.Context, _ component.Host) error { - stale, ok := p.stale.Try() - if !ok { + sums, sok := p.sums.stale.Try() + expo, eok := p.expo.stale.Try() + if !(sok && eok) { return nil } @@ -91,7 +106,8 @@ func (p *Processor) Start(_ context.Context, _ component.Host) error { return case <-tick.C: p.mtx.Lock() - stale.ExpireOldEntries() + sums.ExpireOldEntries() + expo.ExpireOldEntries() p.mtx.Unlock() } } @@ -109,27 +125,34 @@ func (p *Processor) Capabilities() consumer.Capabilities { } func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + if err := context.Cause(p.ctx); err != nil { + return err + } + p.mtx.Lock() defer p.mtx.Unlock() var errs error - metrics.Each(md, func(m metrics.Metric) { switch m.Type() { case pmetric.MetricTypeSum: sum := m.Sum() if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta { - err := streams.Aggregate[data.Number](metrics.Sum(m), p.aggr) + err := streams.Aggregate(metrics.Sum(m), p.sums.aggr) errs = errors.Join(errs, err) sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } case pmetric.MetricTypeHistogram: // TODO case pmetric.MetricTypeExponentialHistogram: - // TODO + expo := m.ExponentialHistogram() + if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta { + err := streams.Aggregate(metrics.ExpHistogram(m), p.expo.aggr) + errs = errors.Join(errs, err) + expo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } } }) - if errs != nil { return errs }