-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add headers to consumed/produced records #179
Conversation
@AlexeyRaga could you help me with pipeline/review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! It is really great to see movements in this direction.
TBH, I think that we should just drop batch producer from the library. I am saying this based on this conversation with Magnus (the author of librdkafka
)
This would allow us to simplify the producer, add headers to the message type, clean up the story about delivery callbacks, etc.
What do you think?
src/Kafka/Internal/Shared.hs
Outdated
hn <- BS.packCString cstr | ||
hv <- word8PtrToBS (fromIntegral csize) wptr | ||
go ((hn, hv) : acc) (idx + 1) | ||
_ -> error "Unexpected error code" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The specification says:
* @returns RD_KAFKA_RESP_ERR_NO_ERROR if headers were returned,
* RD_KAFKA_RESP_ERR__NOENT if the message has no headers,
* or another error code if the headers could not be parsed.
*
Can we return Left
in case of that another error code
instead of failing with an error
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are citing rd_kafka_message_headers
doc (this case is handled a bit higher) whereas this code corresponds to rd_kafka_header_get_all
which has the following constraints for its return value:
* @returns RD_KAFKA_RESP_ERR_NO_ERROR if an entry was found, else
* RD_KAFKA_RESP_ERR__NOENT.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it can be an overlook at librdkafka
documentation side because handling "if the headers could not be parsed" in another looks strange.
But OK, we can fix it later.
Closes #58
Implementation details
ProduceRecord
as is. Instead I've added a separateproduceWithHeaders
to cover the case when singular message is produced with headers.crHeaders
field which is a breaking change, but I don't really see how this can be done differently without duplicating the existing api.