Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-v0.112.0 #3

Merged
merged 5 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions connector/countconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func (c *count) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
for k := 0; k < scopeSpan.Spans().Len(); k++ {
span := scopeSpan.Spans().At(k)
sCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
multiError = errors.Join(multiError, spansCounter.update(ctx, span.Attributes(), sCtx))
multiError = errors.Join(multiError, spansCounter.update(ctx, span.Attributes(), scopeSpan.Scope().Attributes(), sCtx))

for l := 0; l < span.Events().Len(); l++ {
event := span.Events().At(l)
eCtx := ottlspanevent.NewTransformContext(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
multiError = errors.Join(multiError, spanEventsCounter.update(ctx, event.Attributes(), eCtx))
multiError = errors.Join(multiError, spanEventsCounter.update(ctx, event.Attributes(), scopeSpan.Scope().Attributes(), eCtx))
}
}
}
Expand Down Expand Up @@ -101,39 +101,39 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)
mCtx := ottlmetric.NewTransformContext(metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, metricsCounter.update(ctx, pcommon.NewMap(), mCtx))
multiError = errors.Join(multiError, metricsCounter.update(ctx, pcommon.NewMap(), pcommon.NewMap(), mCtx))

//exhaustive:enforce
switch metric.Type() {
case pmetric.MetricTypeGauge:
dps := metric.Gauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), resourceMetric.Resource().Attributes(), dCtx))
}
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), resourceMetric.Resource().Attributes(), dCtx))
}
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), resourceMetric.Resource().Attributes(), dCtx))
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), resourceMetric.Resource().Attributes(), dCtx))
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), resourceMetric.Resource().Attributes(), dCtx))
}
case pmetric.MetricTypeEmpty:
multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type()))
Expand Down Expand Up @@ -176,7 +176,7 @@ func (c *count) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
logRecord := scopeLogs.LogRecords().At(k)

lCtx := ottllog.NewTransformContext(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog)
multiError = errors.Join(multiError, counter.update(ctx, logRecord.Attributes(), lCtx))
multiError = errors.Join(multiError, counter.update(ctx, logRecord.Attributes(), resourceLog.Resource().Attributes(), lCtx))
}
}

Expand Down
32 changes: 27 additions & 5 deletions connector/countconnector/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type attrCounter struct {
count uint64
}

func (c *counter[K]) update(ctx context.Context, attrs pcommon.Map, tCtx K) error {
func (c *counter[K]) update(ctx context.Context, attrs pcommon.Map, resourceAttrs pcommon.Map, tCtx K) error {
var multiError error
for name, md := range c.metricDefs {
countAttrs := pcommon.NewMap()
Expand Down Expand Up @@ -66,10 +66,32 @@ func (c *counter[K]) update(ctx context.Context, attrs pcommon.Map, tCtx K) erro
}
}
}

// Missing necessary attributes to be counted
if countAttrs.Len() != len(md.attrs) {
continue
for _, resAttr := range md.resourceAttrs {
if resAttrVal, ok := resourceAttrs.Get(resAttr.Key); ok {
switch typeAttr := resAttrVal.Type(); typeAttr {
case pcommon.ValueTypeInt:
countAttrs.PutInt(resAttr.Key, resAttrVal.Int())
case pcommon.ValueTypeDouble:
countAttrs.PutDouble(resAttr.Key, resAttrVal.Double())
default:
countAttrs.PutStr(resAttr.Key, resAttrVal.Str())
}
} else if resAttr.DefaultValue != "" {
switch v := resAttr.DefaultValue.(type) {
case string:
if v != "" {
countAttrs.PutStr(resAttr.Key, v)
}
case int:
if v != 0 {
countAttrs.PutInt(resAttr.Key, int64(v))
}
case float64:
if v != 0 {
countAttrs.PutDouble(resAttr.Key, float64(v))
}
}
}
}

// No conditions, so match all.
Expand Down
7 changes: 4 additions & 3 deletions connector/countconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func createLogsToMetrics(
}

type metricDef[K any] struct {
condition expr.BoolExpr[K]
desc string
attrs []AttributeConfig
condition expr.BoolExpr[K]
desc string
attrs []AttributeConfig
resourceAttrs []AttributeConfig
}
24 changes: 14 additions & 10 deletions connector/countconnector/testdata/logs/condition_and_attribute.yaml
Original file line number Diff line number Diff line change
@@ -1,61 +1,65 @@
resourceMetrics:
- resource:
attributes:
- key: resource.required
value:
stringValue: foo
- key: resource.optional
value:
stringValue: bar
- key: resource.required
value:
stringValue: foo
scopeMetrics:
- metrics:
- description: Log count by attribute if ...
name: log.count.if.by_attr
sum:
aggregationTemporality: 1
dataPoints:
- asInt: "1"
timeUnixNano: "1000000"
- asInt: "2"
attributes:
- key: log.required
value:
stringValue: foo
timeUnixNano: "1678390948399018000"
timeUnixNano: "1000000"
- asInt: "1"
attributes:
- key: log.required
value:
stringValue: notfoo
timeUnixNano: "1678390948399018000"
timeUnixNano: "1000000"
isMonotonic: true
scope:
name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector
- resource:
attributes:
- key: resource.required
value:
stringValue: foo
- key: resource.optional
value:
stringValue: notbar
- key: resource.required
value:
stringValue: foo
scopeMetrics:
- metrics:
- description: Log count by attribute if ...
name: log.count.if.by_attr
sum:
aggregationTemporality: 1
dataPoints:
- asInt: "1"
timeUnixNano: "1000000"
- asInt: "2"
attributes:
- key: log.required
value:
stringValue: foo
timeUnixNano: "1678390948399021000"
timeUnixNano: "1000000"
- asInt: "1"
attributes:
- key: log.required
value:
stringValue: notfoo
timeUnixNano: "1678390948399021000"
timeUnixNano: "1000000"
isMonotonic: true
scope:
name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector
Loading
Loading