From 7a85f6fb34137661616ad35127e99010442f80fb Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Thu, 26 Mar 2020 09:15:12 -0700 Subject: [PATCH] Add internal data -> OC compatibility shims to processors and exporters (#667) In order to start migrating components from OC to internal data, all parts of the pipeline should be able to automatically convert back to OC if downstream component doesn't work with internal data structure --- component/exporter.go | 16 ++- component/processor.go | 26 ++-- consumer/consumer.go | 16 +-- exporter/zipkinexporter/zipkin_test.go | 4 +- processor/cloningfanoutconnector.go | 32 +++++ processor/cloningfanoutconnector_test.go | 8 +- processor/fanoutconnector.go | 113 ++++++++++++++-- processor/fanoutconnector_test.go | 151 +++++++++++++++++++--- service/builder/exporters_builder.go | 53 ++++++-- service/builder/pipelines_builder.go | 105 ++++++++++++--- service/builder/pipelines_builder_test.go | 2 +- service/builder/receivers_builder.go | 22 ++-- 12 files changed, 452 insertions(+), 96 deletions(-) diff --git a/component/exporter.go b/component/exporter.go index 453145ba2aa..4570b25b52f 100644 --- a/component/exporter.go +++ b/component/exporter.go @@ -28,28 +28,38 @@ type Exporter interface { Component } +// TraceExporterBase defines a common interface for TraceExporter and TraceExporterOld +type TraceExporterBase interface { + Exporter +} + // TraceExporterOld is a TraceConsumer that is also an Exporter. type TraceExporterOld interface { consumer.TraceConsumerOld - Exporter + TraceExporterBase } // TraceExporter is an TraceConsumer that is also an Exporter. type TraceExporter interface { consumer.TraceConsumer + TraceExporterBase +} + +// MetricsExporterBase defines a common interface for MetricsExporter and MetricsExporterOld +type MetricsExporterBase interface { Exporter } // MetricsExporterOld is a MetricsConsumer that is also an Exporter. type MetricsExporterOld interface { consumer.MetricsConsumerOld - Exporter + MetricsExporterBase } // MetricsExporter is a MetricsConsumer that is also an Exporter. type MetricsExporter interface { consumer.MetricsConsumer - Exporter + MetricsExporterBase } // ExporterFactoryBase defines the common functions for all exporter factories. diff --git a/component/processor.go b/component/processor.go index b7ce8b35ed2..c7bb9733611 100644 --- a/component/processor.go +++ b/component/processor.go @@ -32,28 +32,38 @@ type Processor interface { GetCapabilities() ProcessorCapabilities } -// TraceProcessorOld composes TraceConsumer with some additional processor-specific functions. -type TraceProcessorOld interface { - consumer.TraceConsumerOld +// TraceProcessorBase is a common interface for TraceProcessor and TraceProcessorOld +type TraceProcessorBase interface { Processor } -// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions. -type MetricsProcessorOld interface { - consumer.MetricsConsumerOld - Processor +// TraceProcessorOld composes TraceConsumer with some additional processor-specific functions. +type TraceProcessorOld interface { + consumer.TraceConsumerOld + TraceProcessorBase } // TraceProcessor composes TraceConsumer with some additional processor-specific functions. type TraceProcessor interface { consumer.TraceConsumer + TraceProcessorBase +} + +// MetricsProcessorBase is a common interface for MetricsProcessor and MetricsProcessorV2 +type MetricsProcessorBase interface { Processor } +// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions. +type MetricsProcessorOld interface { + consumer.MetricsConsumerOld + MetricsProcessorBase +} + // MetricsProcessor composes MetricsConsumer with some additional processor-specific functions. type MetricsProcessor interface { consumer.MetricsConsumer - Processor + MetricsProcessorBase } // ProcessorCapabilities describes the capabilities of a Processor. diff --git a/consumer/consumer.go b/consumer/consumer.go index 2accdb79da5..db1835d864b 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -22,41 +22,41 @@ import ( "github.com/open-telemetry/opentelemetry-collector/internal/data" ) -// BaseMetricsConsumer defines a common interface for MetricsConsumerOld and MetricsConsumer. -type BaseMetricsConsumer interface{} +// MetricsConsumerBase defines a common interface for MetricsConsumerOld and MetricsConsumer. +type MetricsConsumerBase interface{} // MetricsConsumerOld is an interface that receives consumerdata.MetricsData, process it as needed, and // sends it to the next processing node if any or to the destination. // // ConsumeMetricsData receives consumerdata.MetricsData for processing by the MetricsConsumer. type MetricsConsumerOld interface { - BaseMetricsConsumer + MetricsConsumerBase ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error } // MetricsConsumer is the new metrics consumer interface that receives data.MetricData, processes it // as needed, and sends it to the next processing node if any or to the destination. type MetricsConsumer interface { - BaseMetricsConsumer + MetricsConsumerBase ConsumeMetrics(ctx context.Context, md data.MetricData) error } -// BaseTraceConsumer defines a common interface for TraceConsumerOld and TraceConsumer. -type BaseTraceConsumer interface{} +// TraceConsumerBase defines a common interface for TraceConsumerOld and TraceConsumer. +type TraceConsumerBase interface{} // TraceConsumerOld is an interface that receives consumerdata.TraceData, process it as needed, and // sends it to the next processing node if any or to the destination. // // ConsumeTraceData receives consumerdata.TraceData for processing by the TraceConsumer. type TraceConsumerOld interface { - BaseTraceConsumer + TraceConsumerBase ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error } // TraceConsumer is an interface that receives data.TraceData, processes it // as needed, and sends it to the next processing node if any or to the destination. type TraceConsumer interface { - BaseTraceConsumer + TraceConsumerBase // ConsumeTrace receives data.TraceData for processing. ConsumeTrace(ctx context.Context, td data.TraceData) error } diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index db8b27d2efd..9f8c2f77445 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -70,7 +70,7 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { mzr := newMockZipkinReporter(cst.URL) // Run the Zipkin receiver to "receive spans upload from a client application" - zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumerOld{tes}) + zexp := processor.NewTraceFanOutConnectorOld([]consumer.TraceConsumerOld{tes}) addr := testutils.GetAvailableLocalAddress(t) zi, err := zipkinreceiver.New("zipkin_receiver", addr, zexp) assert.NoError(t, err) @@ -300,7 +300,7 @@ func TestZipkinExporter_roundtripProto(t *testing.T) { mzr.serializer = zipkinproto.SpanSerializer{} // Run the Zipkin receiver to "receive spans upload from a client application" - zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumerOld{tes}) + zexp := processor.NewTraceFanOutConnectorOld([]consumer.TraceConsumerOld{tes}) port := testutils.GetAvailablePort(t) zi, err := zipkinreceiver.New( "zipkin_receiver", fmt.Sprintf(":%d", port), zexp) diff --git a/processor/cloningfanoutconnector.go b/processor/cloningfanoutconnector.go index 995ac0388ce..293ed086b59 100644 --- a/processor/cloningfanoutconnector.go +++ b/processor/cloningfanoutconnector.go @@ -33,6 +33,22 @@ import ( // clones of data before fanning out, which ensures each consumer gets their // own copy of data and is free to modify it. +// CreateMetricsCloningFanOutConnector is a placeholder function for now. +// It supposed to create an old type connector or a new type connector based on type of provided metrics consumer. +func CreateMetricsCloningFanOutConnector(baseMetricsConsumers []consumer.MetricsConsumerBase) consumer.MetricsConsumerOld { + // TODO: CreateMetricsCloningFanOutConnector doesn't support new type of consumers + // until internal data structure provides Clone method. + metricsConsumers := make([]consumer.MetricsConsumerOld, 0, len(baseMetricsConsumers)) + for _, baseMetricsConsumer := range baseMetricsConsumers { + metricsConsumer, ok := baseMetricsConsumer.(consumer.MetricsConsumerOld) + if !ok { + panic("CreateMetricsCloningFanOutConnector does not support new type of MetricsConsumer") + } + metricsConsumers = append(metricsConsumers, metricsConsumer) + } + return NewMetricsCloningFanOutConnector(metricsConsumers) +} + // NewMetricsCloningFanOutConnector wraps multiple metrics consumers in a single one. func NewMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld { return metricsCloningFanOutConnector(mcs) @@ -66,6 +82,22 @@ func (mfc metricsCloningFanOutConnector) ConsumeMetricsData(ctx context.Context, return oterr.CombineErrors(errs) } +// CreateTraceCloningFanOutConnector is a placeholder function for now. +// It supposed to create an old type connector or a new type connector based on type of provided trace consumer. +func CreateTraceCloningFanOutConnector(baseTraceConsumers []consumer.TraceConsumerBase) consumer.TraceConsumerOld { + // TODO: CreateTraceCloningFanOutConnector doesn't support new type of consumers + // until internal data structure provides Clone functionality + traceConsumers := make([]consumer.TraceConsumerOld, 0, len(baseTraceConsumers)) + for _, baseTraceConsumer := range baseTraceConsumers { + traceConsumer, ok := baseTraceConsumer.(consumer.TraceConsumerOld) + if !ok { + panic("CreateTraceCloningFanOutConnector does not support new type of TraceConsumer") + } + traceConsumers = append(traceConsumers, traceConsumer) + } + return NewTraceCloningFanOutConnector(traceConsumers) +} + // NewTraceCloningFanOutConnector wraps multiple trace consumers in a single one. func NewTraceCloningFanOutConnector(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld { return traceCloningFanOutConnector(tcs) diff --git a/processor/cloningfanoutconnector_test.go b/processor/cloningfanoutconnector_test.go index be0aec34a72..ccb51c4f9a8 100644 --- a/processor/cloningfanoutconnector_test.go +++ b/processor/cloningfanoutconnector_test.go @@ -33,7 +33,7 @@ import ( func TestTraceProcessorCloningMultiplexing(t *testing.T) { processors := make([]consumer.TraceConsumerOld, 3) for i := range processors { - processors[i] = &mockTraceConsumer{} + processors[i] = &mockTraceConsumerOld{} } tfc := NewTraceCloningFanOutConnector(processors) @@ -55,7 +55,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) { } for i, p := range processors { - m := p.(*mockTraceConsumer) + m := p.(*mockTraceConsumerOld) assert.Equal(t, wantSpansCount, m.TotalSpans) if i < len(processors)-1 { assert.True(t, td.Resource != m.Traces[0].Resource) @@ -69,7 +69,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) { func TestMetricsProcessorCloningMultiplexing(t *testing.T) { processors := make([]consumer.MetricsConsumerOld, 3) for i := range processors { - processors[i] = &mockMetricsConsumer{} + processors[i] = &mockMetricsConsumerOld{} } mfc := NewMetricsCloningFanOutConnector(processors) @@ -91,7 +91,7 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) { } for i, p := range processors { - m := p.(*mockMetricsConsumer) + m := p.(*mockMetricsConsumerOld) assert.Equal(t, wantMetricsCount, m.TotalMetrics) if i < len(processors)-1 { assert.True(t, md.Resource != m.Metrics[0].Resource) diff --git a/processor/fanoutconnector.go b/processor/fanoutconnector.go index 8383fa70687..85b43406df0 100644 --- a/processor/fanoutconnector.go +++ b/processor/fanoutconnector.go @@ -19,23 +19,48 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" "github.com/open-telemetry/opentelemetry-collector/oterr" ) // This file contains implementations of Trace/Metrics connectors // that fan out the data to multiple other consumers. -// NewMetricsFanOutConnector wraps multiple metrics consumers in a single one. -func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld { - return metricsFanOutConnector(mcs) +// CreateMetricsFanOutConnector creates a connector based on provided type of trace consumer. +// If any of the wrapped metrics consumers are of the new type, use metricsFanOutConnector, +// otherwise use the old type connector. +func CreateMetricsFanOutConnector(mcs []consumer.MetricsConsumerBase) consumer.MetricsConsumerBase { + metricsConsumersOld := make([]consumer.MetricsConsumerOld, 0, len(mcs)) + metricsConsumers := make([]consumer.MetricsConsumer, 0, len(mcs)) + allMetricsConsumersOld := true + for _, mc := range mcs { + if metricsConsumer, ok := mc.(consumer.MetricsConsumer); ok { + allMetricsConsumersOld = false + metricsConsumers = append(metricsConsumers, metricsConsumer) + } else { + metricsConsumerOld := mc.(consumer.MetricsConsumerOld) + metricsConsumersOld = append(metricsConsumersOld, metricsConsumerOld) + metricsConsumers = append(metricsConsumers, consumer.NewInternalToOCMetricsConverter(metricsConsumerOld)) + } + } + + if allMetricsConsumersOld { + return NewMetricsFanOutConnectorOld(metricsConsumersOld) + } + return NewMetricsFanOutConnector(metricsConsumers) } -type metricsFanOutConnector []consumer.MetricsConsumerOld +// NewMetricsFanOutConnectorOld wraps multiple metrics consumers in a single one. +func NewMetricsFanOutConnectorOld(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld { + return metricsFanOutConnectorOld(mcs) +} -var _ consumer.MetricsConsumerOld = (*metricsFanOutConnector)(nil) +type metricsFanOutConnectorOld []consumer.MetricsConsumerOld + +var _ consumer.MetricsConsumerOld = (*metricsFanOutConnectorOld)(nil) // ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one. -func (mfc metricsFanOutConnector) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { +func (mfc metricsFanOutConnectorOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { var errs []error for _, mc := range mfc { if err := mc.ConsumeMetricsData(ctx, md); err != nil { @@ -45,17 +70,61 @@ func (mfc metricsFanOutConnector) ConsumeMetricsData(ctx context.Context, md con return oterr.CombineErrors(errs) } -// NewTraceFanOutConnector wraps multiple trace consumers in a single one. -func NewTraceFanOutConnector(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld { - return traceFanOutConnector(tcs) +// NewMetricsFanOutConnector wraps multiple new type metrics consumers in a single one. +func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer { + return metricsFanOutConnector(mcs) } -type traceFanOutConnector []consumer.TraceConsumerOld +type metricsFanOutConnector []consumer.MetricsConsumer -var _ consumer.TraceConsumerOld = (*traceFanOutConnector)(nil) +var _ consumer.MetricsConsumer = (*metricsFanOutConnector)(nil) + +// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one. +func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md data.MetricData) error { + var errs []error + for _, mc := range mfc { + if err := mc.ConsumeMetrics(ctx, md); err != nil { + errs = append(errs, err) + } + } + return oterr.CombineErrors(errs) +} + +// CreateTraceFanOutConnector wraps multiple trace consumers in a single one. +// If any of the wrapped trace consumers are of the new type, use traceFanOutConnector, +// otherwise use the old type connector +func CreateTraceFanOutConnector(tcs []consumer.TraceConsumerBase) consumer.TraceConsumerBase { + traceConsumersOld := make([]consumer.TraceConsumerOld, 0, len(tcs)) + traceConsumers := make([]consumer.TraceConsumer, 0, len(tcs)) + allTraceConsumersOld := true + for _, tc := range tcs { + if traceConsumer, ok := tc.(consumer.TraceConsumer); ok { + allTraceConsumersOld = false + traceConsumers = append(traceConsumers, traceConsumer) + } else { + traceConsumerOld := tc.(consumer.TraceConsumerOld) + traceConsumersOld = append(traceConsumersOld, traceConsumerOld) + traceConsumers = append(traceConsumers, consumer.NewInternalToOCTraceConverter(traceConsumerOld)) + } + } + + if allTraceConsumersOld { + return NewTraceFanOutConnectorOld(traceConsumersOld) + } + return NewTraceFanOutConnector(traceConsumers) +} + +// NewTraceFanOutConnectorOld wraps multiple trace consumers in a single one. +func NewTraceFanOutConnectorOld(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld { + return traceFanOutConnectorOld(tcs) +} + +type traceFanOutConnectorOld []consumer.TraceConsumerOld + +var _ consumer.TraceConsumerOld = (*traceFanOutConnectorOld)(nil) // ConsumeTraceData exports the span data to all trace consumers wrapped by the current one. -func (tfc traceFanOutConnector) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { +func (tfc traceFanOutConnectorOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { var errs []error for _, tc := range tfc { if err := tc.ConsumeTraceData(ctx, td); err != nil { @@ -64,3 +133,23 @@ func (tfc traceFanOutConnector) ConsumeTraceData(ctx context.Context, td consume } return oterr.CombineErrors(errs) } + +// NewTraceFanOutConnector wraps multiple new type trace consumers in a single one. +func NewTraceFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer { + return traceFanOutConnector(tcs) +} + +type traceFanOutConnector []consumer.TraceConsumer + +var _ consumer.TraceConsumer = (*traceFanOutConnector)(nil) + +// ConsumeTrace exports the span data to all trace consumers wrapped by the current one. +func (tfc traceFanOutConnector) ConsumeTrace(ctx context.Context, td data.TraceData) error { + var errs []error + for _, tc := range tfc { + if err := tc.ConsumeTrace(ctx, td); err != nil { + errs = append(errs, err) + } + } + return oterr.CombineErrors(errs) +} diff --git a/processor/fanoutconnector_test.go b/processor/fanoutconnector_test.go index 876647b6044..2581003728e 100644 --- a/processor/fanoutconnector_test.go +++ b/processor/fanoutconnector_test.go @@ -26,15 +26,17 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" + "github.com/open-telemetry/opentelemetry-collector/translator/conventions" ) func TestTraceProcessorMultiplexing(t *testing.T) { processors := make([]consumer.TraceConsumerOld, 3) for i := range processors { - processors[i] = &mockTraceConsumer{} + processors[i] = &mockTraceConsumerOld{} } - tfc := NewTraceFanOutConnector(processors) + tfc := NewTraceFanOutConnectorOld(processors) td := consumerdata.TraceData{ Spans: make([]*tracepb.Span, 7), Resource: &resourcepb.Resource{ @@ -53,7 +55,7 @@ func TestTraceProcessorMultiplexing(t *testing.T) { } for _, p := range processors { - m := p.(*mockTraceConsumer) + m := p.(*mockTraceConsumerOld) assert.Equal(t, wantSpansCount, m.TotalSpans) assert.True(t, td.Resource == m.Traces[0].Resource) } @@ -62,13 +64,13 @@ func TestTraceProcessorMultiplexing(t *testing.T) { func TestTraceProcessorWhenOneErrors(t *testing.T) { processors := make([]consumer.TraceConsumerOld, 3) for i := range processors { - processors[i] = &mockTraceConsumer{} + processors[i] = &mockTraceConsumerOld{} } // Make one processor return error - processors[1].(*mockTraceConsumer).MustFail = true + processors[1].(*mockTraceConsumerOld).MustFail = true - tfc := NewTraceFanOutConnector(processors) + tfc := NewTraceFanOutConnectorOld(processors) td := consumerdata.TraceData{ Spans: make([]*tracepb.Span, 5), } @@ -84,7 +86,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) { } for _, p := range processors { - m := p.(*mockTraceConsumer) + m := p.(*mockTraceConsumerOld) if m.TotalSpans != wantSpansCount { t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans) return @@ -95,10 +97,10 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) { func TestMetricsProcessorMultiplexing(t *testing.T) { processors := make([]consumer.MetricsConsumerOld, 3) for i := range processors { - processors[i] = &mockMetricsConsumer{} + processors[i] = &mockMetricsConsumerOld{} } - mfc := NewMetricsFanOutConnector(processors) + mfc := NewMetricsFanOutConnectorOld(processors) md := consumerdata.MetricsData{ Metrics: make([]*metricspb.Metric, 7), } @@ -114,7 +116,7 @@ func TestMetricsProcessorMultiplexing(t *testing.T) { } for _, p := range processors { - m := p.(*mockMetricsConsumer) + m := p.(*mockMetricsConsumerOld) assert.Equal(t, wantMetricsCount, m.TotalMetrics) assert.True(t, md.Resource == m.Metrics[0].Resource) } @@ -123,13 +125,13 @@ func TestMetricsProcessorMultiplexing(t *testing.T) { func TestMetricsProcessorWhenOneErrors(t *testing.T) { processors := make([]consumer.MetricsConsumerOld, 3) for i := range processors { - processors[i] = &mockMetricsConsumer{} + processors[i] = &mockMetricsConsumerOld{} } // Make one processor return error - processors[1].(*mockMetricsConsumer).MustFail = true + processors[1].(*mockMetricsConsumerOld).MustFail = true - mfc := NewMetricsFanOutConnector(processors) + mfc := NewMetricsFanOutConnectorOld(processors) md := consumerdata.MetricsData{ Metrics: make([]*metricspb.Metric, 5), } @@ -145,7 +147,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) { } for _, p := range processors { - m := p.(*mockMetricsConsumer) + m := p.(*mockMetricsConsumerOld) if m.TotalMetrics != wantMetricsCount { t.Errorf("Wanted %d metrics for every processor but got %d", wantMetricsCount, m.TotalMetrics) return @@ -153,15 +155,89 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) { } } -type mockTraceConsumer struct { +func TestCreateTraceFanOutConnectorWithConvertion(t *testing.T) { + traceConsumerOld := &mockTraceConsumerOld{} + traceConsumer := &mockTraceConsumer{} + processors := []consumer.TraceConsumerBase{ + traceConsumerOld, + traceConsumer, + } + + resourceTypeName := "good-resource" + + resource := data.NewResource() + resource.SetAttributes(data.NewAttributeMap(data.AttributesMap{ + conventions.OCAttributeResourceType: data.NewAttributeValueString(resourceTypeName), + })) + td := data.NewTraceData() + td.SetResourceSpans(data.NewResourceSpansSlice(1)) + td.ResourceSpans().Get(0).SetResource(resource) + td.ResourceSpans().Get(0).SetInstrumentationLibrarySpans(data.NewInstrumentationLibrarySpansSlice(1)) + td.ResourceSpans().Get(0).InstrumentationLibrarySpans().Get(0).SetSpans(data.NewSpanSlice(3)) + + tfc := CreateTraceFanOutConnector(processors).(consumer.TraceConsumer) + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + wantSpansCount += td.SpanCount() + err := tfc.ConsumeTrace(context.Background(), td) + assert.NoError(t, err) + } + + assert.Equal(t, wantSpansCount, traceConsumerOld.TotalSpans) + assert.Equal(t, resourceTypeName, traceConsumerOld.Traces[0].Resource.Type) + + assert.Equal(t, wantSpansCount, traceConsumer.TotalSpans) + assert.Equal(t, resource, traceConsumer.Traces[0].ResourceSpans().Get(0).Resource()) +} + +func TestCreateMetricsFanOutConnectorWithConvertion(t *testing.T) { + metricsConsumerOld := &mockMetricsConsumerOld{} + metricsConsumer := &mockMetricsConsumer{} + processors := []consumer.MetricsConsumerBase{ + metricsConsumerOld, + metricsConsumer, + } + + resourceTypeName := "good-resource" + + md := data.NewMetricData() + rms := data.NewResourceMetricsSlice(1) + md.SetResourceMetrics(rms) + rm := rms.Get(0) + resource := data.NewResource() + resource.SetAttributes(data.NewAttributeMap(data.AttributesMap{ + conventions.OCAttributeResourceType: data.NewAttributeValueString(resourceTypeName), + })) + rm.SetResource(resource) + rm.SetInstrumentationLibraryMetrics(data.NewInstrumentationLibraryMetricsSlice(1)) + rm.InstrumentationLibraryMetrics().Get(0).SetMetrics(data.NewMetricSlice(4)) + + mfc := CreateMetricsFanOutConnector(processors).(consumer.MetricsConsumer) + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + wantSpansCount += md.MetricCount() + err := mfc.ConsumeMetrics(context.Background(), md) + assert.NoError(t, err) + } + + assert.Equal(t, wantSpansCount, metricsConsumerOld.TotalMetrics) + assert.Equal(t, resourceTypeName, metricsConsumerOld.Metrics[0].Resource.Type) + + assert.Equal(t, wantSpansCount, metricsConsumer.TotalMetrics) + assert.Equal(t, resource, metricsConsumer.Metrics[0].ResourceMetrics().Get(0).Resource()) +} + +type mockTraceConsumerOld struct { Traces []*consumerdata.TraceData TotalSpans int MustFail bool } -var _ consumer.TraceConsumerOld = &mockTraceConsumer{} +var _ consumer.TraceConsumerOld = &mockTraceConsumerOld{} -func (p *mockTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { +func (p *mockTraceConsumerOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { p.Traces = append(p.Traces, &td) p.TotalSpans += len(td.Spans) if p.MustFail { @@ -171,15 +247,33 @@ func (p *mockTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdat return nil } -type mockMetricsConsumer struct { +type mockTraceConsumer struct { + Traces []*data.TraceData + TotalSpans int + MustFail bool +} + +var _ consumer.TraceConsumer = &mockTraceConsumer{} + +func (p *mockTraceConsumer) ConsumeTrace(ctx context.Context, td data.TraceData) error { + p.Traces = append(p.Traces, &td) + p.TotalSpans += td.SpanCount() + if p.MustFail { + return fmt.Errorf("this processor must fail") + } + return nil + +} + +type mockMetricsConsumerOld struct { Metrics []*consumerdata.MetricsData TotalMetrics int MustFail bool } -var _ consumer.MetricsConsumerOld = &mockMetricsConsumer{} +var _ consumer.MetricsConsumerOld = &mockMetricsConsumerOld{} -func (p *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { +func (p *mockMetricsConsumerOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { p.Metrics = append(p.Metrics, &md) p.TotalMetrics += len(md.Metrics) if p.MustFail { @@ -188,3 +282,20 @@ func (p *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consume return nil } + +type mockMetricsConsumer struct { + Metrics []*data.MetricData + TotalMetrics int + MustFail bool +} + +var _ consumer.MetricsConsumer = &mockMetricsConsumer{} + +func (p *mockMetricsConsumer) ConsumeMetrics(ctx context.Context, md data.MetricData) error { + p.Metrics = append(p.Metrics, &md) + p.TotalMetrics += md.MetricCount() + if p.MustFail { + return fmt.Errorf("this processor must fail") + } + return nil +} diff --git a/service/builder/exporters_builder.go b/service/builder/exporters_builder.go index 1fce88674e1..3b6324a214a 100644 --- a/service/builder/exporters_builder.go +++ b/service/builder/exporters_builder.go @@ -15,6 +15,7 @@ package builder import ( + "context" "fmt" "go.uber.org/zap" @@ -28,8 +29,8 @@ import ( // builtExporter is an exporter that is built based on a config. It can have // a trace and/or a metrics consumer and have a shutdown function. type builtExporter struct { - te component.TraceExporterOld - me component.MetricsExporterOld + te component.TraceExporterBase + me component.MetricsExporterBase } // Start the exporter. @@ -192,11 +193,6 @@ func (eb *ExportersBuilder) buildExporter( return nil, fmt.Errorf("exporter factory not found for type: %s", config.Type()) } - factoryV1, ok := factory.(component.ExporterFactoryOld) - if !ok { - return nil, fmt.Errorf("exporter factory must implement ExporterFactoryOld: %s", config.Type()) - } - exporter := &builtExporter{} inputDataTypes := exportersInputDataTypes[config] @@ -212,7 +208,7 @@ func (eb *ExportersBuilder) buildExporter( if requirement, ok := inputDataTypes[configmodels.TracesDataType]; ok { // Traces data type is required. Create a trace exporter based on config. - te, err := factoryV1.CreateTraceExporter(eb.logger, config) + te, err := createTraceExporter(factory, eb.logger, config) if err != nil { if err == configerror.ErrDataTypeIsNotSupported { // Could not create because this exporter does not support this data type. @@ -231,7 +227,7 @@ func (eb *ExportersBuilder) buildExporter( if requirement, ok := inputDataTypes[configmodels.MetricsDataType]; ok { // Metrics data type is required. Create a trace exporter based on config. - me, err := factoryV1.CreateMetricsExporter(eb.logger, config) + me, err := createMetricsExporter(factory, eb.logger, config) if err != nil { if err == configerror.ErrDataTypeIsNotSupported { // Could not create because this exporter does not support this data type. @@ -264,3 +260,42 @@ func typeMismatchErr( config.Name(), dataType.GetString(), ) } + +// createTraceProcessor creates a trace exporter based on provided factory type. +func createTraceExporter( + factoryBase component.ExporterFactoryBase, + logger *zap.Logger, + cfg configmodels.Exporter, +) (component.TraceExporterBase, error) { + if factory, ok := factoryBase.(component.ExporterFactory); ok { + creationParams := component.ExporterCreateParams{Logger: logger} + ctx := context.Background() + + // If exporter is of the new type (can manipulate on internal data structure), + // use ExporterFactory.CreateTraceExporter. + return factory.CreateTraceExporter(ctx, creationParams, cfg) + } + + // If exporter is of the old type (can manipulate on OC traces only), + // use ExporterFactoryOld.CreateTraceExporter. + return factoryBase.(component.ExporterFactoryOld).CreateTraceExporter(logger, cfg) +} + +// createMetricsExporter creates a metrics exporter based on provided factory type. +func createMetricsExporter(factoryBase component.ExporterFactoryBase, + logger *zap.Logger, + cfg configmodels.Exporter, +) (component.MetricsExporterBase, error) { + if factory, ok := factoryBase.(component.ExporterFactory); ok { + creationParams := component.ExporterCreateParams{Logger: logger} + ctx := context.Background() + + // If exporter is of the new type (can manipulate on internal data structure), + // use ExporterFactory.CreateMetricsExporter. + return factory.CreateMetricsExporter(ctx, creationParams, cfg) + } + + // If exporter is of the old type (can manipulate on OC metrics only), + // use ExporterFactoryOld.CreateMetricsExporter. + return factoryBase.(component.ExporterFactoryOld).CreateMetricsExporter(logger, cfg) +} diff --git a/service/builder/pipelines_builder.go b/service/builder/pipelines_builder.go index ffa8a95a415..41bd93a3713 100644 --- a/service/builder/pipelines_builder.go +++ b/service/builder/pipelines_builder.go @@ -15,6 +15,8 @@ package builder import ( + "context" + "errors" "fmt" "go.uber.org/zap" @@ -30,8 +32,8 @@ import ( // It can have a trace and/or a metrics consumer (the consumer is either the first // processor in the pipeline or the exporter if pipeline has no processors). type builtPipeline struct { - firstTC consumer.TraceConsumerOld - firstMC consumer.MetricsConsumerOld + firstTC consumer.TraceConsumerBase + firstMC consumer.MetricsConsumerBase // MutatesConsumedData is set to true if any processors in the pipeline // can mutate the TraceData or MetricsData input argument. @@ -122,8 +124,8 @@ func (pb *PipelinesBuilder) buildPipeline( // BuildProcessors the pipeline backwards. // First create a consumer junction point that fans out the data to all exporters. - var tc consumer.TraceConsumerOld - var mc consumer.MetricsConsumerOld + var tc consumer.TraceConsumerBase + var mc consumer.MetricsConsumerBase switch pipelineCfg.InputType { case configmodels.TracesDataType: @@ -146,27 +148,22 @@ func (pb *PipelinesBuilder) buildPipeline( factory := pb.factories[procCfg.Type()] - factoryV1, ok := factory.(component.ProcessorFactoryOld) - if !ok { - return nil, fmt.Errorf("processor factory must implement ExporterFactoryOld: %s", procCfg.Type()) - } - // This processor must point to the next consumer and then // it becomes the next for the previous one (previous in the pipeline, // which we will build in the next loop iteration). var err error switch pipelineCfg.InputType { case configmodels.TracesDataType: - var proc component.TraceProcessorOld - proc, err = factoryV1.CreateTraceProcessor(pb.logger, tc, procCfg) + var proc component.TraceProcessorBase + proc, err = createTraceProcessor(factory, pb.logger, procCfg, tc) if proc != nil { mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData } processors[i] = proc tc = proc case configmodels.MetricsDataType: - var proc component.MetricsProcessorOld - proc, err = factoryV1.CreateMetricsProcessor(pb.logger, mc, procCfg) + var proc component.MetricsProcessorBase + proc, err = createMetricsProcessor(factory, pb.logger, procCfg, mc) if proc != nil { mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData } @@ -208,7 +205,7 @@ func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []* return result } -func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumerOld { +func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumerBase { builtExporters := pb.getBuiltExportersByNames(exporterNames) // Optimize for the case when there is only one exporter, no need to create junction point. @@ -216,16 +213,16 @@ func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st return builtExporters[0].te } - var exporters []consumer.TraceConsumerOld + var exporters []consumer.TraceConsumerBase for _, builtExp := range builtExporters { exporters = append(exporters, builtExp.te) } // Create a junction point that fans out to all exporters. - return processor.NewTraceFanOutConnector(exporters) + return processor.CreateTraceFanOutConnector(exporters) } -func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumerOld { +func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumerBase { builtExporters := pb.getBuiltExportersByNames(exporterNames) // Optimize for the case when there is only one exporter, no need to create junction point. @@ -233,11 +230,81 @@ func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames [] return builtExporters[0].me } - var exporters []consumer.MetricsConsumerOld + var exporters []consumer.MetricsConsumerBase for _, builtExp := range builtExporters { exporters = append(exporters, builtExp.me) } // Create a junction point that fans out to all exporters. - return processor.NewMetricsFanOutConnector(exporters) + return processor.CreateMetricsFanOutConnector(exporters) +} + +// createTraceProcessor creates trace processor based on type of the current processor +// and type of the downstream consumer. +func createTraceProcessor( + factoryBase component.ProcessorFactoryBase, + logger *zap.Logger, + cfg configmodels.Processor, + nextConsumer consumer.TraceConsumerBase, +) (component.TraceProcessorBase, error) { + if factory, ok := factoryBase.(component.ProcessorFactory); ok { + creationParams := component.ProcessorCreateParams{Logger: logger} + ctx := context.Background() + + // If both processor and consumer are of the new type (can manipulate on internal data structure), + // use ProcessorFactory.CreateTraceProcessor. + if nextConsumer, ok := nextConsumer.(consumer.TraceConsumer); ok { + return factory.CreateTraceProcessor(ctx, creationParams, nextConsumer, cfg) + } + + // If processor is of the new type, but downstream consumer is of the old type, + // use internalToOCTraceConverter compatibility shim. + traceConverter := consumer.NewInternalToOCTraceConverter(nextConsumer.(consumer.TraceConsumerOld)) + return factory.CreateTraceProcessor(ctx, creationParams, traceConverter, cfg) + } + + // If both processor and consumer are of the old type (can manipulate on OC traces only), + // use ProcessorFactoryOld.CreateTraceProcessor. + if nextConsumerOld, ok := nextConsumer.(consumer.TraceConsumerOld); ok { + return factoryBase.(component.ProcessorFactoryOld).CreateTraceProcessor(logger, nextConsumerOld, cfg) + } + + // Old type processor and a new type consumer usecase is not supported. + // TODO: This case can be supported since we have OC->internal traces translation function + return nil, errors.New("OC Traces -> internal data format translation is not supported") +} + +// createMetricsProcessor creates metric processor based on type of the current processor +// and type of the downstream consumer. +func createMetricsProcessor( + factoryBase component.ProcessorFactoryBase, + logger *zap.Logger, + cfg configmodels.Processor, + nextConsumer consumer.MetricsConsumerBase, +) (component.MetricsProcessorBase, error) { + if factory, ok := factoryBase.(component.ProcessorFactory); ok { + creationParams := component.ProcessorCreateParams{Logger: logger} + ctx := context.Background() + + // If both processor and consumer are of the new type (can manipulate on internal data structure), + // use ProcessorFactory.CreateMetricsProcessor. + if nextConsumer, ok := nextConsumer.(consumer.MetricsConsumer); ok { + return factory.CreateMetricsProcessor(ctx, creationParams, nextConsumer, cfg) + } + + // If processor is of the new type, but downstream consumer is of the old type, + // use internalToOCMetricsConverter compatibility shim. + metricsConverter := consumer.NewInternalToOCMetricsConverter(nextConsumer.(consumer.MetricsConsumerOld)) + return factory.CreateMetricsProcessor(ctx, creationParams, metricsConverter, cfg) + } + + // If both processor and consumer are of the old type (can manipulate on OC metrics only), + // use ProcessorFactoryOld.CreateMetricsProcessor. + if nextConsumerOld, ok := nextConsumer.(consumer.MetricsConsumerOld); ok { + return factoryBase.(component.ProcessorFactoryOld).CreateMetricsProcessor(logger, nextConsumerOld, cfg) + } + + // Old type processor and a new type consumer usecase is not supported. + // TODO: This case can be supported once we have OC->internal metrics translation function + return nil, errors.New("OC Metrics -> internal data format translation is not supported") } diff --git a/service/builder/pipelines_builder_test.go b/service/builder/pipelines_builder_test.go index ab7123cc487..ee9a21b3ec5 100644 --- a/service/builder/pipelines_builder_test.go +++ b/service/builder/pipelines_builder_test.go @@ -142,7 +142,7 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) { Type: "resourcetype", }, } - processor.firstTC.ConsumeTraceData(context.Background(), traceData) + processor.firstTC.(consumer.TraceConsumerOld).ConsumeTraceData(context.Background(), traceData) // Now verify received data. for _, consumer := range exporterConsumers { diff --git a/service/builder/receivers_builder.go b/service/builder/receivers_builder.go index da242d01829..9341c56b775 100644 --- a/service/builder/receivers_builder.go +++ b/service/builder/receivers_builder.go @@ -259,13 +259,13 @@ func (rb *ReceiversBuilder) buildReceiver(config configmodels.Receiver) (*builtR return rcv, nil } -func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.TraceConsumerOld { +func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.TraceConsumerBase { // Optimize for the case when there is only one processor, no need to create junction point. if len(pipelines) == 1 { return pipelines[0].firstTC } - var pipelineConsumers []consumer.TraceConsumerOld + var pipelineConsumers []consumer.TraceConsumerBase anyPipelineMutatesData := false for _, pipeline := range pipelines { pipelineConsumers = append(pipelineConsumers, pipeline.firstTC) @@ -279,18 +279,18 @@ func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.TraceConsumer // TODO: if there are more than 2 pipelines only clone data for pipelines that // declare the intent to mutate the data. Pipelines that do not mutate the data // can consume shared data. - return processor.NewTraceCloningFanOutConnector(pipelineConsumers) + return processor.CreateTraceCloningFanOutConnector(pipelineConsumers) } - return processor.NewTraceFanOutConnector(pipelineConsumers) + return processor.CreateTraceFanOutConnector(pipelineConsumers) } -func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsumerOld { +func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsumerBase { // Optimize for the case when there is only one processor, no need to create junction point. if len(pipelines) == 1 { return pipelines[0].firstMC } - var pipelineConsumers []consumer.MetricsConsumerOld + var pipelineConsumers []consumer.MetricsConsumerBase anyPipelineMutatesData := false for _, pipeline := range pipelines { pipelineConsumers = append(pipelineConsumers, pipeline.firstMC) @@ -304,9 +304,9 @@ func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsu // TODO: if there are more than 2 pipelines only clone data for pipelines that // declare the intent to mutate the data. Pipelines that do not mutate the data // can consume shared data. - return processor.NewMetricsCloningFanOutConnector(pipelineConsumers) + return processor.CreateMetricsCloningFanOutConnector(pipelineConsumers) } - return processor.NewMetricsFanOutConnector(pipelineConsumers) + return processor.CreateMetricsFanOutConnector(pipelineConsumers) } // createTraceReceiver is a helper function that creates trace receiver based on the current receiver type @@ -316,7 +316,7 @@ func createTraceReceiver( factory component.ReceiverFactoryBase, logger *zap.Logger, cfg configmodels.Receiver, - nextConsumer consumer.BaseTraceConsumer, + nextConsumer consumer.TraceConsumerBase, ) (component.TraceReceiver, error) { if factoryV2, ok := factory.(component.ReceiverFactory); ok { creationParams := component.ReceiverCreateParams{Logger: logger} @@ -340,6 +340,7 @@ func createTraceReceiver( } // Old type receiver and a new type consumer usecase is not supported. + // TODO: This case can be supported since we have OC->internal traces translation function return nil, errors.New("OC Traces -> internal data format translation is not supported") } @@ -350,7 +351,7 @@ func createMetricsReceiver( factory component.ReceiverFactoryBase, logger *zap.Logger, cfg configmodels.Receiver, - nextConsumer consumer.BaseMetricsConsumer, + nextConsumer consumer.MetricsConsumerBase, ) (component.MetricsReceiver, error) { if factoryV2, ok := factory.(component.ReceiverFactory); ok { creationParams := component.ReceiverCreateParams{Logger: logger} @@ -374,5 +375,6 @@ func createMetricsReceiver( } // Old type receiver and a new type consumer usecase is not supported. + // TODO: This case can be supported once we have OC->internal metrics translation function return nil, errors.New("OC Metrics -> internal data format translation is not supported") }