From 8a610b37415226a522b76d45c3dd84c259494cbe Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 9 Jun 2021 03:04:53 -0700 Subject: [PATCH] Add encoder/decoder and marshaler/unmarshaler for OTLP protobuf Signed-off-by: Bogdan Drutu --- internal/otlp/marshaler.go | 21 ++++++++++++--- internal/otlp/pb_decoder.go | 45 +++++++++++++++++++++++++++++++ internal/otlp/pb_encoder.go | 52 ++++++++++++++++++++++++++++++++++++ internal/otlp/pb_test.go | 39 +++++++++++++++++++++++++++ internal/otlp/unmarshaler.go | 21 ++++++++++++--- 5 files changed, 172 insertions(+), 6 deletions(-) create mode 100644 internal/otlp/pb_decoder.go create mode 100644 internal/otlp/pb_encoder.go create mode 100644 internal/otlp/pb_test.go diff --git a/internal/otlp/marshaler.go b/internal/otlp/marshaler.go index 12da9a7a634..364b68a71c0 100644 --- a/internal/otlp/marshaler.go +++ b/internal/otlp/marshaler.go @@ -18,17 +18,32 @@ import ( "go.opentelemetry.io/collector/internal/model" ) -// NewJSONTracesMarshaler returns a model.TracesMarshaler to decode from OTLP json bytes. +// NewJSONTracesMarshaler returns a model.TracesMarshaler. Marshals to OTLP json bytes. func NewJSONTracesMarshaler() model.TracesMarshaler { return model.NewTracesMarshaler(newJSONEncoder(), newFromTranslator()) } -// NewJSONMetricsMarshaler returns a model.MetricsMarshaler to decode from OTLP json bytes. +// NewJSONMetricsMarshaler returns a model.MetricsMarshaler. Marshals to OTLP json bytes. func NewJSONMetricsMarshaler() model.MetricsMarshaler { return model.NewMetricsMarshaler(newJSONEncoder(), newFromTranslator()) } -// NewJSONLogsMarshaler returns a model.LogsMarshaler to decode from OTLP json bytes. +// NewJSONLogsMarshaler returns a model.LogsMarshaler. Marshals to OTLP json bytes. func NewJSONLogsMarshaler() model.LogsMarshaler { return model.NewLogsMarshaler(newJSONEncoder(), newFromTranslator()) } + +// NewProtobufTracesMarshaler returns a model.TracesMarshaler. Marshals to OTLP binary protobuf bytes. +func NewProtobufTracesMarshaler() model.TracesMarshaler { + return model.NewTracesMarshaler(newPbEncoder(), newFromTranslator()) +} + +// NewProtobufMetricsMarshaler returns a model.MetricsMarshaler. Marshals to OTLP binary protobuf bytes. +func NewProtobufMetricsMarshaler() model.MetricsMarshaler { + return model.NewMetricsMarshaler(newPbEncoder(), newFromTranslator()) +} + +// NewProtobufLogsMarshaler returns a model.LogsMarshaler. Marshals to OTLP binary protobuf bytes. +func NewProtobufLogsMarshaler() model.LogsMarshaler { + return model.NewLogsMarshaler(newPbEncoder(), newFromTranslator()) +} diff --git a/internal/otlp/pb_decoder.go b/internal/otlp/pb_decoder.go new file mode 100644 index 00000000000..07a8cc9bdf2 --- /dev/null +++ b/internal/otlp/pb_decoder.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" + otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" + otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" +) + +type pbDecoder struct{} + +func newPbDecoder() *pbDecoder { + return &pbDecoder{} +} + +func (d *pbDecoder) DecodeLogs(buf []byte) (interface{}, error) { + ld := &otlpcollectorlogs.ExportLogsServiceRequest{} + err := ld.Unmarshal(buf) + return ld, err +} + +func (d *pbDecoder) DecodeMetrics(buf []byte) (interface{}, error) { + md := &otlpcollectormetrics.ExportMetricsServiceRequest{} + err := md.Unmarshal(buf) + return md, err +} + +func (d *pbDecoder) DecodeTraces(buf []byte) (interface{}, error) { + td := &otlpcollectortrace.ExportTraceServiceRequest{} + err := td.Unmarshal(buf) + return td, err +} diff --git a/internal/otlp/pb_encoder.go b/internal/otlp/pb_encoder.go new file mode 100644 index 00000000000..401c277b95c --- /dev/null +++ b/internal/otlp/pb_encoder.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" + otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" + otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" + "go.opentelemetry.io/collector/internal/model" +) + +type pbEncoder struct{} + +func newPbEncoder() *pbEncoder { + return &pbEncoder{} +} + +func (e *pbEncoder) EncodeLogs(modelData interface{}) ([]byte, error) { + ld, ok := modelData.(*otlpcollectorlogs.ExportLogsServiceRequest) + if !ok { + return nil, model.NewErrIncompatibleType(&otlpcollectorlogs.ExportLogsServiceRequest{}, modelData) + } + return ld.Marshal() +} + +func (e *pbEncoder) EncodeMetrics(modelData interface{}) ([]byte, error) { + md, ok := modelData.(*otlpcollectormetrics.ExportMetricsServiceRequest) + if !ok { + return nil, model.NewErrIncompatibleType(&otlpcollectormetrics.ExportMetricsServiceRequest{}, modelData) + } + return md.Marshal() +} + +func (e *pbEncoder) EncodeTraces(modelData interface{}) ([]byte, error) { + td, ok := modelData.(*otlpcollectortrace.ExportTraceServiceRequest) + if !ok { + return nil, model.NewErrIncompatibleType(&otlpcollectortrace.ExportTraceServiceRequest{}, modelData) + } + return td.Marshal() +} diff --git a/internal/otlp/pb_test.go b/internal/otlp/pb_test.go new file mode 100644 index 00000000000..57c5d07778c --- /dev/null +++ b/internal/otlp/pb_test.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProtobufLogsUnmarshaler_error(t *testing.T) { + p := NewProtobufLogsUnmarshaler() + _, err := p.Unmarshal([]byte("+$%")) + assert.Error(t, err) +} + +func TestProtobufMetricsUnmarshaler_error(t *testing.T) { + p := NewProtobufMetricsUnmarshaler() + _, err := p.Unmarshal([]byte("+$%")) + assert.Error(t, err) +} + +func TestProtobufTracesUnmarshaler_error(t *testing.T) { + p := NewProtobufTracesUnmarshaler() + _, err := p.Unmarshal([]byte("+$%")) + assert.Error(t, err) +} diff --git a/internal/otlp/unmarshaler.go b/internal/otlp/unmarshaler.go index 5cb8a48ca1d..f7f7fb61f9a 100644 --- a/internal/otlp/unmarshaler.go +++ b/internal/otlp/unmarshaler.go @@ -18,17 +18,32 @@ import ( "go.opentelemetry.io/collector/internal/model" ) -// NewJSONTracesUnmarshaler returns a model.TracesUnmarshaler to decode from OTLP json bytes. +// NewJSONTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshals from OTLP json bytes. func NewJSONTracesUnmarshaler() model.TracesUnmarshaler { return model.NewTracesUnmarshaler(newJSONDecoder(), newToTranslator()) } -// NewJSONMetricsUnmarshaler returns a model.MetricsUnmarshaler to decode from OTLP json bytes. +// NewJSONMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshals from OTLP json bytes. func NewJSONMetricsUnmarshaler() model.MetricsUnmarshaler { return model.NewMetricsUnmarshaler(newJSONDecoder(), newToTranslator()) } -// NewJSONLogsUnmarshaler returns a model.LogsUnmarshaler to decode from OTLP json bytes. +// NewJSONLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshals from OTLP json bytes. func NewJSONLogsUnmarshaler() model.LogsUnmarshaler { return model.NewLogsUnmarshaler(newJSONDecoder(), newToTranslator()) } + +// NewProtobufTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshals from OTLP binary protobuf bytes. +func NewProtobufTracesUnmarshaler() model.TracesUnmarshaler { + return model.NewTracesUnmarshaler(newPbDecoder(), newToTranslator()) +} + +// NewProtobufMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshals from OTLP binary protobuf bytes. +func NewProtobufMetricsUnmarshaler() model.MetricsUnmarshaler { + return model.NewMetricsUnmarshaler(newPbDecoder(), newToTranslator()) +} + +// NewProtobufLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshals from OTLP binary protobuf bytes. +func NewProtobufLogsUnmarshaler() model.LogsUnmarshaler { + return model.NewLogsUnmarshaler(newPbDecoder(), newToTranslator()) +}