Skip to content

Commit

Permalink
[chore] [exporterhelper] Remove duplicated code between exporter help…
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Feb 20, 2024
1 parent 0ab8f44 commit e7770fc
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 165 deletions.
38 changes: 25 additions & 13 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
if o.marshaler == nil || o.unmarshaler == nil {
panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
Expand Down Expand Up @@ -114,6 +114,9 @@ func WithQueue(config QueueSettings) Option {
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return func(o *baseExporter) {
if o.marshaler != nil || o.unmarshaler != nil {
panic("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
}
if !cfg.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return
Expand All @@ -135,15 +138,30 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
}
}

// withMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withMarshaler(marshaler exporterqueue.Marshaler[Request]) Option {
return func(o *baseExporter) {
o.marshaler = marshaler
}
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withUnmarshaler(unmarshaler exporterqueue.Unmarshaler[Request]) Option {
return func(o *baseExporter) {
o.unmarshaler = unmarshaler
}
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.StartFunc
component.ShutdownFunc

requestExporter bool
marshaler exporterqueue.Marshaler[Request]
unmarshaler exporterqueue.Unmarshaler[Request]
signal component.DataType
marshaler exporterqueue.Marshaler[Request]
unmarshaler exporterqueue.Unmarshaler[Request]
signal component.DataType

set exporter.CreateSettings
obsrep *ObsReport
Expand All @@ -162,20 +180,14 @@ type baseExporter struct {
consumerOptions []consumer.Option
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool,
marshaler exporterqueue.Marshaler[Request], unmarshaler exporterqueue.Unmarshaler[Request], osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
return nil, err
}

be := &baseExporter{
requestExporter: requestExporter,
marshaler: marshaler,
unmarshaler: unmarshaler,
signal: signal,
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsReport),
Expand Down
27 changes: 15 additions & 12 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
)

Expand All @@ -36,11 +37,7 @@ func newNoopObsrepSender(*ObsReport) requestSender {
}

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, defaultType, true, nil, nil, newNoopObsrepSender)
be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -49,7 +46,7 @@ func TestBaseExporter(t *testing.T) {
func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := newBaseExporter(
defaultSettings, defaultType, false, nil, nil, newNoopObsrepSender,
defaultSettings, defaultType, newNoopObsrepSender,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings()),
Expand All @@ -68,15 +65,22 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
}
}

func TestQueueRetryOptionsWithRequestExporter(t *testing.T) {
bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, true, nil, nil, newNoopObsrepSender,
func TestQueueOptionsWithRequestExporter(t *testing.T) {
bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.Nil(t, err)
require.True(t, bs.requestExporter)
require.Nil(t, bs.marshaler)
require.Nil(t, bs.unmarshaler)
require.Panics(t, func() {
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, true, nil, nil, newNoopObsrepSender,
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueSettings()))
})
require.Panics(t, func() {
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender,
withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))
})
}

func TestBaseExporterLogging(t *testing.T) {
Expand All @@ -85,9 +89,8 @@ func TestBaseExporterLogging(t *testing.T) {
set.Logger = zap.New(logger)
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
bs, err := newBaseExporter(set, defaultType, true, nil, nil, newNoopObsrepSender, WithRetry(rCfg))
bs, err := newBaseExporter(set, defaultType, newNoopObsrepSender, WithRetry(rCfg))
require.Nil(t, err)
require.True(t, bs.requestExporter)
sendErr := bs.send(context.Background(), newErrorRequest())
require.Error(t, sendErr)

Expand Down
38 changes: 11 additions & 27 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type logsExporter struct {

// NewLogsExporter creates an exporter.Logs that records observability metrics and wraps every request with a Span.
func NewLogsExporter(
_ context.Context,
ctx context.Context,
set exporter.CreateSettings,
cfg component.Config,
pusher consumer.ConsumeLogsFunc,
Expand All @@ -79,41 +79,25 @@ func NewLogsExporter(
if cfg == nil {
return nil, errNilConfig
}

if set.Logger == nil {
return nil, errNilLogger
}

if pusher == nil {
return nil, errNilPushLogsData
}

be, err := newBaseExporter(set, component.DataTypeLogs, false, logsRequestMarshaler,
newLogsRequestUnmarshalerFunc(pusher), newLogsExporterWithObservability, options...)
if err != nil {
return nil, err
}

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req := newLogsRequest(ld, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return serr
}, be.consumerOptions...)

return &logsExporter{
baseExporter: be,
Logs: lc,
}, err
logsOpts := []Option{withMarshaler(logsRequestMarshaler), withUnmarshaler(newLogsRequestUnmarshalerFunc(pusher))}
return NewLogsRequestExporter(ctx, set, requestFromLogs(pusher), append(logsOpts, options...)...)
}

// RequestFromLogsFunc converts plog.Logs data into a user-defined request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type RequestFromLogsFunc func(context.Context, plog.Logs) (Request, error)

// requestFromLogs returns a RequestFromLogsFunc that converts plog.Logs into a Request.
func requestFromLogs(pusher consumer.ConsumeLogsFunc) RequestFromLogsFunc {
return func(_ context.Context, ld plog.Logs) (Request, error) {
return newLogsRequest(ld, pusher), nil
}
}

// NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Expand All @@ -131,7 +115,7 @@ func NewLogsRequestExporter(
return nil, errNilLogsConverter
}

be, err := newBaseExporter(set, component.DataTypeLogs, true, nil, nil, newLogsExporterWithObservability, options...)
be, err := newBaseExporter(set, component.DataTypeLogs, newLogsExporterWithObservability, options...)
if err != nil {
return nil, err
}
Expand Down
38 changes: 11 additions & 27 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type metricsExporter struct {

// NewMetricsExporter creates an exporter.Metrics that records observability metrics and wraps every request with a Span.
func NewMetricsExporter(
_ context.Context,
ctx context.Context,
set exporter.CreateSettings,
cfg component.Config,
pusher consumer.ConsumeMetricsFunc,
Expand All @@ -79,41 +79,25 @@ func NewMetricsExporter(
if cfg == nil {
return nil, errNilConfig
}

if set.Logger == nil {
return nil, errNilLogger
}

if pusher == nil {
return nil, errNilPushMetricsData
}

be, err := newBaseExporter(set, component.DataTypeMetrics, false, metricsRequestMarshaler,
newMetricsRequestUnmarshalerFunc(pusher), newMetricsSenderWithObservability, options...)
if err != nil {
return nil, err
}

mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req := newMetricsRequest(md, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return serr
}, be.consumerOptions...)

return &metricsExporter{
baseExporter: be,
Metrics: mc,
}, err
metricsOpts := []Option{withMarshaler(metricsRequestMarshaler), withUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher))}
return NewMetricsRequestExporter(ctx, set, requestFromMetrics(pusher), append(metricsOpts, options...)...)
}

// RequestFromMetricsFunc converts pdata.Metrics into a user-defined request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type RequestFromMetricsFunc func(context.Context, pmetric.Metrics) (Request, error)

// requestFromMetrics returns a RequestFromMetricsFunc that converts pdata.Metrics into a Request.
func requestFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestFromMetricsFunc {
return func(_ context.Context, md pmetric.Metrics) (Request, error) {
return newMetricsRequest(md, pusher), nil
}
}

// NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Expand All @@ -131,7 +115,7 @@ func NewMetricsRequestExporter(
return nil, errNilMetricsConverter
}

be, err := newBaseExporter(set, component.DataTypeMetrics, true, nil, nil, newMetricsSenderWithObservability, options...)
be, err := newBaseExporter(set, component.DataTypeMetrics, newMetricsSenderWithObservability, options...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit e7770fc

Please sign in to comment.