Skip to content

Commit

Permalink
Improve consumer ready status
Browse files Browse the repository at this point in the history
Signed-off-by: axfor <aixiaoxiang2009@hotmail.com>
  • Loading branch information
axfor committed Apr 7, 2023
1 parent f554089 commit 380ec42
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,27 @@ type Consumer struct {

doneWg sync.WaitGroup

ready chan struct{}
readyClose bool
readyCloseLock sync.Mutex
consumerReady *ConsumerReady

topic string
cancel context.CancelFunc
}

type ConsumerReady struct {
ready chan struct{}
closeOnce sync.Once
}

func (c *ConsumerReady) Ready() {
<-c.ready
}

func (c *ConsumerReady) Close() {
c.closeOnce.Do(func() {
close(c.ready)
})
}

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
Expand All @@ -71,8 +84,10 @@ func New(params Params) (*Consumer, error) {
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
ready: make(chan struct{}),
topic: params.ProcessorFactory.topic,
consumerReady: &ConsumerReady{
ready: make(chan struct{}),
},
topic: params.ProcessorFactory.topic,
}, nil
}

Expand All @@ -99,7 +114,9 @@ func (c *Consumer) Start() {
if ctx.Err() != nil {
return
}
c.ready = make(chan struct{})
c.consumerReady = &ConsumerReady{
ready: make(chan struct{}),
}
}
}
}()
Expand All @@ -125,21 +142,13 @@ func (c *Consumer) Close() error {

// Ready is consumer running
func (c *Consumer) Ready() {
<-c.ready
c.consumerReady.Ready()
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready

c.readyCloseLock.Lock()
defer c.readyCloseLock.Unlock()

if !c.readyClose {
c.readyClose = true
close(c.ready)
}

c.consumerReady.Close()
return nil
}

Expand Down

0 comments on commit 380ec42

Please sign in to comment.