From 6c4bfd115e816faebe0edadcceb852fa64888c69 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 3 Jun 2021 08:27:00 -0700 Subject: [PATCH] Convert otlptext to implement Marshaler interfaces Signed-off-by: Bogdan Drutu --- exporter/loggingexporter/logging_exporter.go | 89 ++++++++++--------- .../loggingexporter/logging_exporter_test.go | 38 ++++++++ internal/otlptext/databuffer.go | 7 +- internal/otlptext/logs.go | 18 +++- internal/otlptext/logs_test.go | 3 +- internal/otlptext/metrics.go | 18 +++- internal/otlptext/metrics_test.go | 3 +- internal/otlptext/traces.go | 18 +++- internal/otlptext/traces_test.go | 3 +- 9 files changed, 135 insertions(+), 62 deletions(-) diff --git a/exporter/loggingexporter/logging_exporter.go b/exporter/loggingexporter/logging_exporter.go index cb9a0ae17ab..9dda804fb8d 100644 --- a/exporter/loggingexporter/logging_exporter.go +++ b/exporter/loggingexporter/logging_exporter.go @@ -26,53 +26,77 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/internal/model" "go.opentelemetry.io/collector/internal/otlptext" ) type loggingExporter struct { - logger *zap.Logger - debug bool + logger *zap.Logger + debug bool + logsMarshaler model.LogsMarshaler + metricsMarshaler model.MetricsMarshaler + tracesMarshaler model.TracesMarshaler } -func (s *loggingExporter) pushTraces( - _ context.Context, - td pdata.Traces, -) error { - +func (s *loggingExporter) pushTraces(_ context.Context, td pdata.Traces) error { s.logger.Info("TracesExporter", zap.Int("#spans", td.SpanCount())) if !s.debug { return nil } - s.logger.Debug(otlptext.Traces(td)) - + buf, err := s.tracesMarshaler.Marshal(td) + if err != nil { + return err + } + s.logger.Debug(string(buf)) return nil } -func (s *loggingExporter) pushMetrics( - _ context.Context, - md pdata.Metrics, -) error { +func (s *loggingExporter) pushMetrics(_ context.Context, md pdata.Metrics) error { s.logger.Info("MetricsExporter", zap.Int("#metrics", md.MetricCount())) if !s.debug { return nil } - s.logger.Debug(otlptext.Metrics(md)) + buf, err := s.metricsMarshaler.Marshal(md) + if err != nil { + return err + } + s.logger.Debug(string(buf)) + return nil +} + +func (s *loggingExporter) pushLogs(_ context.Context, ld pdata.Logs) error { + s.logger.Info("LogsExporter", zap.Int("#logs", ld.LogRecordCount())) + + if !s.debug { + return nil + } + buf, err := s.logsMarshaler.Marshal(ld) + if err != nil { + return err + } + s.logger.Debug(string(buf)) return nil } +func newLoggingExporter(level string, logger *zap.Logger) *loggingExporter { + return &loggingExporter{ + debug: strings.ToLower(level) == "debug", + logger: logger, + logsMarshaler: otlptext.NewTextLogsMarshaler(), + metricsMarshaler: otlptext.NewTextMetricsMarshaler(), + tracesMarshaler: otlptext.NewTextTracesMarshaler(), + } +} + // newTracesExporter creates an exporter.TracesExporter that just drops the // received data and logs debugging messages. func newTracesExporter(config config.Exporter, level string, logger *zap.Logger) (component.TracesExporter, error) { - s := &loggingExporter{ - debug: strings.ToLower(level) == "debug", - logger: logger, - } - + s := newLoggingExporter(level, logger) return exporterhelper.NewTracesExporter( config, logger, @@ -89,11 +113,7 @@ func newTracesExporter(config config.Exporter, level string, logger *zap.Logger) // newMetricsExporter creates an exporter.MetricsExporter that just drops the // received data and logs debugging messages. func newMetricsExporter(config config.Exporter, level string, logger *zap.Logger) (component.MetricsExporter, error) { - s := &loggingExporter{ - debug: strings.ToLower(level) == "debug", - logger: logger, - } - + s := newLoggingExporter(level, logger) return exporterhelper.NewMetricsExporter( config, logger, @@ -110,11 +130,7 @@ func newMetricsExporter(config config.Exporter, level string, logger *zap.Logger // newLogsExporter creates an exporter.LogsExporter that just drops the // received data and logs debugging messages. func newLogsExporter(config config.Exporter, level string, logger *zap.Logger) (component.LogsExporter, error) { - s := &loggingExporter{ - debug: strings.ToLower(level) == "debug", - logger: logger, - } - + s := newLoggingExporter(level, logger) return exporterhelper.NewLogsExporter( config, logger, @@ -128,21 +144,6 @@ func newLogsExporter(config config.Exporter, level string, logger *zap.Logger) ( ) } -func (s *loggingExporter) pushLogs( - _ context.Context, - ld pdata.Logs, -) error { - s.logger.Info("LogsExporter", zap.Int("#logs", ld.LogRecordCount())) - - if !s.debug { - return nil - } - - s.logger.Debug(otlptext.Logs(ld)) - - return nil -} - func loggerSync(logger *zap.Logger) func(context.Context) error { return func(context.Context) error { // Currently Sync() return a different error depending on the OS. diff --git a/exporter/loggingexporter/logging_exporter_test.go b/exporter/loggingexporter/logging_exporter_test.go index 2b70a0e4d07..cde7da2afde 100644 --- a/exporter/loggingexporter/logging_exporter_test.go +++ b/exporter/loggingexporter/logging_exporter_test.go @@ -15,6 +15,7 @@ package loggingexporter import ( "context" + "errors" "testing" "github.com/stretchr/testify/assert" @@ -62,3 +63,40 @@ func TestLoggingLogsExporterNoErrors(t *testing.T) { assert.NoError(t, lle.Shutdown(context.Background())) } + +func TestLoggingExporterErrors(t *testing.T) { + le := newLoggingExporter("Debug", zap.NewNop()) + require.NotNil(t, le) + + errWant := errors.New("my error") + le.tracesMarshaler = &errTracesMarshaler{err: errWant} + le.metricsMarshaler = &errMetricsMarshaler{err: errWant} + le.logsMarshaler = &errLogsMarshaler{err: errWant} + assert.Equal(t, errWant, le.pushTraces(context.Background(), pdata.NewTraces())) + assert.Equal(t, errWant, le.pushMetrics(context.Background(), pdata.NewMetrics())) + assert.Equal(t, errWant, le.pushLogs(context.Background(), pdata.NewLogs())) +} + +type errLogsMarshaler struct { + err error +} + +func (e errLogsMarshaler) Marshal(pdata.Logs) ([]byte, error) { + return nil, e.err +} + +type errMetricsMarshaler struct { + err error +} + +func (e errMetricsMarshaler) Marshal(pdata.Metrics) ([]byte, error) { + return nil, e.err +} + +type errTracesMarshaler struct { + err error +} + +func (e errTracesMarshaler) Marshal(pdata.Traces) ([]byte, error) { + return nil, e.err +} diff --git a/internal/otlptext/databuffer.go b/internal/otlptext/databuffer.go index e780606aece..e0ded503b31 100644 --- a/internal/otlptext/databuffer.go +++ b/internal/otlptext/databuffer.go @@ -15,6 +15,7 @@ package otlptext import ( + "bytes" "fmt" "strconv" "strings" @@ -24,12 +25,12 @@ import ( ) type dataBuffer struct { - str strings.Builder + buf bytes.Buffer } func (b *dataBuffer) logEntry(format string, a ...interface{}) { - b.str.WriteString(fmt.Sprintf(format, a...)) - b.str.WriteString("\n") + b.buf.WriteString(fmt.Sprintf(format, a...)) + b.buf.WriteString("\n") } func (b *dataBuffer) logAttr(label string, value string) { diff --git a/internal/otlptext/logs.go b/internal/otlptext/logs.go index d2058e18da0..ae6b3a14e9a 100644 --- a/internal/otlptext/logs.go +++ b/internal/otlptext/logs.go @@ -14,10 +14,20 @@ package otlptext -import "go.opentelemetry.io/collector/consumer/pdata" +import ( + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/model" +) -// Logs data to text -func Logs(ld pdata.Logs) string { +// NewTextLogsMarshaler returns a serializer.LogsMarshaler to encode to OTLP json bytes. +func NewTextLogsMarshaler() model.LogsMarshaler { + return logsMarshaler{} +} + +type logsMarshaler struct{} + +// Marshal data to text. +func (logsMarshaler) Marshal(ld pdata.Logs) ([]byte, error) { buf := dataBuffer{} rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { @@ -39,5 +49,5 @@ func Logs(ld pdata.Logs) string { } } - return buf.str.String() + return buf.buf.Bytes(), nil } diff --git a/internal/otlptext/logs_test.go b/internal/otlptext/logs_test.go index cafa2f1c88d..69c29fd630a 100644 --- a/internal/otlptext/logs_test.go +++ b/internal/otlptext/logs_test.go @@ -39,7 +39,8 @@ func TestLogs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - logs := Logs(tt.args.ld) + logs, err := NewTextLogsMarshaler().Marshal(tt.args.ld) + assert.NoError(t, err) if !tt.empty { assert.NotEmpty(t, logs) } diff --git a/internal/otlptext/metrics.go b/internal/otlptext/metrics.go index c9d5cc1c15f..5b08fa29c46 100644 --- a/internal/otlptext/metrics.go +++ b/internal/otlptext/metrics.go @@ -14,10 +14,20 @@ package otlptext -import "go.opentelemetry.io/collector/consumer/pdata" +import ( + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/model" +) -// Metrics data to text -func Metrics(md pdata.Metrics) string { +// NewTextMetricsMarshaler returns a serializer.MetricsMarshaler to encode to OTLP json bytes. +func NewTextMetricsMarshaler() model.MetricsMarshaler { + return metricsMarshaler{} +} + +type metricsMarshaler struct{} + +// Marshal data to text. +func (metricsMarshaler) Marshal(md pdata.Metrics) ([]byte, error) { buf := dataBuffer{} rms := md.ResourceMetrics() for i := 0; i < rms.Len(); i++ { @@ -39,5 +49,5 @@ func Metrics(md pdata.Metrics) string { } } - return buf.str.String() + return buf.buf.Bytes(), nil } diff --git a/internal/otlptext/metrics_test.go b/internal/otlptext/metrics_test.go index cb853c731e7..4032318956b 100644 --- a/internal/otlptext/metrics_test.go +++ b/internal/otlptext/metrics_test.go @@ -39,7 +39,8 @@ func TestMetrics(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - metrics := Metrics(tt.args.md) + metrics, err := NewTextMetricsMarshaler().Marshal(tt.args.md) + assert.NoError(t, err) if !tt.empty { assert.NotEmpty(t, metrics) } diff --git a/internal/otlptext/traces.go b/internal/otlptext/traces.go index 6718ccd169c..ce12785b2bd 100644 --- a/internal/otlptext/traces.go +++ b/internal/otlptext/traces.go @@ -14,10 +14,20 @@ package otlptext -import "go.opentelemetry.io/collector/consumer/pdata" +import ( + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/model" +) -// Traces data to text -func Traces(td pdata.Traces) string { +// NewTextTracesMarshaler returns a serializer.TracesMarshaler to encode to OTLP json bytes. +func NewTextTracesMarshaler() model.TracesMarshaler { + return tracesMarshaler{} +} + +type tracesMarshaler struct{} + +// Marshal data to text. +func (tracesMarshaler) Marshal(td pdata.Traces) ([]byte, error) { buf := dataBuffer{} rss := td.ResourceSpans() for i := 0; i < rss.Len(); i++ { @@ -52,5 +62,5 @@ func Traces(td pdata.Traces) string { } } - return buf.str.String() + return buf.buf.Bytes(), nil } diff --git a/internal/otlptext/traces_test.go b/internal/otlptext/traces_test.go index 87b1b32009c..b25998b7093 100644 --- a/internal/otlptext/traces_test.go +++ b/internal/otlptext/traces_test.go @@ -37,7 +37,8 @@ func TestTraces(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - traces := Traces(tt.args.td) + traces, err := NewTextTracesMarshaler().Marshal(tt.args.td) + assert.NoError(t, err) if !tt.empty { assert.NotEmpty(t, traces) }