Upgrade github.com/Shopify/sarama v1.33.0 to github.com/IBM/sarama v1.40.0 #4294

Closed · wants to merge 3 commits
6 changes: 4 additions & 2 deletions cmd/ingester/app/builder/builder.go
@@ -30,7 +30,7 @@ import (
)

// CreateConsumer creates a new span consumer for the ingester
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options, consumerOptions ...consumer.Option) (*consumer.Consumer, error) {
var unmarshaller kafka.Unmarshaller
switch options.Encoding {
case kafka.EncodingJSON:
@@ -65,11 +65,13 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit

factoryParams := consumer.ProcessorFactoryParams{
Parallelism: options.Parallelism,
Topic: options.Topic,
SaramaConsumer: saramaConsumer,
BaseProcessor: spanProcessor,
Logger: logger,
Factory: metricsFactory,
}

processorFactory, err := consumer.NewProcessorFactory(factoryParams)
if err != nil {
return nil, err
@@ -82,5 +84,5 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
Logger: logger,
DeadlockCheckInterval: options.DeadlockInterval,
}
return consumer.New(consumerParams)
return consumer.New(consumerParams, consumerOptions...)
}
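
The new variadic consumerOptions parameter keeps existing call sites working unchanged while letting callers opt into the new behavior. A minimal sketch of such a call site, assuming logger, metricsFactory, spanWriter, and options are constructed elsewhere and the usual imports are in place; only the two option constructors come from this change:

// Hypothetical wiring of the new consumer options; not part of this diff.
spanConsumer, err := builder.CreateConsumer(
	logger,
	metricsFactory,
	spanWriter,
	options,
	consumer.WithWaitReady(true),                      // block Start() until the first group session is ready
	consumer.WithGlobalDeadlockDetectorEnabled(false), // rely on the per-partition detectors only
)
if err != nil {
	logger.Fatal("failed to create consumer", zap.Error(err))
}
spanConsumer.Start()
defer spanConsumer.Close()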
279 changes: 208 additions & 71 deletions cmd/ingester/app/consumer/consumer.go
@@ -15,18 +15,34 @@
package consumer

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"github.com/IBM/sarama"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

type Option func(c *Consumer)

func WithGlobalDeadlockDetectorEnabled(enabled bool) Option {
return func(c *Consumer) {
c.globalDeadlockDetectorEnabled = enabled
}
}

func WithWaitReady(waitReady bool) Option {
return func(c *Consumer) {
c.waitReady = waitReady
}
}

// Params are the parameters of a Consumer
type Params struct {
ProcessorFactory ProcessorFactory
@@ -44,52 +60,149 @@ type Consumer struct {
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

deadlockDetector deadlockDetector
deadlockDetector deadlockDetector
deadlockNotifyCh chan struct{}
deadlockNotifyOnce sync.Once
globalDeadlockDetectorEnabled bool // default is enabled

partitionIDToState map[int32]*consumerState
partitionMapLock sync.Mutex
partitionsHeld int64
partitionsHeld atomic.Int64
partitionsHeldGauge metrics.Gauge

waitReady bool // default is to not wait
consumerReady *consumerReady

doneWg sync.WaitGroup

topic string
cancel context.CancelFunc
}

type consumerState struct {
partitionConsumer sc.PartitionConsumer
type consumerReady struct {
readyCh chan struct{}
closeOnce sync.Once
}

func (c *consumerReady) waitReady() {
<-c.readyCh
}

func (c *consumerReady) markReady() {
c.close()
}

func (c *consumerReady) close() {
c.closeOnce.Do(func() {
close(c.readyCh)
})
}

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
func New(params Params, opts ...Option) (*Consumer, error) {
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
return &Consumer{
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
}, nil
c := &Consumer{
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
deadlockNotifyCh: make(chan struct{}, 1),
globalDeadlockDetectorEnabled: true,
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
consumerReady: &consumerReady{
readyCh: make(chan struct{}, 1),
},
waitReady: false,
topic: params.ProcessorFactory.topic,
}

for _, opt := range opts {
opt(c)
}

return c, nil
}

// Start begins consuming messages in a goroutine
func (c *Consumer) Start() {
c.deadlockDetector.start()
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel

// handle errors
c.doneWg.Add(1)
go func() {
defer c.doneWg.Done()
c.logger.Info("Starting main loop")
for pc := range c.internalConsumer.Partitions() {
c.partitionMapLock.Lock()
c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
c.partitionMapLock.Unlock()
c.partitionMetrics(pc.Partition()).startCounter.Inc(1)

c.doneWg.Add(2)
go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())
}
c.handleErrors(ctx)
}()

// start consuming
c.doneWg.Add(1)
go func() {
defer c.doneWg.Done()
c.startConsume(ctx)
}()

// There are two deadlock detectors: one global and one per partition.
// Once the consumer is running, the per-partition detectors take effect,
// so the global detector can be disabled with WithGlobalDeadlockDetectorEnabled(false).
if c.globalDeadlockDetectorEnabled {
// global deadlock detector covering all partitions
c.deadlockDetector.start()
}

if c.waitReady {
c.consumerReady.waitReady()
}
}

func (c *Consumer) startConsume(ctx context.Context) {
topic := []string{c.topic}
// capture the current consumerReady and make sure it gets closed, because c.consumerReady may later point to a new object
ready := c.consumerReady
defer ready.close()

for {
select {
case <-ctx.Done():
c.logger.Error("Context canceled")
return
default:
c.logger.Info("Topic", zap.Strings("topic", topic))
if err := c.internalConsumer.Consume(ctx, topic, c); err != nil {
c.logger.Error("Error from consumer", zap.Error(err))
}

c.consumerReady.close()

cr := &consumerReady{
readyCh: make(chan struct{}, 1),
}
c.consumerReady = cr

// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
if c.cancel != nil {
c.cancel()
}
}
}
}
}

// markDeadlock marks the consumer as deadlocked
func (c *Consumer) markDeadlock() {
c.closeDeadlock()
}

// closeDeadlock closes the deadlock notification channel
func (c *Consumer) closeDeadlock() {
c.deadlockNotifyOnce.Do(func() {
close(c.deadlockNotifyCh)
})
}

// Deadlock returns a channel that is closed when a consumer deadlock is detected
func (c *Consumer) Deadlock() <-chan struct{} {
return c.deadlockNotifyCh
}
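
The Deadlock() channel above gives embedding code a hook for consumer-wide deadlocks. A minimal sketch of how a caller might watch it, assuming spanConsumer, logger, and shutdownCh exist in the surrounding program (all hypothetical; this change only exposes the channel):

// Hypothetical watcher for the deadlock notification.
go func() {
	select {
	case <-spanConsumer.Deadlock():
		logger.Error("consumer deadlock detected, closing ingester")
		spanConsumer.Close()
	case <-shutdownCh: // hypothetical application shutdown signal
	}
}()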

// Close closes the Consumer and underlying sarama consumer
@@ -98,53 +211,68 @@ func (c *Consumer) Close() error {
c.logger.Info("Closing parent consumer")
err := c.internalConsumer.Close()

c.logger.Debug("Closing deadlock detector")
c.deadlockDetector.close()
c.closeDeadlock()

if c.cancel != nil {
c.cancel()
}

c.logger.Debug("Waiting for messages and errors to be handled")
c.doneWg.Wait()

return err
}

// handleMessages handles incoming Kafka messages on a channel
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionMapLock.Lock()
c.partitionsHeld++
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
defer func() {
c.closePartition(pc)
c.partitionMapLock.Lock()
c.partitionsHeld--
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
c.doneWg.Done()
}()
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
c.consumerReady.markReady()
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.partitionMetrics(claim.Partition()).startCounter.Inc(1)
return c.handleMessages(session, claim)
}

// handleMessages consumes and processes messages for a single partition claim
func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting message handler", zap.Int32("partition", claim.Partition()))

msgMetrics := c.newMsgMetrics(pc.Partition())
c.partitionsHeld.Add(1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())

var msgProcessor processor.SpanProcessor
msgMetrics := c.newMsgMetrics(claim.Partition())
var msgProcessor processor.SpanProcessor

deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition())
defer deadlockDetector.close()
partitionDeadlockDetector := c.deadlockDetector.startMonitoringForPartition(claim.Partition())

defer func() {
partitionDeadlockDetector.close()
c.closePartition(claim)
c.partitionsHeld.Add(-1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())
}()

for {
select {
case msg, ok := <-pc.Messages():
if !ok {
c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition()))
return
}
case msg, ok := <-claim.Messages():
if !ok {
c.logger.Info("Message channel closed", zap.Int32("partition", claim.Partition()))
return nil
}

c.logger.Debug("Got msg", zap.Any("msg", msg))
msgMetrics.counter.Inc(1)
msgMetrics.offsetGauge.Update(msg.Offset)
msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
deadlockDetector.incrementMsgCount()
msgMetrics.lagGauge.Update(claim.HighWaterMarkOffset() - msg.Offset - 1)
partitionDeadlockDetector.incrementMsgCount()

if msgProcessor == nil {
msgProcessor = c.processorFactory.new(pc.Topic(), pc.Partition(), msg.Offset-1)
msgProcessor = c.processorFactory.new(session, claim, msg.Offset-1)
defer msgProcessor.Close()
}

Expand All @@ -153,29 +281,38 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Error("Failed to process a Kafka message", zap.Error(err), zap.Int32("partition", msg.Partition), zap.Int64("offset", msg.Offset))
}

case <-deadlockDetector.closePartitionChannel():
c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition()))
return
// Must return when `session.Context()` is done.
// Otherwise, Kafka rebalances will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` errors. See:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
c.logger.Info("Session done", zap.Int32("partition", claim.Partition()))
return session.Context().Err()
case <-partitionDeadlockDetector.closePartitionChannel():
c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", claim.Partition()))

// Note: mark the deadlock asynchronously so the returned error reaches sarama's
// error handling first and the deadlock notification fires afterwards, which helps test coverage of `handleErrors`
go c.markDeadlock()

return fmt.Errorf("closing partition[%d] due to inactivity", claim.Partition())
}
}
}

func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
// closePartition records that the consumer's partition claim has been closed
func (c *Consumer) closePartition(claim sarama.ConsumerGroupClaim) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", claim.Partition()))
c.partitionMetrics(claim.Partition()).closeCounter.Inc(1)
}

// handleErrors handles incoming Kafka consumer errors on a channel
func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler", zap.Int32("partition", partition))
defer c.doneWg.Done()
func (c *Consumer) handleErrors(ctx context.Context) {
c.logger.Info("Starting error handler")

errMetrics := c.newErrMetrics(partition)
errChan := c.internalConsumer.Errors()
errMetrics := c.newErrMetrics()
for err := range errChan {
errMetrics.errCounter.Inc(1)
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
c.logger.Info("Finished handling errors", zap.Int32("partition", partition))
}
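
This file now drives the IBM/sarama consumer-group API (Setup, Cleanup, ConsumeClaim, and a Consume loop that is re-entered after every rebalance) instead of bsm/sarama-cluster's partition consumers. For reference, a minimal standalone sketch of that lifecycle, with placeholder brokers, group ID, and topic; this is illustrative, not Jaeger code:

package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
)

// handler implements sarama.ConsumerGroupHandler, the same interface the
// ingester's Consumer now implements via Setup, Cleanup, and ConsumeClaim.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil // claim channel closed, e.g. on rebalance
			}
			log.Printf("partition=%d offset=%d", msg.Partition, msg.Offset)
			sess.MarkMessage(msg, "")
		case <-sess.Context().Done():
			// Returning here avoids rebalance errors such as ErrRebalanceInProgress.
			return sess.Context().Err()
		}
	}
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_3_0_0
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	ctx := context.Background()
	for ctx.Err() == nil {
		// Consume blocks for the lifetime of one group session and must be
		// called again after each rebalance, mirroring startConsume above.
		if err := group.Consume(ctx, []string{"example-topic"}, handler{}); err != nil {
			log.Println("consume error:", err)
		}
	}
}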