Skip to content

Commit

Permalink
Add ingester processor to unmarshal and write spans (#944)
Browse files Browse the repository at this point in the history
Unmarshal consumed messages into spans and write them
  • Loading branch information
davit-y authored and vprithvi committed Jul 26, 2018
1 parent cac7343 commit 8f7e497
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 9 deletions.
37 changes: 36 additions & 1 deletion cmd/ingester/app/processor/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ package processor

import (
"io"

"github.com/pkg/errors"

"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

//go:generate mockery -name=SpanProcessor
//go:generate mockery -name=KafkaSpanProcessor

// SpanProcessor processes kafka spans
type SpanProcessor interface {
Expand All @@ -30,3 +35,33 @@ type SpanProcessor interface {
type Message interface {
Value() []byte
}

// SpanProcessorParams stores the necessary parameters for a SpanProcessor
type SpanProcessorParams struct {
Writer spanstore.Writer
Unmarshaller kafka.Unmarshaller
}

// KafkaSpanProcessor implements SpanProcessor for Kafka messages
type KafkaSpanProcessor struct {
unmarshaller kafka.Unmarshaller
writer spanstore.Writer
io.Closer
}

// NewSpanProcessor creates a new KafkaSpanProcessor
func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor {
return &KafkaSpanProcessor{
unmarshaller: params.Unmarshaller,
writer: params.Writer,
}
}

// Process unmarshals and writes a single kafka message
func (s KafkaSpanProcessor) Process(message Message) error {
mSpan, err := s.unmarshaller.Unmarshal(message.Value())
if err != nil {
return errors.Wrap(err, "cannot unmarshall byte array into span")
}
return s.writer.WriteSpan(mSpan)
}
74 changes: 74 additions & 0 deletions cmd/ingester/app/processor/span_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import (
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

cmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
"github.com/jaegertracing/jaeger/model"
umocks "github.com/jaegertracing/jaeger/pkg/kafka/mocks"
smocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestNewSpanProcessor(t *testing.T) {
p := SpanProcessorParams{}
assert.NotNil(t, NewSpanProcessor(p))
}

func TestSpanProcessor_Process(t *testing.T) {
writer := &smocks.Writer{}
unmarshallerMock := &umocks.Unmarshaller{}
processor := &KafkaSpanProcessor{
unmarshaller: unmarshallerMock,
writer: writer,
}

message := &cmocks.Message{}
data := []byte("police")
span := &model.Span{}

message.On("Value").Return(data)
unmarshallerMock.On("Unmarshal", data).Return(span, nil)
writer.On("WriteSpan", span).Return(nil)

assert.Nil(t, processor.Process(message))

message.AssertExpectations(t)
writer.AssertExpectations(t)
}

func TestSpanProcessor_ProcessError(t *testing.T) {
writer := &smocks.Writer{}
unmarshallerMock := &umocks.Unmarshaller{}
processor := &KafkaSpanProcessor{
unmarshaller: unmarshallerMock,
writer: writer,
}

message := &cmocks.Message{}
data := []byte("police")

message.On("Value").Return(data)
unmarshallerMock.On("Unmarshal", data).Return(nil, errors.New("moocow"))

assert.Error(t, processor.Process(message))

message.AssertExpectations(t)
writer.AssertNotCalled(t, "WriteSpan")
}
48 changes: 48 additions & 0 deletions pkg/kafka/mocks/Unmarshaller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@ import (
"github.com/stretchr/testify/assert"
)

func TestProtoMarshaller(t *testing.T) {
marshaller := newProtobufMarshaller()
func TestProtobufMarshallerAndUnmarshaller(t *testing.T) {
testMarshallerAndUnmarshaller(t, newProtobufMarshaller(), NewProtobufUnmarshaller())
}

func TestJSONMarshallerAndUnmarshaller(t *testing.T) {
testMarshallerAndUnmarshaller(t, newJSONMarshaller(), NewJSONUnmarshaller())
}

func testMarshallerAndUnmarshaller(t *testing.T, marshaller Marshaller, unmarshaller Unmarshaller) {
bytes, err := marshaller.Marshal(sampleSpan)

assert.NoError(t, err)
assert.NotNil(t, bytes)
}

func TestJSONMarshaller(t *testing.T) {
marshaller := newJSONMarshaller()

bytes, err := marshaller.Marshal(sampleSpan)
resultSpan, err := unmarshaller.Unmarshal(bytes)

assert.NoError(t, err)
assert.NotNil(t, bytes)
assert.Equal(t, sampleSpan, resultSpan)
}
59 changes: 59 additions & 0 deletions plugin/storage/kafka/unmarshaller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"

"github.com/jaegertracing/jaeger/model"
)

// Unmarshaller decodes a byte array to a span
type Unmarshaller interface {
Unmarshal([]byte) (*model.Span, error)
}

// ProtobufUnmarshaller implements Unmarshaller
type ProtobufUnmarshaller struct{}

// NewProtobufUnmarshaller constructs a ProtobufUnmarshaller
func NewProtobufUnmarshaller() *ProtobufUnmarshaller {
return &ProtobufUnmarshaller{}
}

// Unmarshal decodes a protobuf byte array to a span
func (h *ProtobufUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) {
newSpan := &model.Span{}
err := proto.Unmarshal(msg, newSpan)
return newSpan, err
}

// JSONUnmarshaller implements Unmarshaller
type JSONUnmarshaller struct{}

// NewJSONUnmarshaller constructs a JSONUnmarshaller
func NewJSONUnmarshaller() *JSONUnmarshaller {
return &JSONUnmarshaller{}
}

// Unmarshal decodes a json byte array to a span
func (h *JSONUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) {
newSpan := &model.Span{}
err := jsonpb.Unmarshal(bytes.NewReader(msg), newSpan)
return newSpan, err
}

0 comments on commit 8f7e497

Please sign in to comment.