Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine precomputed values of filtered attribute sets #3549

Merged
merged 29 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
948932a
Combine spatially aggregated precomputed vals
MrAlias Dec 19, 2022
5896179
Ignore false positive lint error and test method
MrAlias Dec 19, 2022
222a1cb
Add fix to changelog
MrAlias Dec 19, 2022
b463070
Handle edge case of exact set after filter
MrAlias Dec 20, 2022
e219ee2
Fix filter and measure algo for precomp
MrAlias Dec 21, 2022
cd34714
Add tests for precomp sums
MrAlias Dec 21, 2022
954145f
Unify precomputedMap
MrAlias Dec 21, 2022
c483c19
Merge branch 'main' into fix-3439
MrAlias Dec 24, 2022
3f3180f
Merge branch 'main' into fix-3439
MrAlias Jan 3, 2023
da9e4b4
Merge branch 'main' into fix-3439
MrAlias Jan 3, 2023
d5d63f1
Merge branch 'main' into fix-3439
MrAlias Jan 4, 2023
e0177a7
Merge branch 'main' into fix-3439
hanyuancheung Jan 5, 2023
876827b
Adds example from supplimental guide
MadVikingGod Jan 5, 2023
79e0f4a
Fixes for lint
MadVikingGod Jan 5, 2023
52c340f
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
813cb0d
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
81043bd
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
135bd8a
Merge branch 'main' into fix-3439
MrAlias Jan 12, 2023
649337a
Merge branch 'main' into fix-3439
MrAlias Jan 13, 2023
406a4bf
Update sdk/metric/meter_example_test.go
MrAlias Jan 13, 2023
36bf77b
Merge pull request #721 from MadVikingGod/mvg/example-3439
MrAlias Jan 13, 2023
657c069
Fix async example test
MrAlias Jan 13, 2023
f295494
Reduce duplicate code in TestAsynchronousExample
MrAlias Jan 13, 2023
38cd7f4
Clarify naming and documentation
MrAlias Jan 14, 2023
8ec850a
Fix spelling errors
MrAlias Jan 14, 2023
44d1b97
Merge branch 'main' into fix-3439
MadVikingGod Jan 17, 2023
af45187
Add a noop filter to default view
MrAlias Jan 17, 2023
c6b7069
Merge branch 'main' into fix-3439
MrAlias Jan 19, 2023
1c25a20
Merge branch 'main' into fix-3439
MrAlias Jan 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This `Registration` can be used to unregister callbacks. (#3522)
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)

### Fixed

- Asynchronous instruments that use sum aggregators and attribute filters correctly add values from equivalent attribute sets that have been filtered. (#3439, #3549)

### Changed

- The `InstrumentProvider` from `go.opentelemetry.io/otel/sdk/metric/asyncint64` is removed.
Expand Down
67 changes: 61 additions & 6 deletions sdk/metric/internal/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

type filterAgg[N int64 | float64] interface {
Aggregator[N]

// filtered records values for attributes that have been filtered.
filtered(N, attribute.Set)
}

// NewFilter wraps an Aggregator with an attribute filtering function.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
if fa, ok := agg.(filterAgg[N]); ok {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
return newPrecomputedFilter(fa, fn)
}
return newFilter(agg, fn)
}

// filter is an aggregator that applies attribute filter when Aggregating. filters
// do not have any backing memory, and must be constructed with a backing Aggregator.
type filter[N int64 | float64] struct {
Expand All @@ -31,15 +49,11 @@ type filter[N int64 | float64] struct {
seen map[attribute.Set]attribute.Set
}

// NewFilter wraps an Aggregator with an attribute filtering function.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
return &filter[N]{
filter: fn,
aggregator: agg,
seen: map[attribute.Set]attribute.Set{},
seen: make(map[attribute.Set]attribute.Set),
}
}

Expand All @@ -62,3 +76,44 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}

// precomputedFilter is an aggregator that applies attribute filter when
// Aggregating for precomputed Aggregations. The precomputed Aggregations need
dashpole marked this conversation as resolved.
Show resolved Hide resolved
// to operate normally when no attribute filtering is done (for sums this means
// setting the value), but when attribute filtering is done it needs to be
// added to any set value.
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator filterAgg[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

func newPrecomputedFilter[N int64 | float64](agg filterAgg[N], fn attribute.Filter) *precomputedFilter[N] {
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
} else {
f.aggregator.filtered(measurement, fAttr)
}
}

func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
89 changes: 89 additions & 0 deletions sdk/metric/internal/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"fmt"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -194,3 +196,90 @@ func TestFilterConcurrent(t *testing.T) {
testFilterConcurrent[float64](t)
})
}

func TestPrecomputedFilter(t *testing.T) {
t.Run("Int64", testPrecomputedFilter[int64]())
t.Run("Float64", testPrecomputedFilter[float64]())
}

func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
agg := newTestFilterAgg[N]()
f := NewFilter[N](agg, testAttributeFilter)
require.IsType(t, &precomputedFilter[N]{}, f)

var (
powerLevel = attribute.Int("power-level", 9000)
user = attribute.String("user", "Alice")
admin = attribute.Bool("admin", true)
)
a := attribute.NewSet(powerLevel)
key := a
f.Aggregate(1, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(0), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel, user)
f.Aggregate(2, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(2), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel, user, admin)
f.Aggregate(3, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel)
f.Aggregate(2, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))

a = attribute.NewSet(user)
f.Aggregate(3, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a))

_ = f.Aggregation()
assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation")
}
}

func str(a attribute.Set) string {
iter := a.Iter()
out := make([]string, 0, iter.Len())
for iter.Next() {
kv := iter.Attribute()
out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface()))
}
return strings.Join(out, ",")
}

type testFilterAgg[N int64 | float64] struct {
values map[attribute.Set]precomputedValue[N]
aggregationN int
}

func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] {
return &testFilterAgg[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}

func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) {
v := a.values[attr]
v.measured = val
a.values[attr] = v
}

// nolint: unused // Used to agg filtered.
func (a *testFilterAgg[N]) filtered(val N, attr attribute.Set) {
v := a.values[attr]
v.filtered += val
a.values[attr] = v
}

func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation {
a.aggregationN++
return nil
}
128 changes: 94 additions & 34 deletions sdk/metric/internal/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// valueMap is the storage for all sums.
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]N
Expand All @@ -32,12 +32,6 @@ func newValueMap[N int64 | float64]() *valueMap[N] {
return &valueMap[N]{values: make(map[attribute.Set]N)}
}

func (s *valueMap[N]) set(value N, attr attribute.Set) { // nolint: unused // This is indeed used.
s.Lock()
s.values[attr] = value
s.Unlock()
}

func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
s.values[attr] += value
Expand Down Expand Up @@ -164,6 +158,43 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
return out
}

type precomputedValue[N int64 | float64] struct {
// measured is the value directly measured.
measured N
// filtered is the sum of values from spatially aggregations.
dashpole marked this conversation as resolved.
Show resolved Hide resolved
filtered N
}

// valueMap is the storage for precomputed sums.
type precomputedMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]precomputedValue[N]
}

func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
return &precomputedMap[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}

// Aggregate records value as a cumulative sum for attr.
func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
v := s.values[attr]
v.measured = value
s.values[attr] = v
s.Unlock()
}

// filtered records value with spatially re-aggregated attrs.
func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
s.Lock()
v := s.values[attr]
v.filtered += value
s.values[attr] = v
s.Unlock()
}

// NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
// attributes and the aggregation cycle the measurements were made in.
Expand All @@ -176,56 +207,52 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
// is up to the caller to ensure this is accurate.
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedDeltaSum[N]{
recorded: make(map[attribute.Set]N),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
precomputedMap: newPrecomputedMap[N](),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
}
}

// precomputedDeltaSum summarizes a set of measurements recorded over all
// aggregation cycles as the delta arithmetic sum.
type precomputedDeltaSum[N int64 | float64] struct {
sync.Mutex
recorded map[attribute.Set]N
*precomputedMap[N]

reported map[attribute.Set]N

monotonic bool
start time.Time
}

// Aggregate records value as a cumulative sum for attr.
func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
s.recorded[attr] = value
s.Unlock()
}

func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
s.Lock()
defer s.Unlock()

if len(s.recorded) == 0 {
if len(s.values) == 0 {
return nil
}

t := now()
out := metricdata.Sum[N]{
Temporality: metricdata.DeltaTemporality,
IsMonotonic: s.monotonic,
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.recorded)),
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, recorded := range s.recorded {
value := recorded - s.reported[attr]
for attr, value := range s.values {
v := value.measured + value.filtered
delta := v - s.reported[attr]
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value,
Value: delta,
})
if value != 0 {
s.reported[attr] = recorded
if delta != 0 {
s.reported[attr] = v
}
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
Expand All @@ -247,16 +274,49 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
// The output Aggregation will report recorded values as cumulative
// temporality. It is up to the caller to ensure this is accurate.
func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedSum[N]{newCumulativeSum[N](monotonic)}
return &precomputedCumulativeSum[N]{
precomputedMap: newPrecomputedMap[N](),
monotonic: monotonic,
start: now(),
}
}

// precomputedSum summarizes a set of measurements recorded over all
// precomputedCumulativeSum summarizes a set of measurements recorded over all
// aggregation cycles directly as the cumulative arithmetic sum.
type precomputedSum[N int64 | float64] struct {
*cumulativeSum[N]
type precomputedCumulativeSum[N int64 | float64] struct {
*precomputedMap[N]

monotonic bool
start time.Time
}

// Aggregate records value as a cumulative sum for attr.
func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
s.set(value, attr)
func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
return nil
}

t := now()
out := metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: s.monotonic,
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, value := range s.values {
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value.measured + value.filtered,
})
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
return out
}
Loading