Skip to content

Commit

Permalink
[exporterhelper] Add collector observability to the new exporter helper
Browse files Browse the repository at this point in the history
This change adds collector's internal metrics and tracing to the new request-based exporter helpers. Only those metrics and traces are added that already adopted by the existing exporter helpers for backward compatibility. The new exporter helpers can and should expose more metrics, e.g. for tracking converter errors.
  • Loading branch information
dmitryax committed Aug 17, 2023
1 parent 4bc4b63 commit a29e672
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 12 deletions.
17 changes: 13 additions & 4 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,12 @@ func NewLogsRequestExporter(
if err != nil {
return nil, err
}

// TODO: Add new observability tracing/metrics to the new exporterhelper.
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: be.obsrep,
nextSender: nextSender,
}
})

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req, cErr := converter.RequestFromLogs(ctx, ld)
Expand All @@ -155,10 +159,15 @@ func NewLogsRequestExporter(
zap.Error(err))
return consumererror.NewPermanent(cErr)
}
return be.sender.send(&request{
r := &request{
baseRequest: baseRequest{ctx: ctx},
Request: req,
})
}
sErr := be.sender.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count()))
}
return sErr
}, bs.consumerOptions...)

return &logsExporter{
Expand Down
53 changes: 53 additions & 0 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,18 @@ func TestLogsExporter_WithRecordMetrics(t *testing.T) {
checkRecordedMetricsForLogsExporter(t, tt, le, nil)
}

func TestLogsRequestExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, le)

checkRecordedMetricsForLogsExporter(t, tt, le, nil)
}

func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
want := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
Expand All @@ -173,6 +185,20 @@ func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForLogsExporter(t, tt, le, want)
}

func TestLogsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) {
want := errors.New("export_error")
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(),
&fakeRequestConverter{requestError: want})
require.Nil(t, err)
require.NotNil(t, le)

checkRecordedMetricsForLogsExporter(t, tt, le, want)
}

func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
Expand Down Expand Up @@ -211,6 +237,19 @@ func TestLogsExporter_WithSpan(t *testing.T) {
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1)
}

func TestLogsRequestExporter_WithSpan(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{})
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1)
}

func TestLogsExporter_WithSpan_ReturnError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
Expand All @@ -225,6 +264,20 @@ func TestLogsExporter_WithSpan_ReturnError(t *testing.T) {
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1)
}

func TestLogsRequestExporter_WithSpan_ReturnError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("my_error")
le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want})
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1)
}

func TestLogsExporter_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
Expand Down
17 changes: 13 additions & 4 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,12 @@ func NewMetricsRequestExporter(
if err != nil {
return nil, err
}

// TODO: Add new observability tracing/metrics to the new exporterhelper.
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: be.obsrep,
nextSender: nextSender,
}
})

mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req, cErr := converter.RequestFromMetrics(ctx, md)
Expand All @@ -155,10 +159,15 @@ func NewMetricsRequestExporter(
zap.Error(err))
return consumererror.NewPermanent(cErr)
}
return be.sender.send(&request{
r := &request{
Request: req,
baseRequest: baseRequest{ctx: ctx},
})
}
sErr := be.sender.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count()))
}
return sErr
}, bs.consumerOptions...)

return &metricsExporter{
Expand Down
52 changes: 52 additions & 0 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
checkRecordedMetricsForMetricsExporter(t, tt, me, nil)
}

func TestMetricsRequestExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, tt, me, nil)
}

func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
want := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
Expand All @@ -174,6 +186,19 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForMetricsExporter(t, tt, me, want)
}

func TestMetricsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) {
want := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{requestError: want})
require.NoError(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, tt, me, want)
}

func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
Expand Down Expand Up @@ -212,6 +237,19 @@ func TestMetricsExporter_WithSpan(t *testing.T) {
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2)
}

func TestMetricsRequestExporter_WithSpan(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2)
}

func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
Expand All @@ -226,6 +264,20 @@ func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2)
}

func TestMetricsRequestExporter_WithSpan_ExportError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("my_error")
me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{requestError: want})
require.NoError(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2)
}

func TestMetricsExporter_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
Expand Down
17 changes: 13 additions & 4 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,12 @@ func NewTracesRequestExporter(
if err != nil {
return nil, err
}

// TODO: Add new observability tracing/metrics to the new exporterhelper.
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &tracesExporterWithObservability{
obsrep: be.obsrep,
nextSender: nextSender,
}
})

tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
req, cErr := converter.RequestFromTraces(ctx, td)
Expand All @@ -155,10 +159,15 @@ func NewTracesRequestExporter(
zap.Error(err))
return consumererror.NewPermanent(cErr)
}
return be.sender.send(&request{
r := &request{
baseRequest: baseRequest{ctx: ctx},
Request: req,
})
}
sErr := be.sender.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count()))
}
return sErr
}, bs.consumerOptions...)

return &traceExporter{
Expand Down
54 changes: 54 additions & 0 deletions exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ func TestTracesExporter_WithRecordMetrics(t *testing.T) {
checkRecordedMetricsForTracesExporter(t, tt, te, nil)
}

func TestTracesRequestExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, te)

checkRecordedMetricsForTracesExporter(t, tt, te, nil)
}

func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) {
want := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
Expand All @@ -171,6 +183,19 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForTracesExporter(t, tt, te, want)
}

func TestTracesRequestExporter_WithRecordMetrics_RequestSenderError(t *testing.T) {
want := errors.New("export_error")
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{requestError: want})
require.NoError(t, err)
require.NotNil(t, te)

checkRecordedMetricsForTracesExporter(t, tt, te, want)
}

func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
require.NoError(t, err)
Expand Down Expand Up @@ -210,6 +235,20 @@ func TestTracesExporter_WithSpan(t *testing.T) {
checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1)
}

func TestTracesRequestExporter_WithSpan(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, te)

checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1)
}

func TestTracesExporter_WithSpan_ReturnError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
Expand All @@ -225,6 +264,21 @@ func TestTracesExporter_WithSpan_ReturnError(t *testing.T) {
checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1)
}

func TestTracesRequestExporter_WithSpan_ExportError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("export_error")
te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want})
require.NoError(t, err)
require.NotNil(t, te)

checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1)
}

func TestTracesExporter_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
Expand Down

0 comments on commit a29e672

Please sign in to comment.