diff --git a/processor/deltatocumulativeprocessor/README.md b/processor/deltatocumulativeprocessor/README.md index ffceda1c3d3b..425a8dd97776 100644 --- a/processor/deltatocumulativeprocessor/README.md +++ b/processor/deltatocumulativeprocessor/README.md @@ -36,15 +36,5 @@ There is no further configuration required. All delta samples are converted to c ## Troubleshooting -The following metrics are recorded when [telemetry is -enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry): - -| Name | Description | Unit | -|------------------------------------------|---------------------------------------------------------------------------------------|---------------| -| `deltatocumulative.streams.tracked` | Number of streams currently tracked by the aggregation state | `{stream}` | -| `deltatocumulative.streams.limit` | Upper limit of tracked streams | `{stream}` | -| `deltatocumulative.streams.evicted` | Number of streams removed from tracking to ingest newer streams | `{stream}` | -| `deltatocumulative.streams.max_stale` | Duration without new samples after which streams are dropped | `second` | -| `deltatocumulative.datapoints.processed` | Total number of datapoints processed, whether successful or not | `{datapoint}` | -| `deltatocumulative.datapoints.dropped` | Faulty datapoints that were dropped due to the reason given in the `reason` attribute | `{datapoint}` | -| `deltatocumulative.gaps.length` | Total length of all gaps in the streams, which occur e.g. due to lost in transit | `second` | +When [Telemetry is +enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry), this component exports [several metrics](./documentation.md). diff --git a/processor/deltatocumulativeprocessor/documentation.md b/processor/deltatocumulativeprocessor/documentation.md new file mode 100644 index 000000000000..3498635fb982 --- /dev/null +++ b/processor/deltatocumulativeprocessor/documentation.md @@ -0,0 +1,63 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# deltatocumulative + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### deltatocumulative.datapoints.dropped + +number of datapoints dropped due to given 'reason' + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoint} | Sum | Int | true | + +### deltatocumulative.datapoints.processed + +number of datapoints processed + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoint} | Sum | Int | true | + +### deltatocumulative.gaps.length + +total duration where data was expected but not received + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| s | Sum | Int | true | + +### deltatocumulative.streams.evicted + +number of streams evicted + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {stream} | Sum | Int | true | + +### deltatocumulative.streams.limit + +upper limit of tracked streams + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {stream} | Gauge | Int | + +### deltatocumulative.streams.max_stale + +duration after which streams inactive streams are dropped + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| s | Gauge | Int | + +### deltatocumulative.streams.tracked + +number of streams tracked + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {dps} | Sum | Int | false | diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index 06ef84acd8f5..8a6a394083d6 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -28,6 +28,10 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo return nil, fmt.Errorf("configuration parsing error") } - meter := metadata.Meter(set.TelemetrySettings) - return newProcessor(pcfg, set.Logger, meter, next), nil + telb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + + return newProcessor(pcfg, set.Logger, telb, next), nil } diff --git a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go new file mode 100644 index 000000000000..52ad9e905c16 --- /dev/null +++ b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package deltatocumulativeprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func (tt *componentTestTelemetry) NewSettings() processor.Settings { + settings := processortest.NewNopSettings() + settings.MeterProvider = tt.meterProvider + settings.ID = component.NewID(component.MustNewType("deltatocumulative")) + + return settings +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go index 47e60276e3a3..26a252ffd782 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -3,9 +3,14 @@ package metadata import ( - "go.opentelemetry.io/collector/component" + "errors" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { @@ -15,3 +20,85 @@ func Meter(settings component.TelemetrySettings) metric.Meter { func Tracer(settings component.TelemetrySettings) trace.Tracer { return settings.TracerProvider.Tracer("otelcol/deltatocumulative") } + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + DeltatocumulativeDatapointsDropped metric.Int64Counter + DeltatocumulativeDatapointsProcessed metric.Int64Counter + DeltatocumulativeGapsLength metric.Int64Counter + DeltatocumulativeStreamsEvicted metric.Int64Counter + DeltatocumulativeStreamsLimit metric.Int64Gauge + DeltatocumulativeStreamsMaxStale metric.Int64Gauge + DeltatocumulativeStreamsTracked metric.Int64UpDownCounter + level configtelemetry.Level +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithLevel sets the current telemetry level for the component. +func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.level = lvl + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{level: configtelemetry.LevelBasic} + for _, op := range options { + op(&builder) + } + var err, errs error + if builder.level >= configtelemetry.LevelBasic { + builder.meter = Meter(settings) + } else { + builder.meter = noop.Meter{} + } + builder.DeltatocumulativeDatapointsDropped, err = builder.meter.Int64Counter( + "deltatocumulative.datapoints.dropped", + metric.WithDescription("number of datapoints dropped due to given 'reason'"), + metric.WithUnit("{datapoint}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeDatapointsProcessed, err = builder.meter.Int64Counter( + "deltatocumulative.datapoints.processed", + metric.WithDescription("number of datapoints processed"), + metric.WithUnit("{datapoint}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeGapsLength, err = builder.meter.Int64Counter( + "deltatocumulative.gaps.length", + metric.WithDescription("total duration where data was expected but not received"), + metric.WithUnit("s"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsEvicted, err = builder.meter.Int64Counter( + "deltatocumulative.streams.evicted", + metric.WithDescription("number of streams evicted"), + metric.WithUnit("{stream}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsLimit, err = builder.meter.Int64Gauge( + "deltatocumulative.streams.limit", + metric.WithDescription("upper limit of tracked streams"), + metric.WithUnit("{stream}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsMaxStale, err = builder.meter.Int64Gauge( + "deltatocumulative.streams.max_stale", + metric.WithDescription("duration after which streams inactive streams are dropped"), + metric.WithUnit("s"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsTracked, err = builder.meter.Int64UpDownCounter( + "deltatocumulative.streams.tracked", + metric.WithDescription("number of streams tracked"), + metric.WithUnit("{dps}"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go index 492fa62a3832..a4ff6092b731 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go index 5d9205bffd05..b16d0e4183ef 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go @@ -7,12 +7,14 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/otel/metric/noop" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" @@ -112,10 +114,13 @@ func TestFaults(t *testing.T) { }, } + telb, err := metadata.NewTelemetryBuilder(component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()}) + require.NoError(t, err) + for _, c := range cases { t.Run(c.Name, func(t *testing.T) { id, dp := sum.Stream() - tel := telemetry.New(noop.Meter{}) + tel := telemetry.New(telb) dps := c.Map if dps == nil { diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index 4a23dd6fde3e..df9e8643e652 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -19,22 +19,29 @@ import ( type Telemetry struct { Metrics - - meter metric.Meter } -func New(meter metric.Meter) Telemetry { - return Telemetry{ - Metrics: metrics(meter), - meter: meter, - } +func New(telb *metadata.TelemetryBuilder) Telemetry { + return Telemetry{Metrics: Metrics{ + streams: Streams{ + tracked: telb.DeltatocumulativeStreamsTracked, + limit: telb.DeltatocumulativeStreamsLimit, + evicted: telb.DeltatocumulativeStreamsEvicted, + stale: telb.DeltatocumulativeStreamsMaxStale, + }, + dps: Datapoints{ + total: telb.DeltatocumulativeDatapointsProcessed, + dropped: telb.DeltatocumulativeDatapointsDropped, + }, + gaps: telb.DeltatocumulativeGapsLength, + }} } type Streams struct { tracked metric.Int64UpDownCounter - limit metric.Int64ObservableGauge + limit metric.Int64Gauge evicted metric.Int64Counter - stale metric.Int64ObservableGauge + stale metric.Int64Gauge } type Datapoints struct { @@ -49,69 +56,12 @@ type Metrics struct { gaps metric.Int64Counter } -func metrics(meter metric.Meter) Metrics { - var ( - count = use(meter.Int64Counter) - updown = use(meter.Int64UpDownCounter) - gauge = use(meter.Int64ObservableGauge) - ) - - return Metrics{ - streams: Streams{ - tracked: updown("streams.tracked", - metric.WithDescription("number of streams tracked"), - metric.WithUnit("{stream}"), - ), - limit: gauge("streams.limit", - metric.WithDescription("upper limit of tracked streams"), - metric.WithUnit("{stream}"), - ), - evicted: count("streams.evicted", - metric.WithDescription("number of streams evicted"), - metric.WithUnit("{stream}"), - ), - stale: gauge("streams.max_stale", - metric.WithDescription("duration without new samples after which streams are dropped"), - metric.WithUnit("s"), - ), - }, - dps: Datapoints{ - total: count("datapoints.processed", - metric.WithDescription("number of datapoints processed"), - metric.WithUnit("{datapoint}"), - ), - dropped: count("datapoints.dropped", - metric.WithDescription("number of dropped datapoints due to given 'reason'"), - metric.WithUnit("{datapoint}"), - ), - }, - gaps: count("gaps.length", - metric.WithDescription("total duration where data was expected but not received"), - metric.WithUnit("s"), - ), - } -} - func (tel Telemetry) WithLimit(max int64) { - then := metric.Callback(func(_ context.Context, o metric.Observer) error { - o.ObserveInt64(tel.streams.limit, max) - return nil - }) - _, err := tel.meter.RegisterCallback(then, tel.streams.limit) - if err != nil { - panic(err) - } + tel.streams.limit.Record(context.Background(), max) } func (tel Telemetry) WithStale(max time.Duration) { - then := metric.Callback(func(_ context.Context, o metric.Observer) error { - o.ObserveInt64(tel.streams.stale, int64(max.Seconds())) - return nil - }) - _, err := tel.meter.RegisterCallback(then, tel.streams.stale) - if err != nil { - panic(err) - } + tel.streams.stale.Record(context.Background(), int64(max.Seconds())) } func ObserveItems[T any](items streams.Map[T], metrics *Metrics) Items[T] { diff --git a/processor/deltatocumulativeprocessor/metadata.yaml b/processor/deltatocumulativeprocessor/metadata.yaml index 43e5ba4f3877..1966284cb1eb 100644 --- a/processor/deltatocumulativeprocessor/metadata.yaml +++ b/processor/deltatocumulativeprocessor/metadata.yaml @@ -9,3 +9,56 @@ status: warnings: [Statefulness] codeowners: active: [sh0rez, RichieSams, jpkrohling] + + +telemetry: + metrics: + # streams + deltatocumulative.streams.tracked: + description: number of streams tracked + unit: "{dps}" + sum: + value_type: int + monotonic: false + enabled: true + deltatocumulative.streams.limit: + description: upper limit of tracked streams + unit: "{stream}" + gauge: + value_type: int + enabled: true + deltatocumulative.streams.evicted: + description: number of streams evicted + unit: "{stream}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.streams.max_stale: + description: duration after which streams inactive streams are dropped + unit: "s" + gauge: + value_type: int + enabled: true + # datapoints + deltatocumulative.datapoints.processed: + description: number of datapoints processed + unit: "{datapoint}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.datapoints.dropped: + description: number of datapoints dropped due to given 'reason' + unit: "{datapoint}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.gaps.length: + description: total duration where data was expected but not received + unit: "s" + sum: + value_type: int + monotonic: true + enabled: true diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index bd904c70857e..63202186fb59 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -13,13 +13,13 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" @@ -40,11 +40,10 @@ type Processor struct { mtx sync.Mutex } -func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consumer.Metrics) *Processor { +func newProcessor(cfg *Config, log *zap.Logger, telb *metadata.TelemetryBuilder, next consumer.Metrics) *Processor { ctx, cancel := context.WithCancel(context.Background()) - tel := telemetry.New(meter) - + tel := telemetry.New(telb) proc := Processor{ log: log, ctx: ctx,