Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

producer: bugfix for broker flushers getting stuck #367

Merged
merged 1 commit into from
Mar 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

#### Unreleased

Bug Fixes:
- Fix the producer's internal reference counting in certain unusual scenarios
([#367](https://github.com/Shopify/sarama/pull/367)).

#### Version 1.0.0 (2015-03-17)

Version 1.0.0 is the first tagged version, and is almost a complete rewrite. The primary differences with previous untagged versions are:
Expand Down
73 changes: 39 additions & 34 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type asyncProducer struct {
errors chan *ProducerError
input, successes, retries chan *ProducerMessage

brokers map[*Broker]*brokerProducer
brokers map[*Broker]chan *ProducerMessage
brokerRefs map[chan *ProducerMessage]int
brokerLock sync.Mutex
}

Expand Down Expand Up @@ -82,13 +83,14 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
}

p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]*brokerProducer),
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]chan *ProducerMessage),
brokerRefs: make(map[chan *ProducerMessage]int),
}

// launch our singleton dispatchers
Expand Down Expand Up @@ -340,7 +342,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
retryState[msg.retries].expectChaser = true
output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
p.unrefBrokerProducer(leader)
p.unrefBrokerProducer(leader, output)
output = nil
time.Sleep(p.conf.Producer.Retry.Backoff)
} else if highWatermark > 0 {
Expand Down Expand Up @@ -406,7 +408,9 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
output <- msg
}

p.unrefBrokerProducer(leader)
if output != nil {
p.unrefBrokerProducer(leader, output)
}
p.retries <- &ProducerMessage{flags: unref}
}

Expand Down Expand Up @@ -529,9 +533,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
continue
default:
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
closing = err
_ = broker.Close()
p.abandonBrokerConnection(broker)
p.retryMessages(batch, err)
_ = broker.Close()
closing = err
continue
}

Expand Down Expand Up @@ -769,43 +774,43 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
}
}

type brokerProducer struct {
input chan *ProducerMessage
refs int
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

producer := p.brokers[broker]
bp := p.brokers[broker]

if producer == nil {
if bp == nil {
p.retries <- &ProducerMessage{flags: ref}
producer = &brokerProducer{
refs: 1,
input: make(chan *ProducerMessage),
}
p.brokers[broker] = producer
go withRecover(func() { p.messageAggregator(broker, producer.input) })
} else {
producer.refs++
bp = make(chan *ProducerMessage)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
go withRecover(func() { p.messageAggregator(broker, bp) })
}

return producer.input
p.brokerRefs[bp]++

return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker) {
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

producer := p.brokers[broker]
p.brokerRefs[bp]--
if p.brokerRefs[bp] == 0 {
close(bp)
delete(p.brokerRefs, bp)

if producer != nil {
producer.refs--
if producer.refs == 0 {
close(producer.input)
if p.brokers[broker] == bp {
delete(p.brokers, broker)
}
}
}

// abandonBrokerConnection removes the p.brokers entry keyed by broker. The
// map is modified while holding p.brokerLock. Nothing else is touched here:
// the channel that was stored under that key is neither closed nor looked up.
func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	delete(p.brokers, broker) // no-op when broker has no entry
	p.brokerLock.Unlock()
}
69 changes: 69 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,75 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
safeClose(t, producer)
}

// TestAsyncProducerRetryWithReferenceOpen exercises a retry against a broker
// that leads two partitions: each partition is primed with one successful
// message, the broker is restarted on the same address, and one more message
// must then be reported on Successes() after the old connection has failed.
func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
// seedBroker (ID 1) answers with metadata; leader (ID 2) answers produce requests.
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
// Keep the address so the replacement broker can be started on it later.
leaderAddr := leader.Addr()

// The metadata names the same broker as leader of partitions 0 and 1 of "my_topic".
metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

// Successes are delivered on a channel, backoff is zero and Retry.Max is 1.
// The round-robin partitioner is presumably what makes consecutive messages
// alternate between partitions 0 and 1, which the partition-specific
// responses below rely on -- NOTE(review): confirm against the partitioner.
config := NewConfig()
config.Producer.Return.Successes = true
config.Producer.Retry.Backoff = 0
config.Producer.Retry.Max = 1
config.Producer.Partitioner = NewRoundRobinPartitioner
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

// prime partition 0: send one message and queue a success response for it
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
// Wait for the outcome; t.Error records a failure but lets the test continue.
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}

// prime partition 1 the same way, so that it has also produced through this broker
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}

// reboot the broker (the producer will get EOF on its existing connection);
// the replacement mock keeps the same ID and listens on the saved address
leader.Close()
leader = newMockBrokerAddr(t, 2, leaderAddr)

// send another message on partition 0 to trigger the EOF and retry
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}

// tell partition 0 to go to that broker again (same metadata as before)
seedBroker.Returns(metadataResponse)

// succeed this time: the restarted broker acknowledges partition 0
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}

// shutdown: close the producer first, then both mock brokers
closeProducer(t, producer)
seedBroker.Close()
leader.Close()
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down