Skip to content

Commit

Permalink
Remove to from proto in pdata, can use OTLP marshaler/unmarshaler (#3515
Browse files Browse the repository at this point in the history
)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Jun 29, 2021
1 parent 3acff6c commit 061d61d
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 210 deletions.
136 changes: 136 additions & 0 deletions model/otlp/pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package otlp

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
)

func TestProtobufLogsUnmarshaler_error(t *testing.T) {
Expand All @@ -37,3 +41,135 @@ func TestProtobufTracesUnmarshaler_error(t *testing.T) {
_, err := p.UnmarshalTraces([]byte("+$%"))
assert.Error(t, err)
}

func BenchmarkLogsToProtobuf(b *testing.B) {
marshaler := NewProtobufLogsMarshaler()
logs := generateBenchmarkLogs(128)
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := marshaler.MarshalLogs(logs)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkLogsFromProtobuf(b *testing.B) {
marshaler := NewProtobufLogsMarshaler()
unmarshaler := NewProtobufLogsUnmarshaler()
baseLogs := generateBenchmarkLogs(128)
buf, err := marshaler.MarshalLogs(baseLogs)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
logs, err := unmarshaler.UnmarshalLogs(buf)
require.NoError(b, err)
assert.Equal(b, baseLogs.ResourceLogs().Len(), logs.ResourceLogs().Len())
}
}

func BenchmarkMetricsToProtobuf(b *testing.B) {
marshaler := NewProtobufMetricsMarshaler()
metrics := generateBenchmarkMetrics(128)
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := marshaler.MarshalMetrics(metrics)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkMetricsFromProtobuf(b *testing.B) {
marshaler := NewProtobufMetricsMarshaler()
unmarshaler := NewProtobufMetricsUnmarshaler()
baseMetrics := generateBenchmarkMetrics(128)
buf, err := marshaler.MarshalMetrics(baseMetrics)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
metrics, err := unmarshaler.UnmarshalMetrics(buf)
require.NoError(b, err)
assert.Equal(b, baseMetrics.ResourceMetrics().Len(), metrics.ResourceMetrics().Len())
}
}

func BenchmarkTracesToProtobuf(b *testing.B) {
marshaler := NewProtobufTracesMarshaler()
traces := generateBenchmarkTraces(128)
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := marshaler.MarshalTraces(traces)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkTracesFromProtobuf(b *testing.B) {
marshaler := NewProtobufTracesMarshaler()
unmarshaler := NewProtobufTracesUnmarshaler()
baseTraces := generateBenchmarkTraces(128)
buf, err := marshaler.MarshalTraces(baseTraces)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
traces, err := unmarshaler.UnmarshalTraces(buf)
require.NoError(b, err)
assert.Equal(b, baseTraces.ResourceSpans().Len(), traces.ResourceSpans().Len())
}
}

func generateBenchmarkLogs(logsCount int) pdata.Logs {
endTime := pdata.TimestampFromTime(time.Now())

md := pdata.NewLogs()
ilm := md.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty()
ilm.Logs().Resize(logsCount)
for i := 0; i < logsCount; i++ {
im := ilm.Logs().At(i)
im.SetName("test_name")
im.SetTimestamp(endTime)
}
return md
}

func generateBenchmarkMetrics(metricsCount int) pdata.Metrics {
now := time.Now()
startTime := pdata.TimestampFromTime(now.Add(-10 * time.Second))
endTime := pdata.TimestampFromTime(now)

md := pdata.NewMetrics()
ilm := md.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty()
ilm.Metrics().Resize(metricsCount)
for i := 0; i < metricsCount; i++ {
im := ilm.Metrics().At(i)
im.SetName("test_name")
im.SetDataType(pdata.MetricDataTypeIntSum)
idp := im.IntSum().DataPoints().AppendEmpty()
idp.SetStartTimestamp(startTime)
idp.SetTimestamp(endTime)
idp.SetValue(123)
}
return md
}

func generateBenchmarkTraces(metricsCount int) pdata.Traces {
now := time.Now()
startTime := pdata.TimestampFromTime(now.Add(-10 * time.Second))
endTime := pdata.TimestampFromTime(now)

md := pdata.NewTraces()
ilm := md.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty()
ilm.Spans().Resize(metricsCount)
for i := 0; i < metricsCount; i++ {
im := ilm.Spans().At(i)
im.SetName("test_name")
im.SetStartTimestamp(startTime)
im.SetEndTimestamp(endTime)
}
return md
}
20 changes: 0 additions & 20 deletions model/pdata/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,13 @@ func LogsFromInternalRep(logs internal.LogsWrapper) Logs {
return Logs{orig: internal.LogsToOtlp(logs)}
}

// LogsFromOtlpProtoBytes converts OTLP Collector ExportLogsServiceRequest
// ProtoBuf bytes to the internal Logs.
//
// Returns an invalid Logs instance if error is not nil.
func LogsFromOtlpProtoBytes(data []byte) (Logs, error) {
req := otlpcollectorlog.ExportLogsServiceRequest{}
if err := req.Unmarshal(data); err != nil {
return Logs{}, err
}
return Logs{orig: &req}, nil
}

// InternalRep returns internal representation of the logs. Should not be used outside
// this module. This is intended to be used only by OTLP exporter and File exporter,
// which legitimately need to work with OTLP Protobuf structs.
func (ld Logs) InternalRep() internal.LogsWrapper {
return internal.LogsFromOtlp(ld.orig)
}

// ToOtlpProtoBytes converts this Logs to the OTLP Collector ExportLogsServiceRequest
// ProtoBuf bytes.
//
// Returns an nil byte-array if error is not nil.
func (ld Logs) ToOtlpProtoBytes() ([]byte, error) {
return ld.orig.Marshal()
}

// Clone returns a copy of Logs.
func (ld Logs) Clone() Logs {
cloneLd := NewLogs()
Expand Down
43 changes: 0 additions & 43 deletions model/pdata/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/logs/v1"
Expand Down Expand Up @@ -78,22 +77,6 @@ func TestToFromLogProto(t *testing.T) {
assert.EqualValues(t, &otlpcollectorlog.ExportLogsServiceRequest{}, ld.orig)
}

func TestLogsToFromOtlpProtoBytes(t *testing.T) {
send := NewLogs()
fillTestResourceLogsSlice(send.ResourceLogs())
bytes, err := send.ToOtlpProtoBytes()
assert.NoError(t, err)

recv, err := LogsFromOtlpProtoBytes(bytes)
assert.NoError(t, err)
assert.EqualValues(t, send, recv)
}

func TestLogsFromInvalidOtlpProtoBytes(t *testing.T) {
_, err := LogsFromOtlpProtoBytes([]byte{0xFF})
assert.EqualError(t, err, "unexpected EOF")
}

func TestLogsClone(t *testing.T) {
logs := NewLogs()
fillTestResourceLogsSlice(logs.ResourceLogs())
Expand All @@ -111,29 +94,3 @@ func BenchmarkLogsClone(b *testing.B) {
}
}
}

func BenchmarkLogsToOtlp(b *testing.B) {
traces := NewLogs()
fillTestResourceLogsSlice(traces.ResourceLogs())
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := traces.ToOtlpProtoBytes()
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkLogsFromOtlp(b *testing.B) {
baseLogs := NewLogs()
fillTestResourceLogsSlice(baseLogs.ResourceLogs())
buf, err := baseLogs.ToOtlpProtoBytes()
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
logs, err := LogsFromOtlpProtoBytes(buf)
require.NoError(b, err)
assert.Equal(b, baseLogs.ResourceLogs().Len(), logs.ResourceLogs().Len())
}
}
20 changes: 0 additions & 20 deletions model/pdata/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,12 @@ func MetricsFromInternalRep(wrapper internal.MetricsWrapper) Metrics {
return Metrics{orig: internal.MetricsToOtlp(wrapper)}
}

// MetricsFromOtlpProtoBytes converts the OTLP Collector ExportMetricsServiceRequest
// ProtoBuf bytes to Metrics.
//
// Returns an invalid Metrics instance if error is not nil.
func MetricsFromOtlpProtoBytes(data []byte) (Metrics, error) {
req := otlpcollectormetrics.ExportMetricsServiceRequest{}
if err := req.Unmarshal(data); err != nil {
return Metrics{}, err
}
return Metrics{orig: &req}, nil
}

// InternalRep returns internal representation of the Metrics.
// Should not be used outside this module.
func (md Metrics) InternalRep() internal.MetricsWrapper {
return internal.MetricsFromOtlp(md.orig)
}

// ToOtlpProtoBytes converts this Metrics to the OTLP Collector ExportMetricsServiceRequest
// ProtoBuf bytes.
//
// Returns an nil byte-array if error is not nil.
func (md Metrics) ToOtlpProtoBytes() ([]byte, error) {
return md.orig.Marshal()
}

// Clone returns a copy of MetricData.
func (md Metrics) Clone() Metrics {
cloneMd := NewMetrics()
Expand Down
64 changes: 0 additions & 64 deletions model/pdata/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,22 +650,6 @@ func TestOtlpToFromInternalHistogramMutating(t *testing.T) {
}, internal.MetricsToOtlp(md.InternalRep()))
}

func TestMetricsToFromOtlpProtoBytes(t *testing.T) {
send := NewMetrics()
fillTestResourceMetricsSlice(send.ResourceMetrics())
bytes, err := send.ToOtlpProtoBytes()
assert.NoError(t, err)

recv, err := MetricsFromOtlpProtoBytes(bytes)
assert.NoError(t, err)
assert.EqualValues(t, send, recv)
}

func TestMetricsFromInvalidOtlpProtoBytes(t *testing.T) {
_, err := MetricsFromOtlpProtoBytes([]byte{0xFF})
assert.EqualError(t, err, "unexpected EOF")
}

func TestMetricsClone(t *testing.T) {
metrics := NewMetrics()
fillTestResourceMetricsSlice(metrics.ResourceMetrics())
Expand Down Expand Up @@ -787,54 +771,6 @@ func BenchmarkOtlpToFromInternal_HistogramPoints_MutateOneLabel(b *testing.B) {
}
}

func BenchmarkMetrics_ToOtlpProtoBytes_PassThrough(b *testing.B) {
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{
{
InstrumentationLibrary: generateTestProtoInstrumentationLibrary(),
Metrics: []*otlpmetrics.Metric{generateTestProtoIntGaugeMetric(), generateTestProtoDoubleSumMetric(), generateTestProtoDoubleHistogramMetric()},
},
},
},
},
}
md := MetricsFromInternalRep(internal.MetricsFromOtlp(req))

b.ResetTimer()
for n := 0; n < b.N; n++ {
_, _ = md.ToOtlpProtoBytes()
}
}

func BenchmarkMetricsToOtlp(b *testing.B) {
traces := NewMetrics()
fillTestResourceMetricsSlice(traces.ResourceMetrics())
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := traces.ToOtlpProtoBytes()
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkMetricsFromOtlp(b *testing.B) {
baseMetrics := NewMetrics()
fillTestResourceMetricsSlice(baseMetrics.ResourceMetrics())
buf, err := baseMetrics.ToOtlpProtoBytes()
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
md, err := MetricsFromOtlpProtoBytes(buf)
require.NoError(b, err)
assert.Equal(b, baseMetrics.ResourceMetrics().Len(), md.ResourceMetrics().Len())
}
}

func generateTestProtoResource() otlpresource.Resource {
return otlpresource.Resource{
Attributes: []otlpcommon.KeyValue{
Expand Down
21 changes: 0 additions & 21 deletions model/pdata/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,33 +50,12 @@ func TracesFromInternalRep(wrapper internal.TracesWrapper) Traces {
return Traces{orig: internal.TracesToOtlp(wrapper)}
}

// TracesFromOtlpProtoBytes converts OTLP Collector ExportTraceServiceRequest
// ProtoBuf bytes to the internal Traces.
//
// Returns an invalid Traces instance if error is not nil.
func TracesFromOtlpProtoBytes(data []byte) (Traces, error) {
req := otlpcollectortrace.ExportTraceServiceRequest{}
if err := req.Unmarshal(data); err != nil {
return Traces{}, err
}
internal.TracesCompatibilityChanges(&req)
return Traces{orig: &req}, nil
}

// InternalRep returns internal representation of the Traces.
// Should not be used outside this module.
func (td Traces) InternalRep() internal.TracesWrapper {
return internal.TracesFromOtlp(td.orig)
}

// ToOtlpProtoBytes converts this Traces to the OTLP Collector ExportTraceServiceRequest
// ProtoBuf bytes.
//
// Returns an nil byte-array if error is not nil.
func (td Traces) ToOtlpProtoBytes() ([]byte, error) {
return td.orig.Marshal()
}

// Clone returns a copy of Traces.
func (td Traces) Clone() Traces {
cloneTd := NewTraces()
Expand Down
Loading

0 comments on commit 061d61d

Please sign in to comment.