-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathcounter.go
108 lines (91 loc) · 2.87 KB
/
counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package countconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector"
import (
"context"
"errors"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)
// noAttributes is the all-zero key under which counts are stored when a
// metric definition yields an empty attribute set.
var noAttributes [16]byte
// newCounter builds a counter for the given metric definitions. The counts
// table starts empty, pre-sized to one entry per definition, and the counter
// is stamped with the time of creation.
func newCounter[K any](metricDefs map[string]metricDef[K]) *counter[K] {
	c := new(counter[K])
	c.metricDefs = metricDefs
	c.counts = make(map[string]map[[16]byte]*attrCounter, len(metricDefs))
	c.timestamp = time.Now()
	return c
}
// counter accumulates counts for a set of metric definitions. K is the type
// of the context value handed to each definition's condition.
type counter[K any] struct {
	// metricDefs maps a metric name to its definition.
	metricDefs map[string]metricDef[K]
	// counts maps a metric name to its per-attribute-set counters, keyed by
	// a 16-byte key derived from the attribute set.
	counts map[string]map[[16]byte]*attrCounter
	// timestamp is set when the counter is created and is used as the
	// timestamp of every emitted data point.
	timestamp time.Time
}
// attrCounter pairs one attribute set with the number of times it was counted.
type attrCounter struct {
	// attrs is the attribute set copied onto the emitted data point.
	attrs pcommon.Map
	// count is the number of increments recorded for attrs.
	count uint64
}
// update evaluates every configured metric definition against one telemetry
// item and increments the counters of the definitions that match.
//
// attrs is the attribute map of the item being counted and tCtx is the value
// passed to each definition's condition. For every definition the configured
// attribute keys are looked up in attrs; a key that is absent falls back to
// its DefaultValue when that is non-empty. If any configured key ends up
// without a value, the definition is skipped for this item. A definition
// without a condition counts every item; otherwise the item is counted only
// when the condition evaluates to true.
//
// Errors from condition evaluation and from increment are accumulated with
// errors.Join and returned together, so one failing definition does not stop
// the remaining definitions from being processed.
func (c *counter[K]) update(ctx context.Context, attrs pcommon.Map, tCtx K) error {
	var multiError error
	for name, md := range c.metricDefs {
		countAttrs := pcommon.NewMap()
		for _, attr := range md.attrs {
			if attrVal, ok := attrs.Get(attr.Key); ok {
				// AsString rather than Str: Str yields "" for any value that
				// is not of string type, which would fold every non-string
				// attribute value into a single empty-string data point.
				// For string values the two are identical.
				countAttrs.PutStr(attr.Key, attrVal.AsString())
			} else if attr.DefaultValue != "" {
				countAttrs.PutStr(attr.Key, attr.DefaultValue)
			}
		}

		// Missing necessary attributes to be counted
		if countAttrs.Len() != len(md.attrs) {
			continue
		}

		// No conditions, so match all.
		if md.condition == nil {
			multiError = errors.Join(multiError, c.increment(name, countAttrs))
			continue
		}

		if match, err := md.condition.Eval(ctx, tCtx); err != nil {
			multiError = errors.Join(multiError, err)
		} else if match {
			multiError = errors.Join(multiError, c.increment(name, countAttrs))
		}
	}
	return multiError
}
// increment adds one to the count kept for metricName and the given attribute
// set. The per-metric table and the per-attribute-set entry are created on
// first use; a newly created entry keeps attrs for later emission. The
// returned error is always nil.
func (c *counter[K]) increment(metricName string, attrs pcommon.Map) error {
	perMetric, found := c.counts[metricName]
	if !found {
		perMetric = make(map[[16]byte]*attrCounter)
		c.counts[metricName] = perMetric
	}

	// An empty attribute set uses the shared all-zero key; only non-empty
	// sets are hashed.
	var key [16]byte
	if attrs.Len() == 0 {
		key = noAttributes
	} else {
		key = pdatautil.MapHash(attrs)
	}

	entry, found := perMetric[key]
	if !found {
		entry = &attrCounter{attrs: attrs}
		perMetric[key] = entry
	}
	entry.count++
	return nil
}
// appendMetricsTo appends one sum metric to metricSlice for every metric
// definition that has at least one recorded count. Each metric is a
// monotonic sum with delta temporality and carries one integer data point per
// counted attribute set. Definitions with no counts produce no metric.
func (c *counter[K]) appendMetricsTo(metricSlice pmetric.MetricSlice) {
	// Every data point shares the counter's timestamp, so convert it once.
	ts := pcommon.NewTimestampFromTime(c.timestamp)

	for name, md := range c.metricDefs {
		perAttrs := c.counts[name]
		if len(perAttrs) == 0 {
			continue
		}

		metric := metricSlice.AppendEmpty()
		metric.SetName(name)
		metric.SetDescription(md.desc)

		sum := metric.SetEmptySum()
		sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
		// The delta value is always positive, so a value accumulated downstream is monotonic
		sum.SetIsMonotonic(true)

		points := sum.DataPoints()
		for _, ac := range perAttrs {
			dp := points.AppendEmpty()
			// TODO determine appropriate start time
			dp.SetTimestamp(ts)
			dp.SetIntValue(int64(ac.count))
			ac.attrs.CopyTo(dp.Attributes())
		}
	}
}