
Header information not received in the consumer #1074

Closed · honghzzhang opened this issue Apr 2, 2018 · 13 comments

honghzzhang commented Apr 2, 2018

Versions

Sarama Version: v1.16.0
Kafka Version: kafka_2.11-1.0.0.tgz
Go Version: go1.10 darwin/amd64

Configuration

I have written a small test reproducer. Please see the attached files producer.go and consumer.go.

Logs

The output from running my test producer which has the headers set:
41973 producer.go:24] producer message &{send test message for headers [{[] []} {[116 101 115 116 72 101 97 100 101 114 49] [116 101 115 116 86 97 108 117 101 49]}] 0 0 0001-01-01 00:00:00 +0000 UTC 0 0}
41973 producer.go:27] producer message header key testHeader1, value testValue1

The output from running my test consumer which has zero headers:
41959 consumer.go:25] Message topic: send, partition: 0, offset: 0, key: , value: test message for headers
41959 consumer.go:26] Consumer message header size 0

Problem Description

When sending a message with headers from the producer side, the header information is lost on the consumer side.

Some information about the project I am working on:
In my project, I am using Kafka headers to propagate Zipkin tracing information. I am using the sarama library on the producer side and the sarama-cluster library on the consumer side.

In the simple reproducer I wrote, I used the sarama library for both producer and consumer to rule out potential issues in the sarama-cluster library.

According to the documentation, consumer message headers are supported for Kafka version 0.11+, and I am using Kafka 1.0.0, which should have that support.
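
For illustration, a minimal sketch of a producer that sets a header (the attached files contain the actual reproducer; the broker address and topic name here are assumptions):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    // config.Version is left at its default here, which is what
    // causes the headers to be silently dropped (see below).
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required by SyncProducer

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "send",
        Value: sarama.StringEncoder("test message for headers"),
        Headers: []sarama.RecordHeader{
            {Key: []byte("testHeader1"), Value: []byte("testValue1")},
        },
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Fatalf("Failed to send message: %v", err)
    }
    log.Printf("Sent to partition %d at offset %d", partition, offset)
}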

honghzzhang (Author) commented

(I had to change the suffix of the go files to .txt to attach the files.)
consumer.go.txt
producer.go.txt

eapache (Contributor) commented Apr 2, 2018

You need to set the Version key in your config to tell Sarama that you're running against Kafka 1.0 and that it actually supports headers.

We should potentially provide an error message in this case.
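
For example (a minimal sketch; this needs to go in both the producer and the consumer config, and headers require at least sarama.V0_11_0_0):

config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0 // matches the Kafka 1.0.0 broker used in this issue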

honghzzhang (Author) commented

@eapache Appreciate the quick response on this!
A few follow-up questions on this:

  1. Does the version need to be set in both producer and consumer config? If yes, do the versions have to match in producer and consumer for things to work?

  2. I added the following lines when constructing the sarama config in the producer:

     kafkaVersion, err := sarama.ParseKafkaVersion("1.0.0")
     if err != nil {
         glog.Errorf("Fail to parse kafkaVersion %v", err)
     }
     glog.Infof("Kafka version is %v", kafkaVersion)
     config := sarama.NewConfig()
     config.Version = kafkaVersion

     Afterwards the producer could no longer send messages successfully; it returns "-1, -1" for the partition and offset values from the SendMessage call:

     partition, offset, err := producer.SendMessage(msg)

     What did I do wrong?

  3. If I only add the above lines to set the config version in the consumer, the consumer can still receive messages, but the headers are still empty in the message. Is my syntax for setting the version correct?

  4. And a more general question: if the code needs to set the version explicitly in the sarama config when constructing the producer and consumer, what happens when we move from one Kafka version to another? For example, I downloaded Kafka back in February at version 1.0.0, and now the latest version is 1.1.0. If we use a later version of the Kafka bus, do we have to change the code to match the version of the underlying Kafka instance we are using? That seems like it could be hard to maintain.

eapache (Contributor) commented Apr 2, 2018

Does the version need to be set in both producer and consumer config?

Yes

If yes, do the versions have to match in producer and consumer for things to work?

I mean, technically they don't have to, but they should, because the setting should match the actual Kafka version, and presumably the producer and consumer are talking to the same cluster.

What did I do wrong?

It looks right to me. What is the error that is returned alongside the partition and offset?

If I only add the above lines to set the config version in the consumer, the consumer can still receive messages, but the headers are still empty in the message. Is my syntax for setting the version correct?

Yes, that's expected; the producer has to have the version set in order to send the headers in the first place.
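
For reference, a minimal sketch of reading headers on the consumer side once both configs have Version set (the broker address, topic, and partition are assumptions matching the logs above):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0 // must be at least V0_11_0_0 for headers

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("Failed to create consumer: %v", err)
    }
    defer consumer.Close()

    pc, err := consumer.ConsumePartition("send", 0, sarama.OffsetOldest)
    if err != nil {
        log.Fatalf("Failed to consume partition: %v", err)
    }
    defer pc.Close()

    for msg := range pc.Messages() {
        log.Printf("offset %d: %s", msg.Offset, msg.Value)
        for _, h := range msg.Headers { // stays empty unless config.Version >= V0_11_0_0
            log.Printf("  header %s=%s", h.Key, h.Value)
        }
    }
}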

And a more general question: if the code needs to set the version explicitly in the sarama config when constructing the producer and consumer, what happens when we move from one Kafka version to another? For example, I downloaded Kafka back in February at version 1.0.0, and now the latest version is 1.1.0. If we use a later version of the Kafka bus, do we have to change the code to match the version of the underlying Kafka instance we are using? That seems like it could be hard to maintain.

Kafka is backwards-compatible, so you can use newer versions of Kafka with older versions of Sarama just fine. The only thing that won't work is features which depend on the newer version. So the process is: upgrade Kafka (while leaving the Sarama config on the old version), then update the Sarama config to take advantage of the new features.
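
One way to avoid hard-coding the version is to read it from deployment configuration, so a broker upgrade only requires a config change. A sketch, assuming a hypothetical KAFKA_VERSION environment variable (uses the standard library log and os packages):

versionStr := os.Getenv("KAFKA_VERSION") // hypothetical deployment setting, e.g. "1.0.0"
version, err := sarama.ParseKafkaVersion(versionStr)
if err != nil {
    log.Fatalf("Invalid KAFKA_VERSION %q: %v", versionStr, err)
}
config := sarama.NewConfig()
config.Version = version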

honghzzhang (Author) commented

@eapache This is the error when sending a message with the version set in the producer config:
5013 producer.go:38] Failed to send message: kafka server: Message contents does not match its CRC.

honghzzhang (Author) commented

Any suggestions on how I could work around the problem or further debug the problem?

eapache (Contributor) commented Apr 4, 2018

I'm not sure; a CRC error seems very strange given the way messages are encoded. I suppose one thing to try would be to look at the message on the wire and see whether the CRC is actually wrong. I wonder if headers are not supposed to count toward the CRC or something?

cc @wladh in case you have any ideas?

honghzzhang (Author) commented

Thanks @eapache. How do I look at the message on the wire to check the CRC? And what is the expected value for the CRC? Can you explain in more detail?

This is a very simple producer/consumer that just produces/consumes a string message. I will attach my updated copies of the producer/consumer code with the Kafka version set in the config; it would be great if you could take a look from your side when you get a chance, to see if you can reproduce and find the problem. I am using kafka_2.11-1.0.0.

And if you have a working example of propagating Kafka headers from producer to consumer, that would be helpful to me as well.

honghzzhang (Author) commented

I added a .txt suffix to the files so they can be attached to the issue.

producer_withversion.go.txt
consumer_withversion.go.txt

wladh (Contributor) commented Apr 9, 2018

The CRC error message is a bit misleading, because the error can mean more than that (basically it means something like "the message is corrupted in some way"). You can check the broker logs (you might need to increase the logging level to TRACE or DEBUG) to see what went wrong.
I seem to remember that trying to send a message with headers when one of the consumers of that topic doesn't support them will generate an exception on the broker, which then sends the corruption error to the client.
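
As a complementary client-side aid, Sarama's own debug logging (the package-level sarama.Logger, silent by default) can show what the client is actually sending; this sketch assumes the standard library log and os packages are imported:

// Route Sarama's internal debug output to stderr.
sarama.Logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)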

honghzzhang (Author) commented Apr 9, 2018

I have figured out the problem!

In my producer code, before I added headers to the producer message, I had this to initialize msg.Headers if it was nil:

if msg.Headers == nil {
    msg.Headers = make([]sarama.RecordHeader, 1)
}

And then I added my header:

testHeader := sarama.RecordHeader{Key: []byte("testHeader2"), Value: []byte("testValue2")}
msg.Headers = append(msg.Headers, testHeader)

This produced two entries in msg.Headers: one with zero length (the zero value created by make), and the one I appended. The first, empty header caused the issue. I don't know if the underlying code can be updated to handle this condition better or produce a better error message.

I will leave the issue open to let you decide what can be improved in the error handling for my use case. I think an error message indicating that a version of 0.11+ needs to be set on the producer/consumer config in order to use Kafka headers would definitely be helpful. I understood that I needed Kafka 0.11+ to use headers, but I did not realize I needed to set the version explicitly in the config.

@eapache @wladh, thanks much for helping me resolve this issue.

wblakecaldwell (Contributor) commented

@honghzzhang - your make call is creating a slice with a length of 1. What you want is to create the slice with either length 0, or length 0 and capacity 1 (more efficient, since you're about to append to it):

if msg.Headers == nil {
    msg.Headers = make([]sarama.RecordHeader, 0, 1)
}
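
The difference matters because make with a length pre-fills the slice with zero values, so the later append yields two elements:

h := make([]sarama.RecordHeader, 1) // len 1: a zero-value (empty) header is already present
h = append(h, sarama.RecordHeader{Key: []byte("k"), Value: []byte("v")})
// len(h) == 2; h[0] is the empty {[] []} header visible in the producer log above

h2 := make([]sarama.RecordHeader, 0, 1) // len 0, cap 1: no elements yet
h2 = append(h2, sarama.RecordHeader{Key: []byte("k"), Value: []byte("v")})
// len(h2) == 1: only the intended header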

st34m3r commented May 3, 2021

I had the same problem, but it turned out I just needed to upgrade my sarama package, in my case from v1.19.0 to v1.28.0:

go get: upgraded github.com/Shopify/sarama v1.19.0 => v1.28.0
