From 8f7e497b70f90f4d20975cb3945618fb5c716d04 Mon Sep 17 00:00:00 2001 From: David Yeghshatyan <6davidy@gmail.com> Date: Thu, 26 Jul 2018 14:21:21 -0400 Subject: [PATCH] Add ingester processor to unmarshal and write spans (#944) Unmarshal consumed messages into spans and write them --- cmd/ingester/app/processor/span_processor.go | 37 +++++++++- .../app/processor/span_processor_test.go | 74 +++++++++++++++++++ pkg/kafka/mocks/Unmarshaller.go | 48 ++++++++++++ ...marshaller_test.go => marshalling_test.go} | 18 +++-- plugin/storage/kafka/unmarshaller.go | 59 +++++++++++++++ 5 files changed, 227 insertions(+), 9 deletions(-) create mode 100644 cmd/ingester/app/processor/span_processor_test.go create mode 100644 pkg/kafka/mocks/Unmarshaller.go rename plugin/storage/kafka/{marshaller_test.go => marshalling_test.go} (62%) create mode 100644 plugin/storage/kafka/unmarshaller.go diff --git a/cmd/ingester/app/processor/span_processor.go b/cmd/ingester/app/processor/span_processor.go index 3b29bd8c5a6..a0a52c1b2cb 100644 --- a/cmd/ingester/app/processor/span_processor.go +++ b/cmd/ingester/app/processor/span_processor.go @@ -16,9 +16,14 @@ package processor import ( "io" + + "github.com/pkg/errors" + + "github.com/jaegertracing/jaeger/plugin/storage/kafka" + "github.com/jaegertracing/jaeger/storage/spanstore" ) -//go:generate mockery -name=SpanProcessor +//go:generate mockery -name=KafkaSpanProcessor // SpanProcessor processes kafka spans type SpanProcessor interface { @@ -30,3 +35,33 @@ type SpanProcessor interface { type Message interface { Value() []byte } + +// SpanProcessorParams stores the necessary parameters for a SpanProcessor +type SpanProcessorParams struct { + Writer spanstore.Writer + Unmarshaller kafka.Unmarshaller +} + +// KafkaSpanProcessor implements SpanProcessor for Kafka messages +type KafkaSpanProcessor struct { + unmarshaller kafka.Unmarshaller + writer spanstore.Writer + io.Closer +} + +// NewSpanProcessor creates a new KafkaSpanProcessor +func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor { + return &KafkaSpanProcessor{ + unmarshaller: params.Unmarshaller, + writer: params.Writer, + } +} + +// Process unmarshals and writes a single kafka message +func (s KafkaSpanProcessor) Process(message Message) error { + mSpan, err := s.unmarshaller.Unmarshal(message.Value()) + if err != nil { + return errors.Wrap(err, "cannot unmarshall byte array into span") + } + return s.writer.WriteSpan(mSpan) +} diff --git a/cmd/ingester/app/processor/span_processor_test.go b/cmd/ingester/app/processor/span_processor_test.go new file mode 100644 index 00000000000..100b522ebef --- /dev/null +++ b/cmd/ingester/app/processor/span_processor_test.go @@ -0,0 +1,74 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + + cmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" + "github.com/jaegertracing/jaeger/model" + umocks "github.com/jaegertracing/jaeger/pkg/kafka/mocks" + smocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" +) + +func TestNewSpanProcessor(t *testing.T) { + p := SpanProcessorParams{} + assert.NotNil(t, NewSpanProcessor(p)) +} + +func TestSpanProcessor_Process(t *testing.T) { + writer := &smocks.Writer{} + unmarshallerMock := &umocks.Unmarshaller{} + processor := &KafkaSpanProcessor{ + unmarshaller: unmarshallerMock, + writer: writer, + } + + message := &cmocks.Message{} + data := []byte("police") + span := &model.Span{} + + message.On("Value").Return(data) + unmarshallerMock.On("Unmarshal", data).Return(span, nil) + writer.On("WriteSpan", span).Return(nil) + + assert.Nil(t, processor.Process(message)) + + message.AssertExpectations(t) + writer.AssertExpectations(t) +} + +func TestSpanProcessor_ProcessError(t *testing.T) { + writer := &smocks.Writer{} + unmarshallerMock := &umocks.Unmarshaller{} + processor := &KafkaSpanProcessor{ + unmarshaller: unmarshallerMock, + writer: writer, + } + + message := &cmocks.Message{} + data := []byte("police") + + message.On("Value").Return(data) + unmarshallerMock.On("Unmarshal", data).Return(nil, errors.New("moocow")) + + assert.Error(t, processor.Process(message)) + + message.AssertExpectations(t) + writer.AssertNotCalled(t, "WriteSpan") +} diff --git a/pkg/kafka/mocks/Unmarshaller.go b/pkg/kafka/mocks/Unmarshaller.go new file mode 100644 index 00000000000..e879bed9399 --- /dev/null +++ b/pkg/kafka/mocks/Unmarshaller.go @@ -0,0 +1,48 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import mock "github.com/stretchr/testify/mock" +import model "github.com/jaegertracing/jaeger/model" + +// Unmarshaller is an autogenerated mock type for the Unmarshaller type +type Unmarshaller struct { + mock.Mock +} + +// Unmarshal provides a mock function with given fields: _a0 +func (_m *Unmarshaller) Unmarshal(_a0 []byte) (*model.Span, error) { + ret := _m.Called(_a0) + + var r0 *model.Span + if rf, ok := ret.Get(0).(func([]byte) *model.Span); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.Span) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/plugin/storage/kafka/marshaller_test.go b/plugin/storage/kafka/marshalling_test.go similarity index 62% rename from plugin/storage/kafka/marshaller_test.go rename to plugin/storage/kafka/marshalling_test.go index 785551ea641..3ec556ed32f 100644 --- a/plugin/storage/kafka/marshaller_test.go +++ b/plugin/storage/kafka/marshalling_test.go @@ -20,20 +20,22 @@ import ( "github.com/stretchr/testify/assert" ) -func TestProtoMarshaller(t *testing.T) { - marshaller := newProtobufMarshaller() +func TestProtobufMarshallerAndUnmarshaller(t *testing.T) { + testMarshallerAndUnmarshaller(t, newProtobufMarshaller(), NewProtobufUnmarshaller()) +} + +func TestJSONMarshallerAndUnmarshaller(t *testing.T) { + testMarshallerAndUnmarshaller(t, newJSONMarshaller(), NewJSONUnmarshaller()) +} +func testMarshallerAndUnmarshaller(t *testing.T, marshaller Marshaller, unmarshaller Unmarshaller) { bytes, err := marshaller.Marshal(sampleSpan) assert.NoError(t, err) assert.NotNil(t, bytes) -} -func TestJSONMarshaller(t *testing.T) { - marshaller := newJSONMarshaller() - - bytes, err := marshaller.Marshal(sampleSpan) + resultSpan, err := unmarshaller.Unmarshal(bytes) assert.NoError(t, err) - assert.NotNil(t, bytes) + assert.Equal(t, sampleSpan, resultSpan) } diff --git a/plugin/storage/kafka/unmarshaller.go b/plugin/storage/kafka/unmarshaller.go new file mode 100644 index 00000000000..d19b7181914 --- /dev/null +++ b/plugin/storage/kafka/unmarshaller.go @@ -0,0 +1,59 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "bytes" + + "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" + + "github.com/jaegertracing/jaeger/model" +) + +// Unmarshaller decodes a byte array to a span +type Unmarshaller interface { + Unmarshal([]byte) (*model.Span, error) +} + +// ProtobufUnmarshaller implements Unmarshaller +type ProtobufUnmarshaller struct{} + +// NewProtobufUnmarshaller constructs a ProtobufUnmarshaller +func NewProtobufUnmarshaller() *ProtobufUnmarshaller { + return &ProtobufUnmarshaller{} +} + +// Unmarshal decodes a protobuf byte array to a span +func (h *ProtobufUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) { + newSpan := &model.Span{} + err := proto.Unmarshal(msg, newSpan) + return newSpan, err +} + +// JSONUnmarshaller implements Unmarshaller +type JSONUnmarshaller struct{} + +// NewJSONUnmarshaller constructs a JSONUnmarshaller +func NewJSONUnmarshaller() *JSONUnmarshaller { + return &JSONUnmarshaller{} +} + +// Unmarshal decodes a json byte array to a span +func (h *JSONUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) { + newSpan := &model.Span{} + err := jsonpb.Unmarshal(bytes.NewReader(msg), newSpan) + return newSpan, err +}