Header information not received in the consumer #1074
Comments
(I had to change the suffix of the Go files to .txt to attach them.)
You need to set the Kafka version in the client configuration (the `Version` field of `sarama.Config`) to at least `V0_11_0_0`. We should potentially provide an error message in this case.
@eapache Appreciate the quick response on this!
And afterwards the producer could no longer send messages successfully; it returns "-1, -1" for the partition and offset values from the SendMessage call:
Yes
I mean, technically they don't have to, but they should because it should match the actual Kafka version, and presumably the producer and consumer are talking to the same cluster.
It looks right to me. What is the value of the error message that is returned besides the partition and offset?
Yes, that's expected; the producer has to have the version set in order to send the headers in the first place.
Kafka is backwards-compatible, so you can use newer versions of Kafka with older versions of Sarama just fine. The only things that won't work are features which depend on the newer version. So the process is: upgrade Kafka (while leaving the Sarama config on the old version), then update the Sarama config to take advantage of the new features.
@eapache This is the error when sending a message with the version set in the producer config:
Any suggestions on how I could work around or further debug the problem?
I'm not sure; a CRC error seems very strange given the way messages are encoded. One thing to try would be to look at the message on the wire and see whether the CRC is actually wrong or not. I wonder if headers are not supposed to count toward the CRC or something? cc @wladh in case you have any ideas.
Thanks @eapache. How do I look at the message on the wire to check the CRC, and what is the expected value? Can you explain in more detail? This is a very simple producer/consumer that just produces and consumes a string message. I will attach my updated copies of the producer/consumer code with the Kafka version set in the config; it would be great if you could take a look when you get a chance to see if you can reproduce the problem. I am using kafka_2.11-1.0.0. If you have a working example of propagating Kafka headers from producer to consumer, that would also be helpful.
Add .txt suffix to the files so they can be attached to the issue. |
The CRC error message is a bit misleading, because the error can mean more than that (basically it means something like "the message is corrupted in some way"). You can check the broker logs (you might need to increase the logging level to TRACE or DEBUG) to see what went wrong.
I have figured out the problem! In my producer code, before adding headers to the producer message, I initialized the Headers slice with a make call that created a slice of length 1. When I then appended my header, this produced two entries in msg.Headers: one with zero length (from the make), and the header I added. The first, empty header caused the issue. I don't know if the underlying code can be updated to handle this condition better or produce a better error message; I will leave the issue open to let you decide what can be improved in error handling for my use case. I think an error message indicating that the version (0.11+) needs to be set in the producer/consumer config to use Kafka headers would definitely be helpful. I understood I needed Kafka 0.11+ to use headers, but I did not realize I had to set the version explicitly in the config. @eapache @wladh, thanks much for helping me resolve this issue.
@honghzzhang - your make call is creating a slice with a length of 1. What you want is to either create the slice with length 0, or with length 0 and capacity 1 (more efficient, since you're about to append to it).
I've got the same problem, but it turns out I just needed to upgrade my sarama package, in my case from 1.19 to 1.28.
Versions
Sarama Version: v1.16.0
Kafka Version: kafka_2.11-1.0.0.tgz
Go Version: go1.10 darwin/amd64
Configuration
I have written a small test reproducer. Please see attached files producer.go and consumer.go.
Logs
The output from running my test producer which has the headers set:
41973 producer.go:24] producer message &{send test message for headers [{[] []} {[116 101 115 116 72 101 97 100 101 114 49] [116 101 115 116 86 97 108 117 101 49]}] 0 0 0001-01-01 00:00:00 +0000 UTC 0 0}
41973 producer.go:27] producer message header key testHeader1, value testValue1
The output from running my test consumer which has zero headers:
41959 consumer.go:25] Message topic: send, partition: 0, offset: 0, key: , value: test message for headers
41959 consumer.go:26] Consumer message header size 0
Problem Description
When sending a message with headers from the producer side, the header information is lost on the consumer side.
Some information about the project I am working on:
In my project, I am using Kafka headers to propagate Zipkin tracing information. I am using the sarama library on the producer side and the sarama-cluster library on the consumer side.
In the simple reproducer I wrote, I used the sarama library for both producer and consumer to rule out potential issues from the sarama-cluster library.
Per the documentation, consumer message headers are supported for Kafka version 0.11+, and I am using Kafka 1.0.0, which should have the support.