diff --git a/CHANGELOG.md b/CHANGELOG.md index 10477ff8d..f133a6864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +#### Unreleased + +Bug Fixes: + - Fix the producer's internal reference counting in certain unusual scenarios + ([#367](https://github.com/Shopify/sarama/pull/367)). + #### Version 1.0.0 (2015-03-17) Version 1.0.0 is the first tagged version, and is almost a complete rewrite. The primary differences with previous untagged versions are: diff --git a/async_producer.go b/async_producer.go index 8cbccadda..a7e5b5e78 100644 --- a/async_producer.go +++ b/async_producer.go @@ -54,7 +54,8 @@ type asyncProducer struct { errors chan *ProducerError input, successes, retries chan *ProducerMessage - brokers map[*Broker]*brokerProducer + brokers map[*Broker]chan *ProducerMessage + brokerRefs map[chan *ProducerMessage]int brokerLock sync.Mutex } @@ -82,13 +83,14 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) { } p := &asyncProducer{ - client: client, - conf: client.Config(), - errors: make(chan *ProducerError), - input: make(chan *ProducerMessage), - successes: make(chan *ProducerMessage), - retries: make(chan *ProducerMessage), - brokers: make(map[*Broker]*brokerProducer), + client: client, + conf: client.Config(), + errors: make(chan *ProducerError), + input: make(chan *ProducerMessage), + successes: make(chan *ProducerMessage), + retries: make(chan *ProducerMessage), + brokers: make(map[*Broker]chan *ProducerMessage), + brokerRefs: make(map[chan *ProducerMessage]int), } // launch our singleton dispatchers @@ -340,7 +342,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch retryState[msg.retries].expectChaser = true output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1} Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition) - p.unrefBrokerProducer(leader) + p.unrefBrokerProducer(leader, output) output = nil time.Sleep(p.conf.Producer.Retry.Backoff) } else if highWatermark > 0 { @@ -406,7 +408,9 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch output <- msg } - p.unrefBrokerProducer(leader) + if output != nil { + p.unrefBrokerProducer(leader, output) + } p.retries <- &ProducerMessage{flags: unref} } @@ -529,9 +533,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) { continue default: Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err) - closing = err - _ = broker.Close() + p.abandonBrokerConnection(broker) p.retryMessages(batch, err) + _ = broker.Close() + closing = err continue } @@ -769,43 +774,43 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) { } } -type brokerProducer struct { - input chan *ProducerMessage - refs int -} - func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage { p.brokerLock.Lock() defer p.brokerLock.Unlock() - producer := p.brokers[broker] + bp := p.brokers[broker] - if producer == nil { + if bp == nil { p.retries <- &ProducerMessage{flags: ref} - producer = &brokerProducer{ - refs: 1, - input: make(chan *ProducerMessage), - } - p.brokers[broker] = producer - go withRecover(func() { p.messageAggregator(broker, producer.input) }) - } else { - producer.refs++ + bp = make(chan *ProducerMessage) + p.brokers[broker] = bp + p.brokerRefs[bp] = 0 + go withRecover(func() { p.messageAggregator(broker, bp) }) } - return producer.input + p.brokerRefs[bp]++ + + return bp } -func (p *asyncProducer) unrefBrokerProducer(broker *Broker) { +func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) { p.brokerLock.Lock() defer p.brokerLock.Unlock() - producer := p.brokers[broker] + p.brokerRefs[bp]-- + if p.brokerRefs[bp] == 0 { + close(bp) + delete(p.brokerRefs, bp) - if producer != nil { - producer.refs-- - if producer.refs == 0 { - close(producer.input) + if p.brokers[broker] == bp { delete(p.brokers, broker) } } } + +func (p *asyncProducer) abandonBrokerConnection(broker *Broker) { + p.brokerLock.Lock() + defer p.brokerLock.Unlock() + + delete(p.brokers, broker) +} diff --git a/async_producer_test.go b/async_producer_test.go index ff1019fc8..04ee9576e 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -492,6 +492,75 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { safeClose(t, producer) } +func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + leaderAddr := leader.Addr() + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + config := NewConfig() + config.Producer.Return.Successes = true + config.Producer.Retry.Backoff = 0 + config.Producer.Retry.Max = 1 + config.Producer.Partitioner = NewRoundRobinPartitioner + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + // prime partition 0 + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + select { + case msg := <-producer.Errors(): + t.Error(msg.Err) + case <-producer.Successes(): + } + + // prime partition 1 + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + prodSuccess = new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError) + leader.Returns(prodSuccess) + select { + case msg := <-producer.Errors(): + t.Error(msg.Err) + case <-producer.Successes(): + } + + // reboot the broker (the producer will get EOF on its existing connection) + leader.Close() + leader = newMockBrokerAddr(t, 2, leaderAddr) + + // send another message on partition 0 to trigger the EOF and retry + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + + // tell partition 0 to go to that broker again + seedBroker.Returns(metadataResponse) + + // succeed this time + prodSuccess = new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + select { + case msg := <-producer.Errors(): + t.Error(msg.Err) + case <-producer.Successes(): + } + + // shutdown + closeProducer(t, producer) + seedBroker.Close() + leader.Close() +} + // This example shows how to use the producer while simultaneously // reading the Errors channel to know about any failures. func ExampleAsyncProducer_select() {