From 555be3eca838cf73b5b59426577a1d0de8db3f33 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 19 Feb 2021 09:22:34 -0800 Subject: [PATCH] Update docs and code comments to not refer to old consumerdata Updates https://github.com/open-telemetry/opentelemetry-collector/issues/2482 Signed-off-by: Bogdan Drutu --- component/componenttest/example_factories.go | 6 +++--- component/receiver.go | 6 ++---- consumer/pdata/trace.go | 2 -- exporter/README.md | 9 ++++----- exporter/exporterhelper/tracehelper_test.go | 2 +- obsreport/obsreport_receiver.go | 4 ++-- processor/README.md | 20 +++++++++---------- processor/cloningfanoutconnector.go | 2 +- processor/fanoutconnector.go | 2 +- .../probabilisticsampler_test.go | 12 +++-------- .../internal/transaction.go | 2 +- 11 files changed, 28 insertions(+), 39 deletions(-) diff --git a/component/componenttest/example_factories.go b/component/componenttest/example_factories.go index 1eda999ddd1..c79f7bed5b0 100644 --- a/component/componenttest/example_factories.go +++ b/component/componenttest/example_factories.go @@ -287,7 +287,7 @@ func (f *ExampleExporterFactory) CustomUnmarshaler() component.CustomUnmarshaler } } -// CreateTraceExporter creates a trace exporter based on this config. +// CreateTracesExporter creates a trace exporter based on this config. func (f *ExampleExporterFactory) CreateTracesExporter( _ context.Context, _ component.ExporterCreateParams, @@ -330,13 +330,13 @@ func (exp *ExampleExporterConsumer) Start(_ context.Context, _ component.Host) e return nil } -// ConsumeTraceData receives consumerdata.TraceData for processing by the TracesConsumer. +// ConsumeTraces receives pdata.Traces for processing by the TracesConsumer. func (exp *ExampleExporterConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error { exp.Traces = append(exp.Traces, td) return nil } -// ConsumeMetricsData receives consumerdata.MetricsData for processing by the MetricsConsumer. +// ConsumeMetrics receives pdata.Metrics for processing by the MetricsConsumer. func (exp *ExampleExporterConsumer) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { exp.Metrics = append(exp.Metrics, md) return nil diff --git a/component/receiver.go b/component/receiver.go index a0cd40dc889..44b57070b58 100644 --- a/component/receiver.go +++ b/component/receiver.go @@ -32,8 +32,7 @@ type Receiver interface { // Its purpose is to translate data from the wild into internal trace format. // TracesReceiver feeds a consumer.TracesConsumer with data. // -// For example it could be Zipkin data source which translates -// Zipkin spans into consumerdata.TraceData. +// For example it could be Zipkin data source which translates Zipkin spans into pdata.Traces. type TracesReceiver interface { Receiver } @@ -42,8 +41,7 @@ type TracesReceiver interface { // Its purpose is to translate data from the wild into internal metrics format. // MetricsReceiver feeds a consumer.MetricsConsumer with data. // -// For example it could be Prometheus data source which translates -// Prometheus metrics into consumerdata.MetricsData. +// For example it could be Prometheus data source which translates Prometheus metrics into pdata.Metrics. type MetricsReceiver interface { Receiver } diff --git a/consumer/pdata/trace.go b/consumer/pdata/trace.go index 4da548f987d..d750fe94e01 100644 --- a/consumer/pdata/trace.go +++ b/consumer/pdata/trace.go @@ -22,8 +22,6 @@ import ( // This file defines in-memory data structures to represent traces (spans). // Traces is the top-level struct that is propagated through the traces pipeline. -// This is the newer version of consumerdata.Traces, but uses more efficient -// in-memory representation. type Traces struct { orig *[]*otlptrace.ResourceSpans } diff --git a/exporter/README.md b/exporter/README.md index a2e09f98dd2..49bc7f24565 100644 --- a/exporter/README.md +++ b/exporter/README.md @@ -87,13 +87,12 @@ service: When multiple exporters are configured to send the same data (e.g. by configuring multiple exporters for the same pipeline) the exporters will have a shared access to the data. -Exporters get access to this shared data when `ConsumeTraceData`/`ConsumeMetricsData` -function is called. Exporters MUST NOT modify the `TraceData`/`MetricsData` argument of +Exporters get access to this shared data when `ConsumeTraces`/`ConsumeMetrics`/`ConsumeLogs` +function is called. Exporters MUST NOT modify the `pdata.Traces`/`pdata.Metrics`/`pdata.Logs` argument of these functions. If the exporter needs to modify the data while performing the exporting the exporter can clone the data and perform the modification on the clone or use a -copy-on-write approach for individual sub-parts of `TraceData`/`MetricsData` argument. -Any approach that does not mutate the original `TraceData`/`MetricsData` argument -(including referenced data, such as `Node`, `Resource`, `Spans`, etc) is allowed. +copy-on-write approach for individual sub-parts of `pdata.Traces`/`pdata.Metrics`/`pdata.Logs`. +Any approach that does not mutate the original `pdata.Traces`/`pdata.Metrics`/`pdata.Logs` is allowed. ## Proxy Support diff --git a/exporter/exporterhelper/tracehelper_test.go b/exporter/exporterhelper/tracehelper_test.go index 057c0e03d30..a007b6b1aa9 100644 --- a/exporter/exporterhelper/tracehelper_test.go +++ b/exporter/exporterhelper/tracehelper_test.go @@ -101,7 +101,7 @@ func TestTraceExporter_Default_ReturnError(t *testing.T) { require.NotNil(t, te) err = te.ConsumeTraces(context.Background(), td) - require.Equalf(t, want, err, "ConsumeTraceData returns: Want %v Got %v", want, err) + require.Equal(t, want, err) } func TestTraceExporter_WithRecordMetrics(t *testing.T) { diff --git a/obsreport/obsreport_receiver.go b/obsreport/obsreport_receiver.go index 33148a03372..81ce50d8f59 100644 --- a/obsreport/obsreport_receiver.go +++ b/obsreport/obsreport_receiver.go @@ -110,7 +110,7 @@ type StartReceiveOption func(*StartReceiveOptions) // // Example: // -// func (r *receiver) ClientConnect(ctx context.Context, rcvChan <-chan consumerdata.TraceData) { +// func (r *receiver) ClientConnect(ctx context.Context, rcvChan <-chan pdata.Traces) { // longLivedCtx := obsreport.ReceiverContext(ctx, r.config.Name(), r.transport, "") // for { // // Since the context outlives the individual receive operations call obsreport using @@ -124,7 +124,7 @@ type StartReceiveOption func(*StartReceiveOptions) // td, ok := <-rcvChan // var err error // if ok { -// err = r.nextConsumer.ConsumeTraceData(ctx, td) +// err = r.nextConsumer.ConsumeTraces(ctx, td) // } // obsreport.EndTraceDataReceiveOp( // ctx, diff --git a/processor/README.md b/processor/README.md index c04bf9569e0..9361cb66125 100644 --- a/processor/README.md +++ b/processor/README.md @@ -47,9 +47,10 @@ processor documentation for more information. ## Data Ownership -The ownership of the `TraceData` and `MetricsData` in a pipeline is passed as the data travels -through the pipeline. The data is created by the receiver and then the ownership is passed -to the first processor when `ConsumeTraceData`/`ConsumeMetricsData` function is called. +The ownership of the `pdata.Traces`, `pdata.Metrics` and `pdata.Logs` data in a pipeline +is passed as the data travels through the pipeline. The data is created by the receiver +and then the ownership is passed to the first processor when `ConsumeTraces`/`ConsumeMetrics` +function is called. Note: the receiver may be attached to multiple pipelines, in which case the same data will be passed to all attached pipelines via a data fan-out connector. @@ -79,8 +80,8 @@ data and the data can be safely modified in the pipeline. The exclusive ownership of data allows processors to freely modify the data while they own it (e.g. see `attributesprocessor`). The duration of ownership of the data -by processor is from the beginning of `ConsumeTraceData`/`ConsumeMetricsData` call -until the processor calls the next processor's `ConsumeTraceData`/`ConsumeMetricsData` +by processor is from the beginning of `ConsumeTraces`/`ConsumeMetrics`/`ConsumeLogs` +call until the processor calls the next processor's `ConsumeTraces`/`ConsumeMetrics`/`ConsumeLogs` function, which passes the ownership to the next processor. After that the processor must no longer read or write the data since it may be concurrently modified by the new owner. @@ -97,17 +98,16 @@ In this mode no cloning is performed at the fan-out connector of receivers that are attached to multiple pipelines. In this case all such pipelines will see the same single shared copy of the data. Processors in pipelines operating in shared ownership mode are prohibited from modifying the original data that they receive -via `ConsumeTraceData`/`ConsumeMetricsData` call. Processors may only read the data but -must not modify the data. +via `ConsumeTraces`/`ConsumeMetrics`/`ConsumeLogs` call. Processors may only read +the data but must not modify the data. If the processor needs to modify the data while performing the processing but does not want to incur the cost of data cloning that Exclusive mode brings then the processor can declare that it does not modify the data and use any different technique that ensures original data is not modified. For example, the processor can implement copy-on-write approach for individual sub-parts of -`TraceData`/`MetricsData` argument. Any approach that does not mutate the -original `TraceData`/`MetricsData` argument (including referenced data, such as -`Node`, `Resource`, `Spans`, etc) is allowed. +`pdata.Traces`/`pdata.Metrics`/`pdata.Logs` argument. Any approach that does not +mutate the original `pdata.Traces`/`pdata.Metrics`/`pdata.Logs` is allowed. If the processor uses such technique it should declare that it does not intend to modify the original data by setting `MutatesConsumedData=false` in its capabilities diff --git a/processor/cloningfanoutconnector.go b/processor/cloningfanoutconnector.go index f418631449b..ab25425eba1 100644 --- a/processor/cloningfanoutconnector.go +++ b/processor/cloningfanoutconnector.go @@ -78,7 +78,7 @@ type tracesCloningFanOutConnector []consumer.TracesConsumer var _ consumer.TracesConsumer = (*tracesCloningFanOutConnector)(nil) -// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one. +// ConsumeTraces exports the span data to all trace consumers wrapped by the current one. func (tfc tracesCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Traces) error { var errs []error diff --git a/processor/fanoutconnector.go b/processor/fanoutconnector.go index 8a7b7ea3ae1..d8242256d1b 100644 --- a/processor/fanoutconnector.go +++ b/processor/fanoutconnector.go @@ -38,7 +38,7 @@ type metricsFanOutConnector []consumer.MetricsConsumer var _ consumer.MetricsConsumer = (*metricsFanOutConnector)(nil) -// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one. +// ConsumeMetrics exports the pdata.Metrics to all consumers wrapped by the current one. func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { var errs []error for _, mc := range mfc { diff --git a/processor/probabilisticsamplerprocessor/probabilisticsampler_test.go b/processor/probabilisticsamplerprocessor/probabilisticsampler_test.go index 483cb18f310..aade1e63d35 100644 --- a/processor/probabilisticsamplerprocessor/probabilisticsampler_test.go +++ b/processor/probabilisticsamplerprocessor/probabilisticsampler_test.go @@ -150,10 +150,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { return } for _, td := range genRandomTestData(tt.numBatches, tt.numTracesPerBatch, testSvcName, 1) { - if err := tsp.ConsumeTraces(context.Background(), td); err != nil { - t.Errorf("tracesamplerprocessor.ConsumeTraceData() error = %v", err) - return - } + assert.NoError(t, tsp.ConsumeTraces(context.Background(), td)) } _, sampled := assertSampledData(t, sink.AllTraces(), testSvcName) actualPercentageSamplingPercentage := float32(sampled) / float32(tt.numBatches*tt.numTracesPerBatch) * 100.0 @@ -213,10 +210,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t } for _, td := range genRandomTestData(tt.numBatches, tt.numTracesPerBatch, testSvcName, tt.resourceSpanPerTrace) { - if err := tsp.ConsumeTraces(context.Background(), td); err != nil { - t.Errorf("tracesamplerprocessor.ConsumeTraceData() error = %v", err) - return - } + assert.NoError(t, tsp.ConsumeTraces(context.Background(), td)) assert.Equal(t, tt.resourceSpanPerTrace*tt.numTracesPerBatch, sink.SpansCount()) sink.Reset() } @@ -440,7 +434,7 @@ func Test_hash(t *testing.T) { } } -// genRandomTestData generates a slice of consumerdata.TraceData with the numBatches elements which one with +// genRandomTestData generates a slice of pdata.Traces with the numBatches elements which one with // numTracesPerBatch spans (ie.: each span has a different trace ID). All spans belong to the specified // serviceName. func genRandomTestData(numBatches, numTracesPerBatch int, serviceName string, resourceSpanCount int) (tdd []pdata.Traces) { diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go index bd40e2b0459..df52ac0861e 100644 --- a/receiver/prometheusreceiver/internal/transaction.go +++ b/receiver/prometheusreceiver/internal/transaction.go @@ -170,7 +170,7 @@ func (tr *transaction) Commit() error { adjustStartTime(tr.metricBuilder.startTime, metrics) } else { // AdjustMetrics - jobsMap has to be non-nil in this case. - // Note: metrics could be empty after adjustment, which needs to be checked before passing it on to ConsumeMetricsData() + // Note: metrics could be empty after adjustment, which needs to be checked before passing it on to ConsumeMetrics() metrics, _ = NewMetricsAdjuster(tr.jobsMap.get(tr.job, tr.instance), tr.logger).AdjustMetrics(metrics) }