From 933efb3a52b0acd513a11e1d6711f3cf6578d8e0 Mon Sep 17 00:00:00 2001 From: Prithvi Raj Date: Fri, 20 Jul 2018 11:19:25 -0400 Subject: [PATCH] Add files for ingester (#940) This commit contains core functionality for asynchronous ingestion, future commits will make use of it. Signed-off-by: Prithvi Raj --- .../app/consumer/committing_processor.go | 51 +++++ .../app/consumer/committing_processor_test.go | 75 +++++++ cmd/ingester/app/consumer/consumer.go | 111 ++++++++++ cmd/ingester/app/consumer/consumer_metrics.go | 46 +++++ cmd/ingester/app/consumer/consumer_test.go | 165 +++++++++++++++ cmd/ingester/app/consumer/message.go | 50 +++++ cmd/ingester/app/consumer/message_test.go | 40 ++++ cmd/ingester/app/consumer/mocks/Message.go | 96 +++++++++ .../app/consumer/mocks/PartitionConsumer.go | 116 +++++++++++ .../app/consumer/mocks/SaramaConsumer.go | 61 ++++++ .../app/consumer/offset/concurrent_list.go | 87 ++++++++ .../consumer/offset/concurrent_list_test.go | 195 ++++++++++++++++++ cmd/ingester/app/consumer/offset/manager.go | 92 +++++++++ .../app/consumer/offset/manager_test.go | 49 +++++ .../app/consumer/processor_factory.go | 94 +++++++++ .../app/consumer/processor_factory_test.go | 102 +++++++++ cmd/ingester/app/processor/decorator/retry.go | 138 +++++++++++++ .../app/processor/decorator/retry_test.go | 143 +++++++++++++ .../app/processor/metrics_decorator.go | 50 +++++ .../app/processor/metrics_decorator_test.go | 59 ++++++ .../app/processor/mocks/SpanProcessor.go | 53 +++++ .../app/processor/parallel_processor.go | 80 +++++++ .../app/processor/parallel_processor_test.go | 46 +++++ cmd/ingester/app/processor/span_processor.go | 32 +++ glide.lock | 10 +- glide.yaml | 10 +- 26 files changed, 2045 insertions(+), 6 deletions(-) create mode 100644 cmd/ingester/app/consumer/committing_processor.go create mode 100644 cmd/ingester/app/consumer/committing_processor_test.go create mode 100644 cmd/ingester/app/consumer/consumer.go create mode 100644 cmd/ingester/app/consumer/consumer_metrics.go create mode 100644 cmd/ingester/app/consumer/consumer_test.go create mode 100644 cmd/ingester/app/consumer/message.go create mode 100644 cmd/ingester/app/consumer/message_test.go create mode 100644 cmd/ingester/app/consumer/mocks/Message.go create mode 100644 cmd/ingester/app/consumer/mocks/PartitionConsumer.go create mode 100644 cmd/ingester/app/consumer/mocks/SaramaConsumer.go create mode 100644 cmd/ingester/app/consumer/offset/concurrent_list.go create mode 100644 cmd/ingester/app/consumer/offset/concurrent_list_test.go create mode 100644 cmd/ingester/app/consumer/offset/manager.go create mode 100644 cmd/ingester/app/consumer/offset/manager_test.go create mode 100644 cmd/ingester/app/consumer/processor_factory.go create mode 100644 cmd/ingester/app/consumer/processor_factory_test.go create mode 100644 cmd/ingester/app/processor/decorator/retry.go create mode 100644 cmd/ingester/app/processor/decorator/retry_test.go create mode 100644 cmd/ingester/app/processor/metrics_decorator.go create mode 100644 cmd/ingester/app/processor/metrics_decorator_test.go create mode 100644 cmd/ingester/app/processor/mocks/SpanProcessor.go create mode 100644 cmd/ingester/app/processor/parallel_processor.go create mode 100644 cmd/ingester/app/processor/parallel_processor_test.go create mode 100644 cmd/ingester/app/processor/span_processor.go diff --git a/cmd/ingester/app/consumer/committing_processor.go b/cmd/ingester/app/consumer/committing_processor.go new file mode 100644 index 00000000000..d4223430766 --- /dev/null +++ b/cmd/ingester/app/consumer/committing_processor.go @@ -0,0 +1,51 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "errors" + "io" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" +) + +type comittingProcessor struct { + processor processor.SpanProcessor + marker offsetMarker + io.Closer +} + +type offsetMarker interface { + MarkOffset(int64) +} + +// NewCommittingProcessor returns a processor that commits message offsets to Kafka +func NewCommittingProcessor(processor processor.SpanProcessor, marker offsetMarker) processor.SpanProcessor { + return &comittingProcessor{ + processor: processor, + marker: marker, + } +} + +func (d *comittingProcessor) Process(message processor.Message) error { + if msg, ok := message.(Message); ok { + err := d.processor.Process(message) + if err == nil { + d.marker.MarkOffset(msg.Offset()) + } + return err + } + return errors.New("committing processor used with non-kafka message") +} diff --git a/cmd/ingester/app/consumer/committing_processor_test.go b/cmd/ingester/app/consumer/committing_processor_test.go new file mode 100644 index 00000000000..80a3837c467 --- /dev/null +++ b/cmd/ingester/app/consumer/committing_processor_test.go @@ -0,0 +1,75 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + kafka "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" +) + +type fakeOffsetMarker struct { + capturedOffset int64 +} + +func (f *fakeOffsetMarker) MarkOffset(o int64) { + f.capturedOffset = o +} + +func TestNewCommittingProcessor(t *testing.T) { + msgOffset := int64(123) + offsetMarker := &fakeOffsetMarker{} + spanProcessor := &mocks.SpanProcessor{} + spanProcessor.On("Process", mock.Anything).Return(nil) + committingProcessor := NewCommittingProcessor(spanProcessor, offsetMarker) + + msg := &kafka.Message{} + msg.On("Offset").Return(msgOffset) + + assert.NoError(t, committingProcessor.Process(msg)) + + spanProcessor.AssertExpectations(t) + assert.Equal(t, msgOffset, offsetMarker.capturedOffset) +} + +func TestNewCommittingProcessorError(t *testing.T) { + offsetMarker := &fakeOffsetMarker{} + spanProcessor := &mocks.SpanProcessor{} + spanProcessor.On("Process", mock.Anything).Return(errors.New("boop")) + committingProcessor := NewCommittingProcessor(spanProcessor, offsetMarker) + msg := &kafka.Message{} + + assert.Error(t, committingProcessor.Process(msg)) + + spanProcessor.AssertExpectations(t) + assert.Equal(t, int64(0), offsetMarker.capturedOffset) +} + +type fakeProcessorMessage struct{} + +func (f fakeProcessorMessage) Value() []byte { + return nil +} + +func TestNewCommittingProcessorErrorNoKafkaMessage(t *testing.T) { + committingProcessor := NewCommittingProcessor(&mocks.SpanProcessor{}, &fakeOffsetMarker{}) + + assert.Error(t, committingProcessor.Process(fakeProcessorMessage{})) +} diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go new file mode 100644 index 00000000000..eee1f6d3e5e --- /dev/null +++ b/cmd/ingester/app/consumer/consumer.go @@ -0,0 +1,111 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "io" + "sync" + + "github.com/Shopify/sarama" + sc "github.com/bsm/sarama-cluster" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" +) + +type consumer struct { + metricsFactory metrics.Factory + logger *zap.Logger + processorFactory processorFactory + + close chan struct{} + isClosed sync.WaitGroup + + SaramaConsumer +} + +// SaramaConsumer is an interface to features of Sarama that we use +type SaramaConsumer interface { + Partitions() <-chan sc.PartitionConsumer + MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) + io.Closer +} + +func (c *consumer) mainLoop() { + c.isClosed.Add(1) + c.logger.Info("Starting main loop") + go func() { + for { + select { + case pc := <-c.Partitions(): + c.isClosed.Add(2) + + go c.handleMessages(pc) + go c.handleErrors(pc.Partition(), pc.Errors()) + + case <-c.close: + c.isClosed.Done() + return + } + } + }() +} + +func (c *consumer) handleMessages(pc sc.PartitionConsumer) { + c.logger.Info("Starting message handler") + defer c.isClosed.Done() + defer c.closePartition(pc) + + msgMetrics := c.newMsgMetrics(pc.Partition()) + var msgProcessor processor.SpanProcessor + + for msg := range pc.Messages() { + c.logger.Debug("Got msg", zap.Any("msg", msg)) + msgMetrics.counter.Inc(1) + msgMetrics.offsetGauge.Update(msg.Offset) + msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1) + + if msgProcessor == nil { + msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1) + defer msgProcessor.Close() + } + + msgProcessor.Process(&saramaMessageWrapper{msg}) + } +} + +func (c *consumer) closePartition(partitionConsumer sc.PartitionConsumer) { + c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition())) + partitionConsumer.Close() // blocks until messages channel is drained + c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition())) +} + +func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) { + c.logger.Info("Starting error handler") + defer c.isClosed.Done() + + errMetrics := c.newErrMetrics(partition) + for err := range errChan { + errMetrics.errCounter.Inc(1) + c.logger.Error("Error consuming from Kafka", zap.Error(err)) + } +} + +func (c *consumer) Close() error { + close(c.close) + c.isClosed.Wait() + return c.SaramaConsumer.Close() +} diff --git a/cmd/ingester/app/consumer/consumer_metrics.go b/cmd/ingester/app/consumer/consumer_metrics.go new file mode 100644 index 00000000000..526856a2d54 --- /dev/null +++ b/cmd/ingester/app/consumer/consumer_metrics.go @@ -0,0 +1,46 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "strconv" + + "github.com/uber/jaeger-lib/metrics" +) + +type msgMetrics struct { + counter metrics.Counter + offsetGauge metrics.Gauge + lagGauge metrics.Gauge +} + +type errMetrics struct { + errCounter metrics.Counter +} + +func (c *consumer) newMsgMetrics(partition int32) msgMetrics { + f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))}) + return msgMetrics{ + counter: f.Counter("messages", nil), + offsetGauge: f.Gauge("current-offset", nil), + lagGauge: f.Gauge("offset-lag", nil), + } +} + +func (c *consumer) newErrMetrics(partition int32) errMetrics { + f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))}) + return errMetrics{errCounter: f.Counter("errors", nil)} + +} diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go new file mode 100644 index 00000000000..e85eb284234 --- /dev/null +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -0,0 +1,165 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "sync" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/bsm/sarama-cluster" + "github.com/pkg/errors" + "github.com/stretchr/testify/mock" + "github.com/uber/jaeger-lib/metrics" + "github.com/uber/jaeger-lib/metrics/testutils" + "go.uber.org/zap" + + kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" +) + +//go:generate mockery -name SaramaConsumer +//go:generate mockery -dir ../../../../../vendor/github.com/bsm/sarama-cluster/ -name PartitionConsumer + +type consumerTest struct { + saramaConsumer *kmocks.SaramaConsumer + consumer *consumer + partitionConsumer *kmocks.PartitionConsumer +} + +func withWrappedConsumer(fn func(c *consumerTest)) { + sc := &kmocks.SaramaConsumer{} + logger, _ := zap.NewDevelopment() + metricsFactory := metrics.NewLocalFactory(0) + c := &consumerTest{ + saramaConsumer: sc, + consumer: &consumer{ + metricsFactory: metricsFactory, + logger: logger, + close: make(chan struct{}), + isClosed: sync.WaitGroup{}, + SaramaConsumer: sc, + processorFactory: processorFactory{ + topic: "topic", + consumer: sc, + metricsFactory: metricsFactory, + logger: logger, + baseProcessor: &mocks.SpanProcessor{}, + parallelism: 1, + }, + }, + } + + c.partitionConsumer = &kmocks.PartitionConsumer{} + pcha := make(chan cluster.PartitionConsumer, 1) + pcha <- c.partitionConsumer + c.saramaConsumer.On("Partitions").Return((<-chan cluster.PartitionConsumer)(pcha)) + c.saramaConsumer.On("Close").Return(nil) + c.saramaConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + fn(c) +} + +func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) { + + withWrappedConsumer(func(c *consumerTest) { + topic := "morekuzambu" + partition := int32(316) + offset := int64(1111110111111) + metadata := "meatbag" + c.saramaConsumer.On("MarkPartitionOffset", topic, partition, offset, metadata).Return() + + c.consumer.MarkPartitionOffset(topic, partition, offset, metadata) + + c.saramaConsumer.AssertCalled(t, "MarkPartitionOffset", topic, partition, offset, metadata) + }) +} + +func TestSaramaConsumerWrapper_start_Messages(t *testing.T) { + withWrappedConsumer(func(c *consumerTest) { + msg := &sarama.ConsumerMessage{} + msg.Offset = 0 + msgCh := make(chan *sarama.ConsumerMessage, 1) + msgCh <- msg + + errCh := make(chan *sarama.ConsumerError, 1) + c.partitionConsumer.On("Partition").Return(int32(0)) + c.partitionConsumer.On("Errors").Return((<-chan *sarama.ConsumerError)(errCh)) + c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh)) + c.partitionConsumer.On("HighWaterMarkOffset").Return(int64(1234)) + c.partitionConsumer.On("Close").Return(nil) + + mp := &mocks.SpanProcessor{} + mp.On("Process", &saramaMessageWrapper{msg}).Return(nil) + c.consumer.processorFactory.baseProcessor = mp + + c.consumer.mainLoop() + time.Sleep(100 * time.Millisecond) + close(msgCh) + close(errCh) + c.consumer.Close() + + mp.AssertExpectations(t) + + f := (c.consumer.metricsFactory).(*metrics.LocalFactory) + partitionTag := map[string]string{"partition": "0"} + testutils.AssertCounterMetrics(t, f, testutils.ExpectedMetric{ + Name: "sarama-consumer.messages", + Tags: partitionTag, + Value: 1, + }) + testutils.AssertGaugeMetrics(t, f, testutils.ExpectedMetric{ + Name: "sarama-consumer.current-offset", + Tags: partitionTag, + Value: 0, + }) + testutils.AssertGaugeMetrics(t, f, testutils.ExpectedMetric{ + Name: "sarama-consumer.offset-lag", + Tags: partitionTag, + Value: 1233, + }) + }) +} + +func TestSaramaConsumerWrapper_start_Errors(t *testing.T) { + withWrappedConsumer(func(c *consumerTest) { + errCh := make(chan *sarama.ConsumerError, 1) + errCh <- &sarama.ConsumerError{ + Topic: "some-topic", + Err: errors.New("some error"), + } + + msgCh := make(chan *sarama.ConsumerMessage) + + c.partitionConsumer.On("Partition").Return(int32(0)) + c.partitionConsumer.On("Errors").Return((<-chan *sarama.ConsumerError)(errCh)) + c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh)) + c.partitionConsumer.On("Close").Return(nil) + + c.consumer.mainLoop() + time.Sleep(100 * time.Millisecond) + close(msgCh) + close(errCh) + c.consumer.Close() + f := (c.consumer.metricsFactory).(*metrics.LocalFactory) + partitionTag := map[string]string{"partition": "0"} + testutils.AssertCounterMetrics(t, f, testutils.ExpectedMetric{ + Name: "sarama-consumer.errors", + Tags: partitionTag, + Value: 1, + }) + }) +} diff --git a/cmd/ingester/app/consumer/message.go b/cmd/ingester/app/consumer/message.go new file mode 100644 index 00000000000..3ed64cd87a2 --- /dev/null +++ b/cmd/ingester/app/consumer/message.go @@ -0,0 +1,50 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import "github.com/Shopify/sarama" + +// Message contains the parts of a sarama ConsumerMessage that we care about. +type Message interface { + Key() []byte + Value() []byte + Topic() string + Partition() int32 + Offset() int64 +} + +type saramaMessageWrapper struct { + *sarama.ConsumerMessage +} + +func (m *saramaMessageWrapper) Key() []byte { + return m.ConsumerMessage.Key +} + +func (m *saramaMessageWrapper) Value() []byte { + return m.ConsumerMessage.Value +} + +func (m *saramaMessageWrapper) Topic() string { + return m.ConsumerMessage.Topic +} + +func (m *saramaMessageWrapper) Partition() int32 { + return m.ConsumerMessage.Partition +} + +func (m *saramaMessageWrapper) Offset() int64 { + return m.ConsumerMessage.Offset +} diff --git a/cmd/ingester/app/consumer/message_test.go b/cmd/ingester/app/consumer/message_test.go new file mode 100644 index 00000000000..c567a2f66e2 --- /dev/null +++ b/cmd/ingester/app/consumer/message_test.go @@ -0,0 +1,40 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "testing" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" +) + +func TestSaramaMessageWrapper(t *testing.T) { + saramaMessage := &sarama.ConsumerMessage{ + Key: []byte("some key"), + Value: []byte("some value"), + Topic: "some topic", + Partition: 555, + Offset: 1942, + } + + wrappedMessage := saramaMessageWrapper{saramaMessage} + + assert.Equal(t, saramaMessage.Key, wrappedMessage.Key()) + assert.Equal(t, saramaMessage.Value, wrappedMessage.Value()) + assert.Equal(t, saramaMessage.Topic, wrappedMessage.Topic()) + assert.Equal(t, saramaMessage.Partition, wrappedMessage.Partition()) + assert.Equal(t, saramaMessage.Offset, wrappedMessage.Offset()) +} diff --git a/cmd/ingester/app/consumer/mocks/Message.go b/cmd/ingester/app/consumer/mocks/Message.go new file mode 100644 index 00000000000..5e72c95da95 --- /dev/null +++ b/cmd/ingester/app/consumer/mocks/Message.go @@ -0,0 +1,96 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Message is an autogenerated mock type for the Message type +type Message struct { + mock.Mock +} + +// Key provides a mock function with given fields: +func (_m *Message) Key() []byte { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// Offset provides a mock function with given fields: +func (_m *Message) Offset() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// Partition provides a mock function with given fields: +func (_m *Message) Partition() int32 { + ret := _m.Called() + + var r0 int32 + if rf, ok := ret.Get(0).(func() int32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int32) + } + + return r0 +} + +// Topic provides a mock function with given fields: +func (_m *Message) Topic() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Value provides a mock function with given fields: +func (_m *Message) Value() []byte { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} diff --git a/cmd/ingester/app/consumer/mocks/PartitionConsumer.go b/cmd/ingester/app/consumer/mocks/PartitionConsumer.go new file mode 100644 index 00000000000..84e54647079 --- /dev/null +++ b/cmd/ingester/app/consumer/mocks/PartitionConsumer.go @@ -0,0 +1,116 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import mock "github.com/stretchr/testify/mock" +import sarama "github.com/Shopify/sarama" + +// PartitionConsumer is an autogenerated mock type for the PartitionConsumer type +type PartitionConsumer struct { + mock.Mock +} + +// AsyncClose provides a mock function with given fields: +func (_m *PartitionConsumer) AsyncClose() { + _m.Called() +} + +// Close provides a mock function with given fields: +func (_m *PartitionConsumer) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Errors provides a mock function with given fields: +func (_m *PartitionConsumer) Errors() <-chan *sarama.ConsumerError { + ret := _m.Called() + + var r0 <-chan *sarama.ConsumerError + if rf, ok := ret.Get(0).(func() <-chan *sarama.ConsumerError); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *sarama.ConsumerError) + } + } + + return r0 +} + +// HighWaterMarkOffset provides a mock function with given fields: +func (_m *PartitionConsumer) HighWaterMarkOffset() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// Messages provides a mock function with given fields: +func (_m *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { + ret := _m.Called() + + var r0 <-chan *sarama.ConsumerMessage + if rf, ok := ret.Get(0).(func() <-chan *sarama.ConsumerMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *sarama.ConsumerMessage) + } + } + + return r0 +} + +// Partition provides a mock function with given fields: +func (_m *PartitionConsumer) Partition() int32 { + ret := _m.Called() + + var r0 int32 + if rf, ok := ret.Get(0).(func() int32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int32) + } + + return r0 +} + +// Topic provides a mock function with given fields: +func (_m *PartitionConsumer) Topic() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} diff --git a/cmd/ingester/app/consumer/mocks/SaramaConsumer.go b/cmd/ingester/app/consumer/mocks/SaramaConsumer.go new file mode 100644 index 00000000000..5c3c0fdedbf --- /dev/null +++ b/cmd/ingester/app/consumer/mocks/SaramaConsumer.go @@ -0,0 +1,61 @@ +// Code generated by mockery v1.0.0 + +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import cluster "github.com/bsm/sarama-cluster" + +import mock "github.com/stretchr/testify/mock" + +// SaramaConsumer is an autogenerated mock type for the SaramaConsumer type +type SaramaConsumer struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *SaramaConsumer) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MarkPartitionOffset provides a mock function with given fields: topic, partition, offset, metadata +func (_m *SaramaConsumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { + _m.Called(topic, partition, offset, metadata) +} + +// Partitions provides a mock function with given fields: +func (_m *SaramaConsumer) Partitions() <-chan cluster.PartitionConsumer { + ret := _m.Called() + + var r0 <-chan cluster.PartitionConsumer + if rf, ok := ret.Get(0).(func() <-chan cluster.PartitionConsumer); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan cluster.PartitionConsumer) + } + } + + return r0 +} diff --git a/cmd/ingester/app/consumer/offset/concurrent_list.go b/cmd/ingester/app/consumer/offset/concurrent_list.go new file mode 100644 index 00000000000..decd0c8e4b2 --- /dev/null +++ b/cmd/ingester/app/consumer/offset/concurrent_list.go @@ -0,0 +1,87 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package offset + +import ( + "sync" +) + +// ConcurrentList is a list that maintains kafka offsets with thread-safe Insert and setToHighestContiguous operations +type ConcurrentList struct { + offsets []int64 + mutex sync.Mutex +} + +func newConcurrentList(minOffset int64) *ConcurrentList { + return &ConcurrentList{offsets: []int64{minOffset}} +} + +// Insert into the list in O(1) time. +// This operation is thread-safe +func (s *ConcurrentList) insert(offset int64) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.offsets = append(s.offsets, offset) +} + +// setToHighestContiguous sets head to highestContiguous and returns the message and status. +// This is a O(n) operation. +// highestContiguous is defined as the highest sequential integer encountered while traversing from the head of the +// list. +// For e.g., if the list is [1, 2, 3, 5], the highestContiguous is 3. +// This operation is thread-safe +func (s *ConcurrentList) setToHighestContiguous() int64 { + s.mutex.Lock() + offsets := s.offsets + s.offsets = nil + s.mutex.Unlock() + + highestContiguousOffset := getHighestContiguous(offsets) + + var higherOffsets []int64 + for _, offset := range offsets { + if offset >= highestContiguousOffset { + higherOffsets = append(higherOffsets, offset) + } + } + + s.mutex.Lock() + s.offsets = append(s.offsets, higherOffsets...) + s.mutex.Unlock() + return highestContiguousOffset +} + +func getHighestContiguous(offsets []int64) int64 { + offsetSet := make(map[int64]struct{}, len(offsets)) + minOffset := offsets[0] + + for _, offset := range offsets { + offsetSet[offset] = struct{}{} + if minOffset > offset { + minOffset = offset + } + } + + highestContiguous := minOffset + for { + if _, ok := offsetSet[highestContiguous+1]; ok { + highestContiguous++ + } else { + break + } + } + + return highestContiguous +} diff --git a/cmd/ingester/app/consumer/offset/concurrent_list_test.go b/cmd/ingester/app/consumer/offset/concurrent_list_test.go new file mode 100644 index 00000000000..22eba417132 --- /dev/null +++ b/cmd/ingester/app/consumer/offset/concurrent_list_test.go @@ -0,0 +1,195 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package offset + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" +) + +func insert(list *ConcurrentList, offsets ...int64) { + for _, offset := range offsets { + list.insert(offset) + } +} + +func TestInsert(t *testing.T) { + for _, testCase := range generatePermutations([]int64{1, 2, 3}) { + min, toInsert := extractMin(testCase) + s := newConcurrentList(min) + insert(s, toInsert...) + assert.ElementsMatch(t, testCase, s.offsets) + } +} + +func TestGetHighestAndReset(t *testing.T) { + testCases := []struct { + input []int64 + expectedOffset int64 + expectedList []int64 + }{ + { + input: []int64{1}, + expectedOffset: 1, + expectedList: []int64{1}, + }, + { + input: []int64{1, 20}, + expectedOffset: 1, + expectedList: []int64{1, 20}, + }, + { + input: []int64{1, 2}, + expectedOffset: 2, + expectedList: []int64{2}, + }, + { + input: []int64{4, 5, 6}, + expectedOffset: 6, + expectedList: []int64{6}, + }, + { + input: []int64{1, 2, 4, 5}, + expectedOffset: 2, + expectedList: []int64{2, 4, 5}, + }, + } + + for _, testCase := range testCases { + for _, input := range generatePermutations(testCase.input) { + t.Run(fmt.Sprintf("%v", input), func(t *testing.T) { + min, input := extractMin(input) + s := newConcurrentList(min) + insert(s, input...) + actualOffset := s.setToHighestContiguous() + assert.ElementsMatch(t, testCase.expectedList, s.offsets) + assert.Equal(t, testCase.expectedOffset, actualOffset) + }) + } + } +} + +func TestMultipleInsertsAndResets(t *testing.T) { + l := newConcurrentList(100) + + for i := 101; i < 200; i++ { + l.insert(int64(i)) + } + l.insert(50) + + assert.Equal(t, 101, len(l.offsets)) + assert.Equal(t, int64(50), l.offsets[100]) + + r := l.setToHighestContiguous() + assert.Equal(t, int64(50), r) + assert.Equal(t, 101, len(l.offsets)) + + for i := 51; i < 99; i++ { + l.insert(int64(i)) + } + + r = l.setToHighestContiguous() + assert.Equal(t, int64(98), r) + assert.Equal(t, 101, len(l.offsets)) +} + +// Heaps algorithm as per https://stackoverflow.com/questions/30226438/generate-all-permutations-in-go +func generatePermutations(arr []int64) [][]int64 { + var helper func([]int64, int) + res := [][]int64{} + + helper = func(arr []int64, n int) { + if n == 1 { + tmp := make([]int64, len(arr)) + copy(tmp, arr) + res = append(res, tmp) + } else { + for i := 0; i < n; i++ { + helper(arr, n-1) + if n%2 == 1 { + tmp := arr[i] + arr[i] = arr[n-1] + arr[n-1] = tmp + } else { + tmp := arr[0] + arr[0] = arr[n-1] + arr[n-1] = tmp + } + } + } + } + helper(arr, len(arr)) + return res +} + +func extractMin(arr []int64) (int64, []int64) { + minIdx := 0 + for i := range arr { + if arr[minIdx] > arr[i] { + minIdx = i + } + } + var toRet []int64 + toRet = append(toRet, arr[:minIdx]...) + toRet = append(toRet, arr[minIdx+1:]...) + + return arr[minIdx], toRet +} + +// BenchmarkInserts-8 100000000 70.6 ns/op 49 B/op 0 allocs/op +func BenchmarkInserts(b *testing.B) { + l := newConcurrentList(0) + for i := 1; i < b.N; i++ { + l.insert(int64(i)) + } +} + +// BenchmarkReset-8 10000 1006342 ns/op 1302421 B/op 64 allocs/op +func BenchmarkResetTwice(b *testing.B) { + var toInsert []int64 + for i := int(10e7); i < b.N+int(10e7); i++ { + toInsert = append(toInsert, int64(i)) + } + + l := newConcurrentList(toInsert[0]) + + // Create a gap + toInsert[b.N/2] = 0 + + for i := 0; i < b.N; i++ { + n := i + rand.Intn(b.N-i) + toInsert[i], toInsert[n] = toInsert[n], toInsert[i] + } + + for i := 0; i < b.N; i++ { + l.insert(toInsert[i]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + l.setToHighestContiguous() + } + + b.StopTimer() + l.offsets = l.offsets[1:] + b.StartTimer() + + for i := 0; i < b.N; i++ { + l.setToHighestContiguous() + } +} diff --git a/cmd/ingester/app/consumer/offset/manager.go b/cmd/ingester/app/consumer/offset/manager.go new file mode 100644 index 00000000000..0772cc58adb --- /dev/null +++ b/cmd/ingester/app/consumer/offset/manager.go @@ -0,0 +1,92 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package offset + +import ( + "strconv" + "sync" + "time" + + "github.com/uber/jaeger-lib/metrics" +) + +const ( + resetInterval = 100 * time.Millisecond +) + +// Manager accepts kafka offsets and commits them using the provided kafka consumer +// +// The Manager is designed to be used in a scenario where the consumption of kafka offsets +// is decoupled from the processing of offsets asynchronously via goroutines. This breaks the +// ordering guarantee which could result in the completion of processing of an earlier message +// after the processing of a later message. +// +// It assumes that Kafka offsets are sequential and monotonically increasing[1], and maintains +// sorted lists of offsets per partition. +// +// [1] https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html +type Manager struct { + markOffsetFunction MarkOffset + offsetCommitCount metrics.Counter + lastCommittedOffset metrics.Gauge + list *ConcurrentList + close chan struct{} + isClosed sync.WaitGroup +} + +// MarkOffset is a func that marks offsets in Kafka +type MarkOffset func(offset int64) + +// NewManager creates a new Manager +func NewManager(minOffset int64, markOffset MarkOffset, partition int32, factory metrics.Factory) *Manager { + return &Manager{ + markOffsetFunction: markOffset, + close: make(chan struct{}), + offsetCommitCount: factory.Counter("offset-commit", map[string]string{"partition": strconv.Itoa(int(partition))}), + lastCommittedOffset: factory.Gauge("offset-commit", map[string]string{"partition": strconv.Itoa(int(partition))}), + list: newConcurrentList(minOffset), + } +} + +// MarkOffset marks the offset of a consumer message +func (m *Manager) MarkOffset(offset int64) { + m.list.insert(offset) +} + +// Start starts the Manager +func (m *Manager) Start() { + m.isClosed.Add(1) + go func() { + for { + select { + case <-time.After(resetInterval): + offset := m.list.setToHighestContiguous() + m.offsetCommitCount.Inc(1) + m.lastCommittedOffset.Update(offset) + m.markOffsetFunction(offset) + case <-m.close: + m.isClosed.Done() + return + } + } + }() +} + +// Close closes the Manager +func (m *Manager) Close() error { + close(m.close) + m.isClosed.Wait() + return nil +} diff --git a/cmd/ingester/app/consumer/offset/manager_test.go b/cmd/ingester/app/consumer/offset/manager_test.go new file mode 100644 index 00000000000..e6bbd271c34 --- /dev/null +++ b/cmd/ingester/app/consumer/offset/manager_test.go @@ -0,0 +1,49 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package offset + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics" +) + +func TestHandleReset(t *testing.T) { + offset := int64(1498) + minOffset := offset - 1 + + m := metrics.NewLocalFactory(0) + + var wg sync.WaitGroup + wg.Add(1) + var captureOffset int64 + fakeMarker := func(offset int64) { + captureOffset = offset + wg.Done() + } + manager := NewManager(minOffset, fakeMarker, 1, m) + manager.Start() + + manager.MarkOffset(offset) + wg.Wait() + manager.Close() + + assert.Equal(t, offset, captureOffset) + cnt, g := m.Snapshot() + assert.Equal(t, int64(1), cnt["offset-commit|partition=1"]) + assert.Equal(t, int64(offset), g["offset-commit|partition=1"]) +} diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go new file mode 100644 index 00000000000..34f7e1b37b5 --- /dev/null +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -0,0 +1,94 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "io" + + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset" + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator" +) + +type processorFactory struct { + topic string + consumer SaramaConsumer + metricsFactory metrics.Factory + logger *zap.Logger + baseProcessor processor.SpanProcessor + parallelism int +} + +func (c *processorFactory) new(partition int32, minOffset int64) processor.SpanProcessor { + c.logger.Info("Creating new processors", zap.Int32("partition", partition)) + + markOffset := func(offset int64) { + c.consumer.MarkPartitionOffset(c.topic, partition, offset, "") + } + + om := offset.NewManager(minOffset, markOffset, partition, c.metricsFactory) + + retryProcessor := decorator.NewRetryingProcessor(c.metricsFactory, c.baseProcessor) + cp := NewCommittingProcessor(retryProcessor, om) + spanProcessor := processor.NewDecoratedProcessor(c.metricsFactory, cp) + pp := processor.NewParallelProcessor(spanProcessor, c.parallelism, c.logger) + + return newStartedProcessor(pp, om) +} + +type service interface { + Start() + io.Closer +} + +type startProcessor interface { + Start() + processor.SpanProcessor +} + +type startedProcessor struct { + services []service + processor startProcessor +} + +func newStartedProcessor(parallelProcessor startProcessor, services ...service) processor.SpanProcessor { + s := &startedProcessor{ + services: services, + processor: parallelProcessor, + } + + for _, service := range services { + service.Start() + } + + s.processor.Start() + return s +} + +func (c *startedProcessor) Process(message processor.Message) error { + return c.processor.Process(message) +} + +func (c *startedProcessor) Close() error { + c.processor.Close() + + for _, service := range c.services { + service.Close() + } + return nil +} diff --git a/cmd/ingester/app/consumer/processor_factory_test.go b/cmd/ingester/app/consumer/processor_factory_test.go new file mode 100644 index 00000000000..622460772e7 --- /dev/null +++ b/cmd/ingester/app/consumer/processor_factory_test.go @@ -0,0 +1,102 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" +) + +func Test_new(t *testing.T) { + + mockConsumer := &kmocks.SaramaConsumer{} + mockConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + topic := "coelacanth" + partition := int32(21) + offset := int64(555) + + pf := processorFactory{ + topic: topic, + consumer: mockConsumer, + metricsFactory: metrics.NullFactory, + logger: zap.NewNop(), + baseProcessor: &mocks.SpanProcessor{}, + parallelism: 1, + } + assert.NotNil(t, pf.new(partition, offset)) + + // This sleep is greater than offset manager's resetInterval to allow it a chance to + // call MarkPartitionOffset. + time.Sleep(150 * time.Millisecond) + mockConsumer.AssertCalled(t, "MarkPartitionOffset", topic, partition, offset, "") +} + +type fakeService struct { + startCalled bool + closeCalled bool +} + +func (f *fakeService) Start() { + f.startCalled = true +} + +func (f *fakeService) Close() error { + f.closeCalled = true + return nil +} + +type fakeProcessor struct { + startCalled bool + mocks.SpanProcessor +} + +func (f *fakeProcessor) Start() { + f.startCalled = true +} + +type fakeMsg struct{} + +func (f *fakeMsg) Value() []byte { + return nil +} + +func Test_startedProcessor_Process(t *testing.T) { + service := &fakeService{} + processor := &fakeProcessor{} + processor.On("Close").Return(nil) + + s := newStartedProcessor(processor, service) + + assert.True(t, service.startCalled) + assert.True(t, processor.startCalled) + + msg := &fakeMsg{} + processor.On("Process", msg).Return(nil) + + s.Process(msg) + + s.Close() + assert.True(t, service.closeCalled) + processor.AssertExpectations(t) +} diff --git a/cmd/ingester/app/processor/decorator/retry.go b/cmd/ingester/app/processor/decorator/retry.go new file mode 100644 index 00000000000..c20863f03af --- /dev/null +++ b/cmd/ingester/app/processor/decorator/retry.go @@ -0,0 +1,138 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decorator + +import ( + "io" + "math/rand" + "time" + + "github.com/uber/jaeger-lib/metrics" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" +) + +type retryDecorator struct { + processor processor.SpanProcessor + retryAttempts metrics.Counter + exhausted metrics.Counter + options retryOptions + io.Closer +} + +// RetryOption allows setting options for exponential backoff retried +type RetryOption func(*retryOptions) + +type retryOptions struct { + minInterval, maxInterval time.Duration + maxAttempts uint + propagateError bool + rand randInt63 +} + +type randInt63 interface { + Int63n(int64) int64 +} + +var defaultOpts = retryOptions{ + minInterval: time.Second, + maxInterval: 1 * time.Minute, + maxAttempts: 10, + propagateError: false, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), +} + +// MinBackoffInterval sets the minimum backoff interval +func MinBackoffInterval(t time.Duration) RetryOption { + return func(opt *retryOptions) { + opt.minInterval = t + } +} + +// MaxAttempts sets the maximum number of attempts to retry +func MaxAttempts(attempts uint) RetryOption { + return func(opt *retryOptions) { + opt.maxAttempts = attempts + } +} + +// MaxBackoffInterval sets the maximum backoff interval +func MaxBackoffInterval(t time.Duration) RetryOption { + return func(opt *retryOptions) { + opt.maxInterval = t + } +} + +// Rand sets a random number generator +func Rand(r randInt63) RetryOption { + return func(opt *retryOptions) { + opt.rand = r + } +} + +// PropagateError sets whether to propagate errors when retries are exhausted +func PropagateError(b bool) RetryOption { + return func(opt *retryOptions) { + opt.propagateError = b + } +} + +// NewRetryingProcessor returns a processor that retries failures using an exponential backoff +// with jitter. +func NewRetryingProcessor(f metrics.Factory, processor processor.SpanProcessor, opts ...RetryOption) processor.SpanProcessor { + options := defaultOpts + for _, opt := range opts { + opt(&options) + } + + m := f.Namespace("span-processor", nil) + return &retryDecorator{ + retryAttempts: m.Counter("retry-attempts", nil), + exhausted: m.Counter("retry-exhausted", nil), + processor: processor, + options: options, + } +} + +func (d *retryDecorator) Process(message processor.Message) error { + err := d.processor.Process(message) + + if err == nil { + return nil + } + + for attempts := uint(0); err != nil && d.options.maxAttempts > attempts; attempts++ { + time.Sleep(d.computeInterval(attempts)) + err = d.processor.Process(message) + d.retryAttempts.Inc(1) + } + + if err != nil { + d.exhausted.Inc(1) + if d.options.propagateError { + return err + } + } + + return nil +} + +func (d *retryDecorator) computeInterval(attempts uint) time.Duration { + dur := (1 << attempts) * d.options.minInterval.Nanoseconds() + if dur <= 0 || dur > d.options.maxInterval.Nanoseconds() { + dur = d.options.maxInterval.Nanoseconds() + } + return time.Duration(d.options.rand.Int63n(dur)) +} diff --git a/cmd/ingester/app/processor/decorator/retry_test.go b/cmd/ingester/app/processor/decorator/retry_test.go new file mode 100644 index 00000000000..50d92cb6370 --- /dev/null +++ b/cmd/ingester/app/processor/decorator/retry_test.go @@ -0,0 +1,143 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decorator + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" +) + +type fakeMsg struct{} + +func (fakeMsg) Value() []byte { + return nil +} +func TestNewRetryingProcessor(t *testing.T) { + mockProcessor := &mocks.SpanProcessor{} + msg := &fakeMsg{} + mockProcessor.On("Process", msg).Return(nil) + lf := metrics.NewLocalFactory(0) + rp := NewRetryingProcessor(lf, mockProcessor) + + assert.NoError(t, rp.Process(msg)) + + mockProcessor.AssertExpectations(t) + c, _ := lf.Snapshot() + assert.Equal(t, int64(0), c["span-processor.retry-exhausted"]) + assert.Equal(t, int64(0), c["span-processor.retry-attempts"]) +} + +func TestNewRetryingProcessorError(t *testing.T) { + mockProcessor := &mocks.SpanProcessor{} + msg := &fakeMsg{} + mockProcessor.On("Process", msg).Return(errors.New("retry")) + opts := []RetryOption{ + MinBackoffInterval(0), + MaxBackoffInterval(time.Second), + MaxAttempts(2), + PropagateError(true), + Rand(&fakeRand{})} + lf := metrics.NewLocalFactory(0) + rp := NewRetryingProcessor(lf, mockProcessor, opts...) + + assert.Error(t, rp.Process(msg)) + + mockProcessor.AssertNumberOfCalls(t, "Process", 3) + c, _ := lf.Snapshot() + assert.Equal(t, int64(1), c["span-processor.retry-exhausted"]) + assert.Equal(t, int64(2), c["span-processor.retry-attempts"]) +} + +func TestNewRetryingProcessorNoErrorPropagation(t *testing.T) { + mockProcessor := &mocks.SpanProcessor{} + msg := &fakeMsg{} + mockProcessor.On("Process", msg).Return(errors.New("retry")) + opts := []RetryOption{ + MinBackoffInterval(0), + MaxBackoffInterval(time.Second), + MaxAttempts(1), + PropagateError(false), + Rand(&fakeRand{})} + + lf := metrics.NewLocalFactory(0) + rp := NewRetryingProcessor(lf, mockProcessor, opts...) + + assert.NoError(t, rp.Process(msg)) + mockProcessor.AssertNumberOfCalls(t, "Process", 2) + c, _ := lf.Snapshot() + assert.Equal(t, int64(1), c["span-processor.retry-exhausted"]) + assert.Equal(t, int64(1), c["span-processor.retry-attempts"]) +} + +type fakeRand struct{} + +func (f *fakeRand) Int63n(v int64) int64 { + return v +} + +func Test_ProcessBackoff(t *testing.T) { + minBackoff := time.Second + maxBackoff := time.Minute + tests := []struct { + name string + attempt uint + expectedInterval time.Duration + }{ + { + name: "zeroth retry attempt, minBackoff", + attempt: 0, + expectedInterval: minBackoff, + }, + { + name: "first retry attempt, 2 x minBackoff", + attempt: 1, + expectedInterval: 2 * minBackoff, + }, + { + name: "second attempt, 4 x minBackoff", + attempt: 2, + expectedInterval: 2 * 2 * minBackoff, + }, + { + name: "sixth attempt, maxBackoff", + attempt: 6, + expectedInterval: maxBackoff, + }, + { + name: "overflows, maxBackoff", + attempt: 64, + expectedInterval: maxBackoff, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rd := &retryDecorator{ + retryAttempts: metrics.NullCounter, + options: retryOptions{ + minInterval: minBackoff, + maxInterval: maxBackoff, + rand: &fakeRand{}, + }, + } + assert.Equal(t, tt.expectedInterval, rd.computeInterval(tt.attempt)) + }) + } +} diff --git a/cmd/ingester/app/processor/metrics_decorator.go b/cmd/ingester/app/processor/metrics_decorator.go new file mode 100644 index 00000000000..6efc06ebb96 --- /dev/null +++ b/cmd/ingester/app/processor/metrics_decorator.go @@ -0,0 +1,50 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "io" + "time" + + "github.com/uber/jaeger-lib/metrics" +) + +type metricsDecorator struct { + errors metrics.Counter + latency metrics.Timer + processor SpanProcessor + io.Closer +} + +// NewDecoratedProcessor returns a processor with metrics +func NewDecoratedProcessor(f metrics.Factory, processor SpanProcessor) SpanProcessor { + m := f.Namespace("span-processor", nil) + return &metricsDecorator{ + errors: m.Counter("errors", nil), + latency: m.Timer("latency", nil), + processor: processor, + } +} + +func (d *metricsDecorator) Process(message Message) error { + now := time.Now() + + err := d.processor.Process(message) + d.latency.Record(time.Since(now)) + if err != nil { + d.errors.Inc(1) + } + return err +} diff --git a/cmd/ingester/app/processor/metrics_decorator_test.go b/cmd/ingester/app/processor/metrics_decorator_test.go new file mode 100644 index 00000000000..ed3249a932f --- /dev/null +++ b/cmd/ingester/app/processor/metrics_decorator_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor_test + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" +) + +type fakeMsg struct{} + +func (fakeMsg) Value() []byte { + return nil +} + +func TestProcess(t *testing.T) { + p := &mocks.SpanProcessor{} + msg := fakeMsg{} + p.On("Process", msg).Return(nil) + m := metrics.NewLocalFactory(0) + proc := processor.NewDecoratedProcessor(m, p) + + proc.Process(msg) + p.AssertExpectations(t) + _, g := m.Snapshot() + assert.Contains(t, g, "span-processor.latency.P90") +} + +func TestProcessErr(t *testing.T) { + p := &mocks.SpanProcessor{} + msg := fakeMsg{} + p.On("Process", msg).Return(errors.New("err")) + m := metrics.NewLocalFactory(0) + proc := processor.NewDecoratedProcessor(m, p) + + proc.Process(msg) + p.AssertExpectations(t) + c, g := m.Snapshot() + assert.Contains(t, g, "span-processor.latency.P90") + assert.Equal(t, int64(1), c["span-processor.errors"]) +} diff --git a/cmd/ingester/app/processor/mocks/SpanProcessor.go b/cmd/ingester/app/processor/mocks/SpanProcessor.go new file mode 100644 index 00000000000..83c086ed687 --- /dev/null +++ b/cmd/ingester/app/processor/mocks/SpanProcessor.go @@ -0,0 +1,53 @@ +// Code generated by mockery v1.0.0 + +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import mock "github.com/stretchr/testify/mock" +import processor "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" + +// SpanProcessor is an autogenerated mock type for the SpanProcessor type +type SpanProcessor struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *SpanProcessor) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Process provides a mock function with given fields: input +func (_m *SpanProcessor) Process(input processor.Message) error { + ret := _m.Called(input) + + var r0 error + if rf, ok := ret.Get(0).(func(processor.Message) error); ok { + r0 = rf(input) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/cmd/ingester/app/processor/parallel_processor.go b/cmd/ingester/app/processor/parallel_processor.go new file mode 100644 index 00000000000..87738f0a950 --- /dev/null +++ b/cmd/ingester/app/processor/parallel_processor.go @@ -0,0 +1,80 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "sync" + + "go.uber.org/zap" +) + +// ParallelProcessor is a processor that processes in parallel using a pool of goroutines +type ParallelProcessor struct { + messages chan Message + processor SpanProcessor + numRoutines int + + logger *zap.Logger + closed chan struct{} + wg sync.WaitGroup +} + +// NewParallelProcessor creates a new parallel processor +func NewParallelProcessor( + processor SpanProcessor, + parallelism int, + logger *zap.Logger) *ParallelProcessor { + return &ParallelProcessor{ + logger: logger, + messages: make(chan Message), + processor: processor, + numRoutines: parallelism, + closed: make(chan struct{}), + } +} + +// Start begins processing queued messages +func (k *ParallelProcessor) Start() { + k.logger.Debug("Spawning goroutines to process messages", zap.Int("num_routines", k.numRoutines)) + for i := 0; i < k.numRoutines; i++ { + k.wg.Add(1) + go func() { + for { + select { + case msg := <-k.messages: + k.processor.Process(msg) + case <-k.closed: + k.wg.Done() + return + } + } + }() + } +} + +// Process queues a message for processing +func (k *ParallelProcessor) Process(message Message) error { + k.messages <- message + return nil +} + +// Close terminates all running goroutines +func (k *ParallelProcessor) Close() error { + k.logger.Debug("Initiated shutdown of processor goroutines") + close(k.closed) + k.wg.Wait() + k.logger.Info("Completed shutdown of processor goroutines") + return nil +} diff --git a/cmd/ingester/app/processor/parallel_processor_test.go b/cmd/ingester/app/processor/parallel_processor_test.go new file mode 100644 index 00000000000..a5f2482d6af --- /dev/null +++ b/cmd/ingester/app/processor/parallel_processor_test.go @@ -0,0 +1,46 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor_test + +import ( + "testing" + "time" + + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" + mockProcessor "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" +) + +type fakeMessage struct{} + +func (fakeMessage) Value() []byte { + return nil +} + +func TestNewParallelProcessor(t *testing.T) { + msg := &fakeMessage{} + mp := &mockProcessor.SpanProcessor{} + mp.On("Process", msg).Return(nil) + + pp := processor.NewParallelProcessor(mp, 1, zap.NewNop()) + pp.Start() + + pp.Process(msg) + time.Sleep(100 * time.Millisecond) + pp.Close() + + mp.AssertExpectations(t) +} diff --git a/cmd/ingester/app/processor/span_processor.go b/cmd/ingester/app/processor/span_processor.go new file mode 100644 index 00000000000..3b29bd8c5a6 --- /dev/null +++ b/cmd/ingester/app/processor/span_processor.go @@ -0,0 +1,32 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "io" +) + +//go:generate mockery -name=SpanProcessor + +// SpanProcessor processes kafka spans +type SpanProcessor interface { + Process(input Message) error + io.Closer +} + +// Message contains the fields of the kafka message that the span processor uses +type Message interface { + Value() []byte +} diff --git a/glide.lock b/glide.lock index 7fc6630fc16..277f8ea8769 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: b7aada76fe425b7f507a331dba1efe365bc166aca596f688610f185ea18c294a -updated: 2018-07-08T13:51:21.928395-04:00 +hash: 8f2b0df54e8351bffa668fe9fa265eb40b58efc8cdac540e0ff760b794e70ad3 +updated: 2018-07-18T21:52:13.448333-04:00 imports: - name: github.com/apache/thrift version: 53dd39833a08ce33582e5ff31fa18bb4735d6731 @@ -11,6 +11,8 @@ imports: version: 3a771d992973f24aa725d07868b467d1ddfceafb subpackages: - quantile +- name: github.com/bsm/sarama-cluster + version: cf455bc755fe41ac9bb2861e7a961833d9c2ecc3 - name: github.com/codahale/hdrhistogram version: 3a0bb77429bd3a61596f5e8a3172445844342120 - name: github.com/crossdock/crossdock-go @@ -79,7 +81,7 @@ imports: subpackages: - google/api - name: github.com/gogo/protobuf - version: 30cf7ac33676b5786e78c746683f0d4cd64fa75b + version: 636bf0302bc95575d69441b25a2603156ffdddf1 subpackages: - gogoproto - jsonpb @@ -289,7 +291,7 @@ imports: - zapcore - zaptest - name: golang.org/x/net - version: 32a936f46389aa10549d60bd7833e54b01685d09 + version: 8887df42c721e930089d31b28391090a10a497d7 subpackages: - context - context/ctxhttp diff --git a/glide.yaml b/glide.yaml index a88dd6b36c6..8bf0989ab18 100644 --- a/glide.yaml +++ b/glide.yaml @@ -54,10 +54,13 @@ import: - package: github.com/go-openapi/swag - package: github.com/go-openapi/validate - package: github.com/go-openapi/loads -- package: github.com/rakyll/statik/fs +- package: github.com/rakyll/statik + subpackages: + - fs - package: github.com/Shopify/sarama version: ^1.16.0 -# gogo/protobuf dependencies +- package: github.com/bsm/sarama-cluster + version: ^2.1.13 - package: github.com/gogo/googleapis version: master - package: github.com/gogo/protobuf @@ -70,3 +73,6 @@ import: version: master - package: github.com/gogo/gateway version: master +- package: golang.org/x/sys + subpackages: + - unix