Bump github.com/Shopify/sarama v1.33.0 to github.com/IBM/sarama v1.40.0
Signed-off-by: axfor <aixiaoxiang2009@hotmail.com>
axfor committed Jul 22, 2023
1 parent 2c1bf07 commit c3dd2cb
Showing 28 changed files with 637 additions and 415 deletions.
4 changes: 2 additions & 2 deletions cmd/ingester/app/builder/builder.go
@@ -30,7 +30,7 @@ import (
)

// CreateConsumer creates a new span consumer for the ingester
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options, consumerOptions ...consumer.Option) (*consumer.Consumer, error) {
var unmarshaller kafka.Unmarshaller
switch options.Encoding {
case kafka.EncodingJSON:
@@ -82,5 +82,5 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
Logger: logger,
DeadlockCheckInterval: options.DeadlockInterval,
}
return consumer.New(consumerParams)
return consumer.New(consumerParams, consumerOptions...)
}
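
For illustration, a call site can now thread functional options through to the consumer; a minimal sketch, assuming placeholder values for the logger, metrics factory, span writer, and ingester options (the With* options come from the consumer package, shown in the next file):

	// Hypothetical wiring, not part of this commit.
	c, err := builder.CreateConsumer(
		logger,         // *zap.Logger
		metricsFactory, // metrics.Factory
		spanWriter,     // spanstore.Writer
		options,        // app.Options
		consumer.WithWaitReady(true),                      // block Start() until the first session is ready
		consumer.WithGlobalDeadlockDetectorEnabled(false), // rely on per-partition detectors only
	)
	if err != nil {
		logger.Fatal("Could not create consumer", zap.Error(err))
	}
	c.Start()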
279 changes: 208 additions & 71 deletions cmd/ingester/app/consumer/consumer.go
@@ -15,18 +15,34 @@
package consumer

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"github.com/IBM/sarama"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

type Option func(c *Consumer)

func WithGlobalDeadlockDetectorEnabled(enabled bool) Option {
return func(c *Consumer) {
c.globalDeadlockDetectorEnabled = enabled
}
}

func WithWaitReady(waitReady bool) Option {
return func(c *Consumer) {
c.waitReady = waitReady
}
}

// Params are the parameters of a Consumer
type Params struct {
ProcessorFactory ProcessorFactory
@@ -44,52 +60,149 @@ type Consumer struct {
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

deadlockDetector deadlockDetector
deadlockNotifyCh chan struct{}
deadlockNotifyOnce sync.Once
globalDeadlockDetectorEnabled bool // default is enabled

partitionIDToState map[int32]*consumerState
partitionMapLock sync.Mutex
partitionsHeld int64
partitionsHeld atomic.Int64
partitionsHeldGauge metrics.Gauge

waitReady bool // default is to not wait
consumerReady *consumerReady

doneWg sync.WaitGroup

topic string
cancel context.CancelFunc
}

type consumerState struct {
partitionConsumer sc.PartitionConsumer
type consumerReady struct {
readyCh chan struct{}
closeOnce sync.Once
}

func (c *consumerReady) waitReady() {
<-c.readyCh
}

func (c *consumerReady) markReady() {
c.close()
}

func (c *consumerReady) close() {
c.closeOnce.Do(func() {
close(c.readyCh)
})
}

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
func New(params Params, opts ...Option) (*Consumer, error) {
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
return &Consumer{
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
}, nil
c := &Consumer{
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
deadlockNotifyCh: make(chan struct{}, 1),
globalDeadlockDetectorEnabled: true,
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
consumerReady: &consumerReady{
readyCh: make(chan struct{}, 1),
},
waitReady: false,
topic: params.ProcessorFactory.topic,
}

for _, opt := range opts {
opt(c)
}

return c, nil
}

// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.deadlockDetector.start()
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel

// handle errors
c.doneWg.Add(1)
go func() {
defer c.doneWg.Done()
c.logger.Info("Starting main loop")
for pc := range c.internalConsumer.Partitions() {
c.partitionMapLock.Lock()
c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
c.partitionMapLock.Unlock()
c.partitionMetrics(pc.Partition()).startCounter.Inc(1)

c.doneWg.Add(2)
go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())
}
c.handleErrors(ctx)
}()

// start consuming
c.doneWg.Add(1)
go func() {
defer c.doneWg.Done()
c.startConsume(ctx)
}()

// There are two deadlock detectors: a global one and per-partition ones.
// Once the consumer is running, the per-partition detectors take effect,
// so the global detector only needs to cover startup and can be disabled.
if c.globalDeadlockDetectorEnabled {
// global deadlock detector covering all partitions
c.deadlockDetector.start()
}

if c.waitReady {
c.consumerReady.waitReady()
}
}

func (c *Consumer) startConsume(ctx context.Context) {
topic := []string{c.topic}
// capture the current consumerReady and make sure it is closed, because c.consumerReady may later point to a new object
ready := c.consumerReady
defer ready.close()

for {
select {
case <-ctx.Done():
c.logger.Error("Context canceled")
return
default:
c.logger.Info("Topic", zap.Strings("topic", topic))
if err := c.internalConsumer.Consume(ctx, topic, c); err != nil {
c.logger.Error("Error from consumer", zap.Error(err))
}

c.consumerReady.close()

cr := &consumerReady{
readyCh: make(chan struct{}, 1),
}
c.consumerReady = cr

// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
if c.cancel != nil {
c.cancel()
}
}
}
}
}
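
startConsume follows sarama's ConsumerGroup contract: Consume blocks for the lifetime of one session and returns on every rebalance, so it must be re-invoked in a loop until the context is cancelled. A self-contained sketch of that contract, assuming a local broker, group ID, topic, and broker version:

	package main

	import (
		"context"
		"log"

		"github.com/IBM/sarama"
	)

	// noopHandler is a minimal sarama.ConsumerGroupHandler for demonstration.
	type noopHandler struct{}

	func (noopHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
	func (noopHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
	func (noopHandler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
		for msg := range c.Messages() {
			s.MarkMessage(msg, "") // mark offset as processed
		}
		return nil
	}

	func main() {
		config := sarama.NewConfig()
		config.Version = sarama.V2_8_0_0 // assumed broker version

		group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "jaeger-ingester", config)
		if err != nil {
			log.Fatal(err)
		}
		defer group.Close()

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		for {
			// Consume returns when the session ends (e.g. on rebalance)
			// and must be called again to join the next session.
			if err := group.Consume(ctx, []string{"jaeger-spans"}, noopHandler{}); err != nil {
				log.Println("consume error:", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}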

// markDeadlock marks the consumer as deadlocked
func (c *Consumer) markDeadlock() {
c.closeDeadlock()
}

// closeDeadlock closes the deadlock notification channel
func (c *Consumer) closeDeadlock() {
c.deadlockNotifyOnce.Do(func() {
close(c.deadlockNotifyCh)
})
}

// Deadlock returns a channel that is closed when a consumer deadlock is detected
func (c *Consumer) Deadlock() <-chan struct{} {
return c.deadlockNotifyCh
}
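
A caller that wants to react to a deadlock can block on the returned channel; a minimal sketch (the shutdown policy here is an assumption, not part of this commit):

	// Hypothetical watcher goroutine.
	go func() {
		<-c.Deadlock() // closed by markDeadlock when a partition stalls
		logger.Error("Kafka consumer deadlocked, closing it")
		if err := c.Close(); err != nil {
			logger.Error("Failed to close consumer", zap.Error(err))
		}
	}()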

// Close closes the Consumer and underlying sarama consumer
@@ -98,53 +211,68 @@ func (c *Consumer) Close() error {
c.logger.Info("Closing parent consumer")
err := c.internalConsumer.Close()

c.logger.Debug("Closing deadlock detector")
c.deadlockDetector.close()
c.closeDeadlock()

if c.cancel != nil {
c.cancel()
}

c.logger.Debug("Waiting for messages and errors to be handled")
c.doneWg.Wait()

return err
}

// handleMessages handles incoming Kafka messages on a channel
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionMapLock.Lock()
c.partitionsHeld++
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
defer func() {
c.closePartition(pc)
c.partitionMapLock.Lock()
c.partitionsHeld--
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
c.doneWg.Done()
}()
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
c.consumerReady.markReady()
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.partitionMetrics(claim.Partition()).startCounter.Inc(1)
return c.handleMessages(session, claim)
}
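
Setup, Cleanup, and ConsumeClaim make *Consumer satisfy sarama.ConsumerGroupHandler, which is what lets it be passed to ConsumerGroup.Consume in startConsume above. A one-line compile-time assertion (not in this commit) would make that contract explicit:

	// Hypothetical guard: compilation fails if *Consumer stops implementing the interface.
	var _ sarama.ConsumerGroupHandler = (*Consumer)(nil)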

// handleMessages handles incoming Kafka messages for one claimed partition
func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting message handler", zap.Int32("partition", claim.Partition()))

msgMetrics := c.newMsgMetrics(pc.Partition())
c.partitionsHeld.Add(1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())

var msgProcessor processor.SpanProcessor
msgMetrics := c.newMsgMetrics(claim.Partition())
var msgProcessor processor.SpanProcessor = nil

deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition())
defer deadlockDetector.close()
partitionDeadlockDetector := c.deadlockDetector.startMonitoringForPartition(claim.Partition())

defer func() {
partitionDeadlockDetector.close()
c.closePartition(claim)
c.partitionsHeld.Add(-1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())
}()

for {
select {
case msg, ok := <-pc.Messages():
if !ok {
c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition()))
return
}
case msg, ok := <-claim.Messages():
	if !ok {
		c.logger.Info("Message channel closed", zap.Int32("partition", claim.Partition()))
		return nil
	}

c.logger.Debug("Got msg", zap.Any("msg", msg))
msgMetrics.counter.Inc(1)
msgMetrics.offsetGauge.Update(msg.Offset)
msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
deadlockDetector.incrementMsgCount()
msgMetrics.lagGauge.Update(claim.HighWaterMarkOffset() - msg.Offset - 1)
partitionDeadlockDetector.incrementMsgCount()

if msgProcessor == nil {
msgProcessor = c.processorFactory.new(pc.Topic(), pc.Partition(), msg.Offset-1)
msgProcessor = c.processorFactory.new(session, claim, msg.Offset-1)
defer msgProcessor.Close()
}

@@ -153,29 +281,38 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Error("Failed to process a Kafka message", zap.Error(err), zap.Int32("partition", msg.Partition), zap.Int64("offset", msg.Offset))
}

case <-deadlockDetector.closePartitionChannel():
c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition()))
return
// Must return when `session.Context()` is done;
// otherwise sarama raises `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` during a rebalance. See:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
c.logger.Info("Session done", zap.Int32("partition", claim.Partition()))
return session.Context().Err()
case <-partitionDeadlockDetector.closePartitionChannel():
c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", claim.Partition()))

// Note: mark the deadlock asynchronously, so the error is first returned
// to sarama's error handling and the deadlock notification fires afterwards; this also helps test coverage of `handleErrors`
go c.markDeadlock()

return fmt.Errorf("closing partition[%d] due to inactivity", claim.Partition())
}
}
}

func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
// closePartition logs and counts the closing of a partition
func (c *Consumer) closePartition(claim sarama.ConsumerGroupClaim) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", claim.Partition()))
c.partitionMetrics(claim.Partition()).closeCounter.Inc(1)
}

// handleErrors handles incoming Kafka consumer errors on a channel
func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler", zap.Int32("partition", partition))
defer c.doneWg.Done()
func (c *Consumer) handleErrors(ctx context.Context) {
c.logger.Info("Starting error handler")

errMetrics := c.newErrMetrics(partition)
errChan := c.internalConsumer.Errors()
errMetrics := c.newErrMetrics()
for err := range errChan {
errMetrics.errCounter.Inc(1)
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
c.logger.Info("Finished handling errors", zap.Int32("partition", partition))
}
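
One caveat worth noting: sarama only delivers to the Errors() channel when returned errors are enabled in its configuration; otherwise the channel stays silent and failures go only to sarama's logger. A minimal sketch of the relevant setting (where the config is built is an assumption; the flag itself is sarama's):

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true // required for Errors() to receive anything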
