-
Notifications
You must be signed in to change notification settings - Fork 404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Added instrumentation for kafkajs.Kafka.consumer
#2244
Conversation
|
||
agent.on('transactionFinished', (tx) => { | ||
const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}` | ||
const segment = tx.agent.tracer.getSegment() | ||
if (tx.name === expectedName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the changes in this test. Why do we need to wrap the expectation in a conditional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumers create transactions. We need to check which transaction is getting asserted so I named the producer transactions so we can do that
Description
This PR adds instrumentation for
kafkajs.Kafka.consumer.run
. I did not instrument eachBatch. I'm not sure how we should handle this. Our message shim handles transaction naming on a 1 message basis as it needs the topic name to name the consumption transaction. For now we will defer until someone asks for it. I plan on adding a story to track if customers are using this method. I had to tweak both the message-shim to allow to wrap a consumer when it is an object. The consumer arg of MessageSpec is a number so i addedfunctions
to allow to wrap n functions at the position of the consumer. Lastly, the traceparent and tracecontext are buffers so I had to handle parsing those as buffers.How to Test
Related Issues
Closes #2218