diff --git a/produce_set.go b/produce_set.go index 9e1d50d3a..627fdf031 100644 --- a/produce_set.go +++ b/produce_set.go @@ -84,9 +84,9 @@ func (ps *produceSet) add(msg *ProducerMessage) error { size += len(key) + len(val) if len(msg.Headers) > 0 { rec.Headers = make([]*RecordHeader, len(msg.Headers)) - for i, h := range msg.Headers { - rec.Headers[i] = &h - size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32 + for i := range msg.Headers { + rec.Headers[i] = &msg.Headers[i] + size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32 } } set.recordsToSend.recordBatch.addRecord(rec) diff --git a/produce_set_test.go b/produce_set_test.go index 0f96e8818..76afd455a 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -1,6 +1,7 @@ package sarama import ( + "fmt" "testing" "time" ) @@ -196,6 +197,20 @@ func TestProduceSetV3RequestBuilding(t *testing.T) { Partition: 0, Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage), + Headers: []RecordHeader{ + RecordHeader{ + Key: []byte("header-1"), + Value: []byte("value-1"), + }, + RecordHeader{ + Key: []byte("header-2"), + Value: []byte("value-2"), + }, + RecordHeader{ + Key: []byte("header-3"), + Value: []byte("value-3"), + }, + }, Timestamp: now, } for i := 0; i < 10; i++ { @@ -218,5 +233,16 @@ func TestProduceSetV3RequestBuilding(t *testing.T) { if rec.TimestampDelta != time.Duration(i)*time.Second { t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta) } + + for j, h := range batch.Records[i].Headers { + exp := fmt.Sprintf("header-%d", j+1) + if string(h.Key) != exp { + t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key) + } + exp = fmt.Sprintf("value-%d", j+1) + if string(h.Value) != exp { + t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value) + } + } } }