Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove to from proto in pdata, can use OTLP marshaler/unmarshaler #3515

Merged
merged 1 commit into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions model/otlp/pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package otlp

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
)

func TestProtobufLogsUnmarshaler_error(t *testing.T) {
Expand All @@ -37,3 +41,135 @@ func TestProtobufTracesUnmarshaler_error(t *testing.T) {
_, err := p.UnmarshalTraces([]byte("+$%"))
assert.Error(t, err)
}

func BenchmarkLogsToProtobuf(b *testing.B) {
marshaler := NewProtobufLogsMarshaler()
logs := generateBenchmarkLogs(128)
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := marshaler.MarshalLogs(logs)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkLogsFromProtobuf(b *testing.B) {
marshaler := NewProtobufLogsMarshaler()
unmarshaler := NewProtobufLogsUnmarshaler()
baseLogs := generateBenchmarkLogs(128)
buf, err := marshaler.MarshalLogs(baseLogs)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
logs, err := unmarshaler.UnmarshalLogs(buf)
require.NoError(b, err)
assert.Equal(b, baseLogs.ResourceLogs().Len(), logs.ResourceLogs().Len())
}
}

func BenchmarkMetricsToProtobuf(b *testing.B) {
marshaler := NewProtobufMetricsMarshaler()
metrics := generateBenchmarkMetrics(128)
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := marshaler.MarshalMetrics(metrics)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkMetricsFromProtobuf(b *testing.B) {
marshaler := NewProtobufMetricsMarshaler()
unmarshaler := NewProtobufMetricsUnmarshaler()
baseMetrics := generateBenchmarkMetrics(128)
buf, err := marshaler.MarshalMetrics(baseMetrics)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
metrics, err := unmarshaler.UnmarshalMetrics(buf)
require.NoError(b, err)
assert.Equal(b, baseMetrics.ResourceMetrics().Len(), metrics.ResourceMetrics().Len())
}
}

func BenchmarkTracesToProtobuf(b *testing.B) {
marshaler := NewProtobufTracesMarshaler()
traces := generateBenchmarkTraces(128)
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := marshaler.MarshalTraces(traces)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkTracesFromProtobuf(b *testing.B) {
marshaler := NewProtobufTracesMarshaler()
unmarshaler := NewProtobufTracesUnmarshaler()
baseTraces := generateBenchmarkTraces(128)
buf, err := marshaler.MarshalTraces(baseTraces)
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
traces, err := unmarshaler.UnmarshalTraces(buf)
require.NoError(b, err)
assert.Equal(b, baseTraces.ResourceSpans().Len(), traces.ResourceSpans().Len())
}
}

func generateBenchmarkLogs(logsCount int) pdata.Logs {
endTime := pdata.TimestampFromTime(time.Now())

md := pdata.NewLogs()
ilm := md.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty()
ilm.Logs().Resize(logsCount)
for i := 0; i < logsCount; i++ {
im := ilm.Logs().At(i)
im.SetName("test_name")
im.SetTimestamp(endTime)
}
return md
}

func generateBenchmarkMetrics(metricsCount int) pdata.Metrics {
now := time.Now()
startTime := pdata.TimestampFromTime(now.Add(-10 * time.Second))
endTime := pdata.TimestampFromTime(now)

md := pdata.NewMetrics()
ilm := md.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty()
ilm.Metrics().Resize(metricsCount)
for i := 0; i < metricsCount; i++ {
im := ilm.Metrics().At(i)
im.SetName("test_name")
im.SetDataType(pdata.MetricDataTypeIntSum)
idp := im.IntSum().DataPoints().AppendEmpty()
idp.SetStartTimestamp(startTime)
idp.SetTimestamp(endTime)
idp.SetValue(123)
}
return md
}

func generateBenchmarkTraces(metricsCount int) pdata.Traces {
now := time.Now()
startTime := pdata.TimestampFromTime(now.Add(-10 * time.Second))
endTime := pdata.TimestampFromTime(now)

md := pdata.NewTraces()
ilm := md.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty()
ilm.Spans().Resize(metricsCount)
for i := 0; i < metricsCount; i++ {
im := ilm.Spans().At(i)
im.SetName("test_name")
im.SetStartTimestamp(startTime)
im.SetEndTimestamp(endTime)
}
return md
}
20 changes: 0 additions & 20 deletions model/pdata/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,13 @@ func LogsFromInternalRep(logs internal.LogsWrapper) Logs {
return Logs{orig: internal.LogsToOtlp(logs)}
}

// LogsFromOtlpProtoBytes converts OTLP Collector ExportLogsServiceRequest
// ProtoBuf bytes to the internal Logs.
//
// Returns an invalid Logs instance if error is not nil.
func LogsFromOtlpProtoBytes(data []byte) (Logs, error) {
req := otlpcollectorlog.ExportLogsServiceRequest{}
if err := req.Unmarshal(data); err != nil {
return Logs{}, err
}
return Logs{orig: &req}, nil
}

// InternalRep returns internal representation of the logs. Should not be used outside
// this module. This is intended to be used only by OTLP exporter and File exporter,
// which legitimately need to work with OTLP Protobuf structs.
func (ld Logs) InternalRep() internal.LogsWrapper {
return internal.LogsFromOtlp(ld.orig)
}

// ToOtlpProtoBytes converts this Logs to the OTLP Collector ExportLogsServiceRequest
// ProtoBuf bytes.
//
// Returns an nil byte-array if error is not nil.
func (ld Logs) ToOtlpProtoBytes() ([]byte, error) {
return ld.orig.Marshal()
}

// Clone returns a copy of Logs.
func (ld Logs) Clone() Logs {
cloneLd := NewLogs()
Expand Down
43 changes: 0 additions & 43 deletions model/pdata/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/internal"
otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
Expand Down Expand Up @@ -78,22 +77,6 @@ func TestToFromLogProto(t *testing.T) {
assert.EqualValues(t, &otlpcollectorlog.ExportLogsServiceRequest{}, ld.orig)
}

func TestLogsToFromOtlpProtoBytes(t *testing.T) {
send := NewLogs()
fillTestResourceLogsSlice(send.ResourceLogs())
bytes, err := send.ToOtlpProtoBytes()
assert.NoError(t, err)

recv, err := LogsFromOtlpProtoBytes(bytes)
assert.NoError(t, err)
assert.EqualValues(t, send, recv)
}

func TestLogsFromInvalidOtlpProtoBytes(t *testing.T) {
_, err := LogsFromOtlpProtoBytes([]byte{0xFF})
assert.EqualError(t, err, "unexpected EOF")
}

func TestLogsClone(t *testing.T) {
logs := NewLogs()
fillTestResourceLogsSlice(logs.ResourceLogs())
Expand All @@ -111,29 +94,3 @@ func BenchmarkLogsClone(b *testing.B) {
}
}
}

func BenchmarkLogsToOtlp(b *testing.B) {
traces := NewLogs()
fillTestResourceLogsSlice(traces.ResourceLogs())
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := traces.ToOtlpProtoBytes()
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkLogsFromOtlp(b *testing.B) {
baseLogs := NewLogs()
fillTestResourceLogsSlice(baseLogs.ResourceLogs())
buf, err := baseLogs.ToOtlpProtoBytes()
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
logs, err := LogsFromOtlpProtoBytes(buf)
require.NoError(b, err)
assert.Equal(b, baseLogs.ResourceLogs().Len(), logs.ResourceLogs().Len())
}
}
20 changes: 0 additions & 20 deletions model/pdata/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,12 @@ func MetricsFromInternalRep(wrapper internal.MetricsWrapper) Metrics {
return Metrics{orig: internal.MetricsToOtlp(wrapper)}
}

// MetricsFromOtlpProtoBytes converts the OTLP Collector ExportMetricsServiceRequest
// ProtoBuf bytes to Metrics.
//
// Returns an invalid Metrics instance if error is not nil.
func MetricsFromOtlpProtoBytes(data []byte) (Metrics, error) {
req := otlpcollectormetrics.ExportMetricsServiceRequest{}
if err := req.Unmarshal(data); err != nil {
return Metrics{}, err
}
return Metrics{orig: &req}, nil
}

// InternalRep returns internal representation of the Metrics.
// Should not be used outside this module.
func (md Metrics) InternalRep() internal.MetricsWrapper {
return internal.MetricsFromOtlp(md.orig)
}

// ToOtlpProtoBytes converts this Metrics to the OTLP Collector ExportMetricsServiceRequest
// ProtoBuf bytes.
//
// Returns an nil byte-array if error is not nil.
func (md Metrics) ToOtlpProtoBytes() ([]byte, error) {
return md.orig.Marshal()
}

// Clone returns a copy of MetricData.
func (md Metrics) Clone() Metrics {
cloneMd := NewMetrics()
Expand Down
64 changes: 0 additions & 64 deletions model/pdata/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,22 +650,6 @@ func TestOtlpToFromInternalHistogramMutating(t *testing.T) {
}, internal.MetricsToOtlp(md.InternalRep()))
}

func TestMetricsToFromOtlpProtoBytes(t *testing.T) {
send := NewMetrics()
fillTestResourceMetricsSlice(send.ResourceMetrics())
bytes, err := send.ToOtlpProtoBytes()
assert.NoError(t, err)

recv, err := MetricsFromOtlpProtoBytes(bytes)
assert.NoError(t, err)
assert.EqualValues(t, send, recv)
}

func TestMetricsFromInvalidOtlpProtoBytes(t *testing.T) {
_, err := MetricsFromOtlpProtoBytes([]byte{0xFF})
assert.EqualError(t, err, "unexpected EOF")
}

func TestMetricsClone(t *testing.T) {
metrics := NewMetrics()
fillTestResourceMetricsSlice(metrics.ResourceMetrics())
Expand Down Expand Up @@ -787,54 +771,6 @@ func BenchmarkOtlpToFromInternal_HistogramPoints_MutateOneLabel(b *testing.B) {
}
}

func BenchmarkMetrics_ToOtlpProtoBytes_PassThrough(b *testing.B) {
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{
{
InstrumentationLibrary: generateTestProtoInstrumentationLibrary(),
Metrics: []*otlpmetrics.Metric{generateTestProtoIntGaugeMetric(), generateTestProtoDoubleSumMetric(), generateTestProtoDoubleHistogramMetric()},
},
},
},
},
}
md := MetricsFromInternalRep(internal.MetricsFromOtlp(req))

b.ResetTimer()
for n := 0; n < b.N; n++ {
_, _ = md.ToOtlpProtoBytes()
}
}

func BenchmarkMetricsToOtlp(b *testing.B) {
traces := NewMetrics()
fillTestResourceMetricsSlice(traces.ResourceMetrics())
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf, err := traces.ToOtlpProtoBytes()
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
}
}

func BenchmarkMetricsFromOtlp(b *testing.B) {
baseMetrics := NewMetrics()
fillTestResourceMetricsSlice(baseMetrics.ResourceMetrics())
buf, err := baseMetrics.ToOtlpProtoBytes()
require.NoError(b, err)
assert.NotEqual(b, 0, len(buf))
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
md, err := MetricsFromOtlpProtoBytes(buf)
require.NoError(b, err)
assert.Equal(b, baseMetrics.ResourceMetrics().Len(), md.ResourceMetrics().Len())
}
}

func generateTestProtoResource() otlpresource.Resource {
return otlpresource.Resource{
Attributes: []otlpcommon.KeyValue{
Expand Down
21 changes: 0 additions & 21 deletions model/pdata/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,33 +50,12 @@ func TracesFromInternalRep(wrapper internal.TracesWrapper) Traces {
return Traces{orig: internal.TracesToOtlp(wrapper)}
}

// TracesFromOtlpProtoBytes converts OTLP Collector ExportTraceServiceRequest
// ProtoBuf bytes to the internal Traces.
//
// Returns an invalid Traces instance if error is not nil.
func TracesFromOtlpProtoBytes(data []byte) (Traces, error) {
req := otlpcollectortrace.ExportTraceServiceRequest{}
if err := req.Unmarshal(data); err != nil {
return Traces{}, err
}
internal.TracesCompatibilityChanges(&req)
return Traces{orig: &req}, nil
}

// InternalRep returns internal representation of the Traces.
// Should not be used outside this module.
func (td Traces) InternalRep() internal.TracesWrapper {
return internal.TracesFromOtlp(td.orig)
}

// ToOtlpProtoBytes converts this Traces to the OTLP Collector ExportTraceServiceRequest
// ProtoBuf bytes.
//
// Returns an nil byte-array if error is not nil.
func (td Traces) ToOtlpProtoBytes() ([]byte, error) {
return td.orig.Marshal()
}

// Clone returns a copy of Traces.
func (td Traces) Clone() Traces {
cloneTd := NewTraces()
Expand Down
Loading