diff --git a/async_producer.go b/async_producer.go
index 50cb6a6c1..3fbe4265b 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -137,12 +137,12 @@ type ProducerMessage struct {
 
 	retries int
 	flags   flagSet
-
-	keyCache, valueCache []byte
 }
 
+const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
+
 func (m *ProducerMessage) byteSize() int {
-	size := 26 // the metadata overhead of CRC, flags, etc.
+	size := producerMessageOverhead
 	if m.Key != nil {
 		size += m.Key.Length()
 	}
@@ -155,8 +155,6 @@ func (m *ProducerMessage) byteSize() int {
 func (m *ProducerMessage) clear() {
 	m.flags = 0
 	m.retries = 0
-	m.keyCache = nil
-	m.valueCache = nil
 }
 
 // ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -645,50 +643,54 @@ func (f *flusher) run() {
 			continue
 		}
 
-		msgSets := f.groupAndFilter(batch)
-		request := f.parent.buildRequest(msgSets)
-		if request == nil {
+		set := f.groupAndFilter(batch)
+		if set.empty() {
 			continue
 		}
 
+		request := set.buildRequest()
 		response, err := f.broker.Produce(request)
 
 		switch err.(type) {
 		case nil:
 			break
 		case PacketEncodingError:
-			f.parent.returnErrors(batch, err)
+			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+				f.parent.returnErrors(msgs, err)
+			})
 			continue
 		default:
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
 			f.parent.abandonBrokerConnection(f.broker)
 			_ = f.broker.Close()
 			closing = err
-			f.parent.retryMessages(batch, err)
+			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+				f.parent.retryMessages(msgs, err)
+			})
 			continue
 		}
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			f.parent.returnSuccesses(batch)
+			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+				f.parent.returnSuccesses(msgs)
+			})
 			continue
 		}
 
-		f.parseResponse(msgSets, response)
+		f.parseResponse(set, response)
 	}
 
 	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }
 
-func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
-	var err error
-	msgSets := make(map[string]map[int32][]*ProducerMessage)
+func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet {
+	set := newProduceSet(f.parent)
 
-	for i, msg := range batch {
+	for _, msg := range batch {
 		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
 			// we're currently retrying this partition so we need to filter out this message
 			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
-			batch[i] = nil
 
 			if msg.flags&chaser == chaser {
 				// ...but now we can start processing future messages again
@@ -700,68 +702,47 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32]
 			continue
 		}
 
-		if msg.Key != nil {
-			if msg.keyCache, err = msg.Key.Encode(); err != nil {
-				f.parent.returnError(msg, err)
-				batch[i] = nil
-				continue
-			}
-		}
-
-		if msg.Value != nil {
-			if msg.valueCache, err = msg.Value.Encode(); err != nil {
-				f.parent.returnError(msg, err)
-				batch[i] = nil
-				continue
-			}
-		}
-
-		partitionSet := msgSets[msg.Topic]
-		if partitionSet == nil {
-			partitionSet = make(map[int32][]*ProducerMessage)
-			msgSets[msg.Topic] = partitionSet
+		if err := set.add(msg); err != nil {
+			f.parent.returnError(msg, err)
+			continue
 		}
-
-		partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
 	}
 
-	return msgSets
+	return set
 }
 
-func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
+func (f *flusher) parseResponse(set *produceSet, response *ProduceResponse) {
 	// we iterate through the blocks in the request set, not the response, so that we notice
 	// if the response is missing a block completely
-	for topic, partitionSet := range msgSets {
-		for partition, msgs := range partitionSet {
-			block := response.GetBlock(topic, partition)
-			if block == nil {
-				f.parent.returnErrors(msgs, ErrIncompleteResponse)
-				continue
-			}
+	set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+		block := response.GetBlock(topic, partition)
+		if block == nil {
+			f.parent.returnErrors(msgs, ErrIncompleteResponse)
+			return
+		}
 
-			switch block.Err {
-			// Success
-			case ErrNoError:
-				for i := range msgs {
-					msgs[i].Offset = block.Offset + int64(i)
-				}
-				f.parent.returnSuccesses(msgs)
-			// Retriable errors
-			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
-				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-				Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
-					f.broker.ID(), topic, partition, block.Err)
-				if f.currentRetries[topic] == nil {
-					f.currentRetries[topic] = make(map[int32]error)
-				}
-				f.currentRetries[topic][partition] = block.Err
-				f.parent.retryMessages(msgs, block.Err)
-			// Other non-retriable errors
-			default:
-				f.parent.returnErrors(msgs, block.Err)
+		switch block.Err {
+		// Success
+		case ErrNoError:
+			for i := range msgs {
+				msgs[i].Offset = block.Offset + int64(i)
+			}
+			f.parent.returnSuccesses(msgs)
+		// Retriable errors
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
+			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
+			Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
+				f.broker.ID(), topic, partition, block.Err)
+			if f.currentRetries[topic] == nil {
+				f.currentRetries[topic] = make(map[int32]error)
 			}
+			f.currentRetries[topic][partition] = block.Err
+			f.parent.retryMessages(msgs, block.Err)
+		// Other non-retriable errors
+		default:
+			f.parent.returnErrors(msgs, block.Err)
 		}
-	}
+	})
 }
 
 // singleton
@@ -791,6 +772,112 @@ func (p *asyncProducer) retryHandler() {
 	}
 }
 
+// produceSet
+
+type partitionSet struct {
+	msgs        []*ProducerMessage
+	setToSend   *MessageSet
+	bufferBytes int
+}
+
+type produceSet struct {
+	parent *asyncProducer
+	msgs   map[string]map[int32]*partitionSet
+
+	bufferBytes int
+	bufferCount int
+}
+
+func newProduceSet(parent *asyncProducer) *produceSet {
+	return &produceSet{
+		msgs:   make(map[string]map[int32]*partitionSet),
+		parent: parent,
+	}
+}
+
+func (ps *produceSet) add(msg *ProducerMessage) error {
+	var err error
+	var key, val []byte
+
+	if msg.Key != nil {
+		if key, err = msg.Key.Encode(); err != nil {
+			return err
+		}
+	}
+
+	if msg.Value != nil {
+		if val, err = msg.Value.Encode(); err != nil {
+			return err
+		}
+	}
+
+	partitions := ps.msgs[msg.Topic]
+	if partitions == nil {
+		partitions = make(map[int32]*partitionSet)
+		ps.msgs[msg.Topic] = partitions
+	}
+
+	set := partitions[msg.Partition]
+	if set == nil {
+		set = &partitionSet{setToSend: new(MessageSet)}
+		partitions[msg.Partition] = set
+	}
+
+	set.msgs = append(set.msgs, msg)
+	set.setToSend.addMessage(&Message{Codec: CompressionNone, Key: key, Value: val})
+
+	size := producerMessageOverhead + len(key) + len(val)
+	set.bufferBytes += size
+	ps.bufferBytes += size
+	ps.bufferCount++
+
+	return nil
+}
+
+func (ps *produceSet) buildRequest() *ProduceRequest {
+	req := &ProduceRequest{
+		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
+		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
+	}
+
+	for topic, partitionSet := range ps.msgs {
+		for partition, set := range partitionSet {
+			if ps.parent.conf.Producer.Compression == CompressionNone {
+				req.AddSet(topic, partition, set.setToSend)
+			} else {
+				// When compression is enabled, the entire set for each partition is compressed
+				// and sent as the payload of a single fake "message" with the appropriate codec
+				// set and no key. When the server sees a message with a compression codec, it
+				// decompresses the payload and treats the result as its message set.
+				payload, err := encode(set.setToSend)
+				if err != nil {
+					Logger.Println(err) // if this happens, it's basically our fault.
+					panic(err)
+				}
+				req.AddMessage(topic, partition, &Message{
+					Codec: ps.parent.conf.Producer.Compression,
+					Key:   nil,
+					Value: payload,
+				})
+			}
+		}
+	}
+
+	return req
+}
+
+func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs []*ProducerMessage)) {
+	for topic, partitionSet := range ps.msgs {
+		for partition, set := range partitionSet {
+			cb(topic, partition, set.msgs)
+		}
+	}
+}
+
+func (ps *produceSet) empty() bool {
+	return ps.bufferCount == 0
+}
+
 // utility functions
 
 func (p *asyncProducer) shutdown() {
@@ -813,53 +900,6 @@ func (p *asyncProducer) shutdown() {
 	close(p.successes)
 }
 
-func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
-
-	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
-	empty := true
-
-	for topic, partitionSet := range batch {
-		for partition, msgSet := range partitionSet {
-			setToSend := new(MessageSet)
-			setSize := 0
-			for _, msg := range msgSet {
-				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
-					// compression causes message-sets to be wrapped as single messages, which have tighter
-					// size requirements, so we have to respect those limits
-					valBytes, err := encode(setToSend)
-					if err != nil {
-						Logger.Println(err) // if this happens, it's basically our fault.
-						panic(err)
-					}
-					req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
-					setToSend = new(MessageSet)
-					setSize = 0
-				}
-				setSize += msg.byteSize()
-
-				setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache})
-				empty = false
-			}
-
-			if p.conf.Producer.Compression == CompressionNone {
-				req.AddSet(topic, partition, setToSend)
-			} else {
-				valBytes, err := encode(setToSend)
-				if err != nil {
-					Logger.Println(err) // if this happens, it's basically our fault.
-					panic(err)
-				}
-				req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
-			}
-		}
-	}
-
-	if empty {
-		return nil
-	}
-	return req
-}
-
 func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
 	msg.clear()
 	pErr := &ProducerError{Msg: msg, Err: err}
@@ -873,17 +913,12 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
 
 func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
 	for _, msg := range batch {
-		if msg != nil {
-			p.returnError(msg, err)
-		}
+		p.returnError(msg, err)
 	}
 }
 
 func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
 	for _, msg := range batch {
-		if msg == nil {
-			continue
-		}
 		if p.conf.Producer.Return.Successes {
 			msg.clear()
 			p.successes <- msg
@@ -892,17 +927,18 @@ func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
 	}
 }
 
+func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
+	if msg.retries >= p.conf.Producer.Retry.Max {
+		p.returnError(msg, err)
+	} else {
+		msg.retries++
+		p.retries <- msg
+	}
+}
+
 func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
 	for _, msg := range batch {
-		if msg == nil {
-			continue
-		}
-		if msg.retries >= p.conf.Producer.Retry.Max {
-			p.returnError(msg, err)
-		} else {
-			msg.retries++
-			p.retries <- msg
-		}
+		p.retryMessage(msg, err)
 	}
 }
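Reviewer note (not part of the patch): the core of this change is that per-topic/per-partition grouping, byte accounting, and iteration now live behind the single `produceSet` type instead of being spread across `groupAndFilter`, the old `buildRequest`, and the response handling. The sketch below is a minimal standalone illustration of that grouping-plus-callback shape; the types (`msg`, `pset`) are invented for the example and are not Sarama's, and it deliberately omits encoding, size accounting, and error handling.

```go
package main

import "fmt"

// msg is a stand-in for ProducerMessage (illustration only).
type msg struct {
	topic     string
	partition int32
	value     string
}

// pset mirrors the shape of produceSet: messages bucketed by topic and
// partition, plus a running count so callers can cheaply test for emptiness.
type pset struct {
	msgs  map[string]map[int32][]*msg
	count int
}

func newPset() *pset {
	return &pset{msgs: make(map[string]map[int32][]*msg)}
}

func (p *pset) add(m *msg) {
	partitions := p.msgs[m.topic]
	if partitions == nil {
		partitions = make(map[int32][]*msg)
		p.msgs[m.topic] = partitions
	}
	partitions[m.partition] = append(partitions[m.partition], m)
	p.count++
}

func (p *pset) empty() bool { return p.count == 0 }

// eachPartition visits every (topic, partition) bucket exactly once, which is
// the same shape as produceSet.eachPartition in the patch.
func (p *pset) eachPartition(cb func(topic string, partition int32, msgs []*msg)) {
	for topic, partitions := range p.msgs {
		for partition, msgs := range partitions {
			cb(topic, partition, msgs)
		}
	}
}

func main() {
	set := newPset()
	set.add(&msg{topic: "events", partition: 0, value: "a"})
	set.add(&msg{topic: "events", partition: 1, value: "b"})
	set.add(&msg{topic: "logs", partition: 0, value: "c"})

	if set.empty() {
		return
	}

	// Success, error, and retry paths can each be applied per partition with a
	// single callback, which is what run() and parseResponse do after this change.
	set.eachPartition(func(topic string, partition int32, msgs []*msg) {
		fmt.Printf("%s/%d: %d message(s)\n", topic, partition, len(msgs))
	})
}
```

One consequence visible in the diff: since filtered messages are simply never added to the set, the nil-marker bookkeeping (`batch[i] = nil` plus the nil checks in `returnErrors`, `returnSuccesses`, and `retryMessages`) can be deleted.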
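Reviewer note (not part of the patch): the four-line comment added in the new `buildRequest` is the key protocol detail for the compressed path — the already-encoded per-partition set becomes the value of a single wrapper message whose codec field tells the broker how to unpack it. The sketch below illustrates only that wrap/unwrap shape, using gzip over a plain byte slice and an invented `wrapper` struct; it is not Kafka's wire format and none of these names exist in Sarama.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// wrapper stands in for the single "fake" message that carries a compressed
// message set as its value (illustration only).
type wrapper struct {
	codec string // e.g. "gzip"; an empty codec would mean value is the raw set
	value []byte
}

// wrapSet compresses an already-encoded message set and returns the one
// message that is sent in its place.
func wrapSet(encodedSet []byte) (wrapper, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(encodedSet); err != nil {
		return wrapper{}, err
	}
	if err := zw.Close(); err != nil {
		return wrapper{}, err
	}
	return wrapper{codec: "gzip", value: buf.Bytes()}, nil
}

// unwrapSet is what the receiving side conceptually does: seeing a codec on
// the message, it decompresses the value and treats the result as the set.
func unwrapSet(w wrapper) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(w.value))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	encodedSet := []byte("msg1|msg2|msg3") // pretend this is an encoded MessageSet
	w, err := wrapSet(encodedSet)
	if err != nil {
		panic(err)
	}
	roundTripped, err := unwrapSet(w)
	if err != nil {
		panic(err)
	}
	fmt.Printf("codec=%s wrapped=%d bytes original=%q\n", w.codec, len(w.value), roundTripped)
}
```

Worth noting while reviewing: because the whole set travels as one message, per-message size limits apply to the compressed wrapper. The old `buildRequest` split sets against `Producer.MaxMessageBytes` for exactly that reason, while the new `produceSet.buildRequest` always wraps the entire per-partition set in a single message.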