Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add headers to consumed/produced records #179

Merged
merged 6 commits into from Oct 16, 2021
Merged

Add headers to consumed/produced records #179

merged 6 commits into from Oct 16, 2021

Conversation

sigevsky
Copy link

@sigevsky sigevsky commented Sep 10, 2021

Closes #58

Implementation details

  1. I'm using [rd_kafka_produceva] (https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L4397) bindings (which is identical to rd_kafka_producev, besides structure passings instead of varargs) as regular rd_kafka_produce message doesn't support headers. This requires at least 1.5.0 librdkafka version: producev without varargs confluentinc/librdkafka#2895
  2. rd_kafka_produce_batch does not allow passing headers as well so I left ProduceRecord as is. Instead I've added a separate produceWithHeaders to cover the case when singular message is produced with headers.
  3. Consumer record now has crHeaders field which is a breaking change, but I don't really see how this can be done differently without duplicating the existing api.

@sigevsky
Copy link
Author

@AlexeyRaga could you help me with pipeline/review?

Copy link
Member

@AlexeyRaga AlexeyRaga left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! It is really great to see movements in this direction.

TBH, I think that we should just drop batch producer from the library. I am saying this based on this conversation with Magnus (the author of librdkafka)
image

This would allow us to simplify the producer, add headers to the message type, clean up the story about delivery callbacks, etc.

What do you think?

hn <- BS.packCString cstr
hv <- word8PtrToBS (fromIntegral csize) wptr
go ((hn, hv) : acc) (idx + 1)
_ -> error "Unexpected error code"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specification says:

 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if headers were returned,
 *          RD_KAFKA_RESP_ERR__NOENT if the message has no headers,
 *          or another error code if the headers could not be parsed.
 *

Can we return Left in case of that another error code instead of failing with an error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are citing rd_kafka_message_headers doc (this case is handled a bit higher) whereas this code corresponds to rd_kafka_header_get_all which has the following constraints for its return value:

 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if an entry was found, else
 *          RD_KAFKA_RESP_ERR__NOENT.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it can be an overlook at librdkafka documentation side because handling "if the headers could not be parsed" in another looks strange.
But OK, we can fix it later.

@sigevsky sigevsky requested a review from AlexeyRaga October 10, 2021 13:34
@AlexeyRaga AlexeyRaga merged commit 5e61094 into haskell-works:main Oct 16, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for message headers
2 participants