Skip to content

Commit

Permalink
Add internal data -> OC compatibility shims to processors and exporte…
Browse files Browse the repository at this point in the history
…rs (#667)

In order to start migrating components from OC to internal data, all parts of the pipeline should be able to automatically convert back to OC if downstream component doesn't work with internal data structure
  • Loading branch information
dmitryax authored Mar 26, 2020
1 parent 90932e1 commit 7a85f6f
Show file tree
Hide file tree
Showing 12 changed files with 452 additions and 96 deletions.
16 changes: 13 additions & 3 deletions component/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,38 @@ type Exporter interface {
Component
}

// TraceExporterBase defines a common interface for TraceExporter and TraceExporterOld
type TraceExporterBase interface {
Exporter
}

// TraceExporterOld is a TraceConsumer that is also an Exporter.
type TraceExporterOld interface {
consumer.TraceConsumerOld
Exporter
TraceExporterBase
}

// TraceExporter is an TraceConsumer that is also an Exporter.
type TraceExporter interface {
consumer.TraceConsumer
TraceExporterBase
}

// MetricsExporterBase defines a common interface for MetricsExporter and MetricsExporterOld
type MetricsExporterBase interface {
Exporter
}

// MetricsExporterOld is a MetricsConsumer that is also an Exporter.
type MetricsExporterOld interface {
consumer.MetricsConsumerOld
Exporter
MetricsExporterBase
}

// MetricsExporter is a MetricsConsumer that is also an Exporter.
type MetricsExporter interface {
consumer.MetricsConsumer
Exporter
MetricsExporterBase
}

// ExporterFactoryBase defines the common functions for all exporter factories.
Expand Down
26 changes: 18 additions & 8 deletions component/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,38 @@ type Processor interface {
GetCapabilities() ProcessorCapabilities
}

// TraceProcessorOld composes TraceConsumer with some additional processor-specific functions.
type TraceProcessorOld interface {
consumer.TraceConsumerOld
// TraceProcessorBase is a common interface for TraceProcessor and TraceProcessorOld
type TraceProcessorBase interface {
Processor
}

// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions.
type MetricsProcessorOld interface {
consumer.MetricsConsumerOld
Processor
// TraceProcessorOld composes TraceConsumer with some additional processor-specific functions.
type TraceProcessorOld interface {
consumer.TraceConsumerOld
TraceProcessorBase
}

// TraceProcessor composes TraceConsumer with some additional processor-specific functions.
type TraceProcessor interface {
consumer.TraceConsumer
TraceProcessorBase
}

// MetricsProcessorBase is a common interface for MetricsProcessor and MetricsProcessorV2
type MetricsProcessorBase interface {
Processor
}

// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions.
type MetricsProcessorOld interface {
consumer.MetricsConsumerOld
MetricsProcessorBase
}

// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions.
type MetricsProcessor interface {
consumer.MetricsConsumer
Processor
MetricsProcessorBase
}

// ProcessorCapabilities describes the capabilities of a Processor.
Expand Down
16 changes: 8 additions & 8 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,41 @@ import (
"github.com/open-telemetry/opentelemetry-collector/internal/data"
)

// BaseMetricsConsumer defines a common interface for MetricsConsumerOld and MetricsConsumer.
type BaseMetricsConsumer interface{}
// MetricsConsumerBase defines a common interface for MetricsConsumerOld and MetricsConsumer.
type MetricsConsumerBase interface{}

// MetricsConsumerOld is an interface that receives consumerdata.MetricsData, process it as needed, and
// sends it to the next processing node if any or to the destination.
//
// ConsumeMetricsData receives consumerdata.MetricsData for processing by the MetricsConsumer.
type MetricsConsumerOld interface {
BaseMetricsConsumer
MetricsConsumerBase
ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error
}

// MetricsConsumer is the new metrics consumer interface that receives data.MetricData, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type MetricsConsumer interface {
BaseMetricsConsumer
MetricsConsumerBase
ConsumeMetrics(ctx context.Context, md data.MetricData) error
}

// BaseTraceConsumer defines a common interface for TraceConsumerOld and TraceConsumer.
type BaseTraceConsumer interface{}
// TraceConsumerBase defines a common interface for TraceConsumerOld and TraceConsumer.
type TraceConsumerBase interface{}

// TraceConsumerOld is an interface that receives consumerdata.TraceData, process it as needed, and
// sends it to the next processing node if any or to the destination.
//
// ConsumeTraceData receives consumerdata.TraceData for processing by the TraceConsumer.
type TraceConsumerOld interface {
BaseTraceConsumer
TraceConsumerBase
ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error
}

// TraceConsumer is an interface that receives data.TraceData, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type TraceConsumer interface {
BaseTraceConsumer
TraceConsumerBase
// ConsumeTrace receives data.TraceData for processing.
ConsumeTrace(ctx context.Context, td data.TraceData) error
}
4 changes: 2 additions & 2 deletions exporter/zipkinexporter/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
mzr := newMockZipkinReporter(cst.URL)

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumerOld{tes})
zexp := processor.NewTraceFanOutConnectorOld([]consumer.TraceConsumerOld{tes})
addr := testutils.GetAvailableLocalAddress(t)
zi, err := zipkinreceiver.New("zipkin_receiver", addr, zexp)
assert.NoError(t, err)
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestZipkinExporter_roundtripProto(t *testing.T) {
mzr.serializer = zipkinproto.SpanSerializer{}

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumerOld{tes})
zexp := processor.NewTraceFanOutConnectorOld([]consumer.TraceConsumerOld{tes})
port := testutils.GetAvailablePort(t)
zi, err := zipkinreceiver.New(
"zipkin_receiver", fmt.Sprintf(":%d", port), zexp)
Expand Down
32 changes: 32 additions & 0 deletions processor/cloningfanoutconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ import (
// clones of data before fanning out, which ensures each consumer gets their
// own copy of data and is free to modify it.

// CreateMetricsCloningFanOutConnector is a placeholder function for now.
// It supposed to create an old type connector or a new type connector based on type of provided metrics consumer.
func CreateMetricsCloningFanOutConnector(baseMetricsConsumers []consumer.MetricsConsumerBase) consumer.MetricsConsumerOld {
// TODO: CreateMetricsCloningFanOutConnector doesn't support new type of consumers
// until internal data structure provides Clone method.
metricsConsumers := make([]consumer.MetricsConsumerOld, 0, len(baseMetricsConsumers))
for _, baseMetricsConsumer := range baseMetricsConsumers {
metricsConsumer, ok := baseMetricsConsumer.(consumer.MetricsConsumerOld)
if !ok {
panic("CreateMetricsCloningFanOutConnector does not support new type of MetricsConsumer")
}
metricsConsumers = append(metricsConsumers, metricsConsumer)
}
return NewMetricsCloningFanOutConnector(metricsConsumers)
}

// NewMetricsCloningFanOutConnector wraps multiple metrics consumers in a single one.
func NewMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld {
return metricsCloningFanOutConnector(mcs)
Expand Down Expand Up @@ -66,6 +82,22 @@ func (mfc metricsCloningFanOutConnector) ConsumeMetricsData(ctx context.Context,
return oterr.CombineErrors(errs)
}

// CreateTraceCloningFanOutConnector is a placeholder function for now.
// It supposed to create an old type connector or a new type connector based on type of provided trace consumer.
func CreateTraceCloningFanOutConnector(baseTraceConsumers []consumer.TraceConsumerBase) consumer.TraceConsumerOld {
// TODO: CreateTraceCloningFanOutConnector doesn't support new type of consumers
// until internal data structure provides Clone functionality
traceConsumers := make([]consumer.TraceConsumerOld, 0, len(baseTraceConsumers))
for _, baseTraceConsumer := range baseTraceConsumers {
traceConsumer, ok := baseTraceConsumer.(consumer.TraceConsumerOld)
if !ok {
panic("CreateTraceCloningFanOutConnector does not support new type of TraceConsumer")
}
traceConsumers = append(traceConsumers, traceConsumer)
}
return NewTraceCloningFanOutConnector(traceConsumers)
}

// NewTraceCloningFanOutConnector wraps multiple trace consumers in a single one.
func NewTraceCloningFanOutConnector(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld {
return traceCloningFanOutConnector(tcs)
Expand Down
8 changes: 4 additions & 4 deletions processor/cloningfanoutconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
func TestTraceProcessorCloningMultiplexing(t *testing.T) {
processors := make([]consumer.TraceConsumerOld, 3)
for i := range processors {
processors[i] = &mockTraceConsumer{}
processors[i] = &mockTraceConsumerOld{}
}

tfc := NewTraceCloningFanOutConnector(processors)
Expand All @@ -55,7 +55,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) {
}

for i, p := range processors {
m := p.(*mockTraceConsumer)
m := p.(*mockTraceConsumerOld)
assert.Equal(t, wantSpansCount, m.TotalSpans)
if i < len(processors)-1 {
assert.True(t, td.Resource != m.Traces[0].Resource)
Expand All @@ -69,7 +69,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) {
func TestMetricsProcessorCloningMultiplexing(t *testing.T) {
processors := make([]consumer.MetricsConsumerOld, 3)
for i := range processors {
processors[i] = &mockMetricsConsumer{}
processors[i] = &mockMetricsConsumerOld{}
}

mfc := NewMetricsCloningFanOutConnector(processors)
Expand All @@ -91,7 +91,7 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) {
}

for i, p := range processors {
m := p.(*mockMetricsConsumer)
m := p.(*mockMetricsConsumerOld)
assert.Equal(t, wantMetricsCount, m.TotalMetrics)
if i < len(processors)-1 {
assert.True(t, md.Resource != m.Metrics[0].Resource)
Expand Down
113 changes: 101 additions & 12 deletions processor/fanoutconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,48 @@ import (

"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/internal/data"
"github.com/open-telemetry/opentelemetry-collector/oterr"
)

// This file contains implementations of Trace/Metrics connectors
// that fan out the data to multiple other consumers.

// NewMetricsFanOutConnector wraps multiple metrics consumers in a single one.
func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld {
return metricsFanOutConnector(mcs)
// CreateMetricsFanOutConnector creates a connector based on provided type of trace consumer.
// If any of the wrapped metrics consumers are of the new type, use metricsFanOutConnector,
// otherwise use the old type connector.
func CreateMetricsFanOutConnector(mcs []consumer.MetricsConsumerBase) consumer.MetricsConsumerBase {
metricsConsumersOld := make([]consumer.MetricsConsumerOld, 0, len(mcs))
metricsConsumers := make([]consumer.MetricsConsumer, 0, len(mcs))
allMetricsConsumersOld := true
for _, mc := range mcs {
if metricsConsumer, ok := mc.(consumer.MetricsConsumer); ok {
allMetricsConsumersOld = false
metricsConsumers = append(metricsConsumers, metricsConsumer)
} else {
metricsConsumerOld := mc.(consumer.MetricsConsumerOld)
metricsConsumersOld = append(metricsConsumersOld, metricsConsumerOld)
metricsConsumers = append(metricsConsumers, consumer.NewInternalToOCMetricsConverter(metricsConsumerOld))
}
}

if allMetricsConsumersOld {
return NewMetricsFanOutConnectorOld(metricsConsumersOld)
}
return NewMetricsFanOutConnector(metricsConsumers)
}

type metricsFanOutConnector []consumer.MetricsConsumerOld
// NewMetricsFanOutConnectorOld wraps multiple metrics consumers in a single one.
func NewMetricsFanOutConnectorOld(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld {
return metricsFanOutConnectorOld(mcs)
}

var _ consumer.MetricsConsumerOld = (*metricsFanOutConnector)(nil)
type metricsFanOutConnectorOld []consumer.MetricsConsumerOld

var _ consumer.MetricsConsumerOld = (*metricsFanOutConnectorOld)(nil)

// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
func (mfc metricsFanOutConnector) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
func (mfc metricsFanOutConnectorOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
var errs []error
for _, mc := range mfc {
if err := mc.ConsumeMetricsData(ctx, md); err != nil {
Expand All @@ -45,17 +70,61 @@ func (mfc metricsFanOutConnector) ConsumeMetricsData(ctx context.Context, md con
return oterr.CombineErrors(errs)
}

// NewTraceFanOutConnector wraps multiple trace consumers in a single one.
func NewTraceFanOutConnector(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld {
return traceFanOutConnector(tcs)
// NewMetricsFanOutConnector wraps multiple new type metrics consumers in a single one.
func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer {
return metricsFanOutConnector(mcs)
}

type traceFanOutConnector []consumer.TraceConsumerOld
type metricsFanOutConnector []consumer.MetricsConsumer

var _ consumer.TraceConsumerOld = (*traceFanOutConnector)(nil)
var _ consumer.MetricsConsumer = (*metricsFanOutConnector)(nil)

// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md data.MetricData) error {
var errs []error
for _, mc := range mfc {
if err := mc.ConsumeMetrics(ctx, md); err != nil {
errs = append(errs, err)
}
}
return oterr.CombineErrors(errs)
}

// CreateTraceFanOutConnector wraps multiple trace consumers in a single one.
// If any of the wrapped trace consumers are of the new type, use traceFanOutConnector,
// otherwise use the old type connector
func CreateTraceFanOutConnector(tcs []consumer.TraceConsumerBase) consumer.TraceConsumerBase {
traceConsumersOld := make([]consumer.TraceConsumerOld, 0, len(tcs))
traceConsumers := make([]consumer.TraceConsumer, 0, len(tcs))
allTraceConsumersOld := true
for _, tc := range tcs {
if traceConsumer, ok := tc.(consumer.TraceConsumer); ok {
allTraceConsumersOld = false
traceConsumers = append(traceConsumers, traceConsumer)
} else {
traceConsumerOld := tc.(consumer.TraceConsumerOld)
traceConsumersOld = append(traceConsumersOld, traceConsumerOld)
traceConsumers = append(traceConsumers, consumer.NewInternalToOCTraceConverter(traceConsumerOld))
}
}

if allTraceConsumersOld {
return NewTraceFanOutConnectorOld(traceConsumersOld)
}
return NewTraceFanOutConnector(traceConsumers)
}

// NewTraceFanOutConnectorOld wraps multiple trace consumers in a single one.
func NewTraceFanOutConnectorOld(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld {
return traceFanOutConnectorOld(tcs)
}

type traceFanOutConnectorOld []consumer.TraceConsumerOld

var _ consumer.TraceConsumerOld = (*traceFanOutConnectorOld)(nil)

// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one.
func (tfc traceFanOutConnector) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
func (tfc traceFanOutConnectorOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
var errs []error
for _, tc := range tfc {
if err := tc.ConsumeTraceData(ctx, td); err != nil {
Expand All @@ -64,3 +133,23 @@ func (tfc traceFanOutConnector) ConsumeTraceData(ctx context.Context, td consume
}
return oterr.CombineErrors(errs)
}

// NewTraceFanOutConnector wraps multiple new type trace consumers in a single one.
func NewTraceFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer {
return traceFanOutConnector(tcs)
}

type traceFanOutConnector []consumer.TraceConsumer

var _ consumer.TraceConsumer = (*traceFanOutConnector)(nil)

// ConsumeTrace exports the span data to all trace consumers wrapped by the current one.
func (tfc traceFanOutConnector) ConsumeTrace(ctx context.Context, td data.TraceData) error {
var errs []error
for _, tc := range tfc {
if err := tc.ConsumeTrace(ctx, td); err != nil {
errs = append(errs, err)
}
}
return oterr.CombineErrors(errs)
}
Loading

0 comments on commit 7a85f6f

Please sign in to comment.