Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruby kafka gem instrumentation #2842

Merged
merged 18 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

## dev

Version <dev> adds instrumentation for the rdkafka gem.
Version <dev> adds kafka instrumentation for the rdkafka and ruby-kafka gems.
tannalynn marked this conversation as resolved.
Show resolved Hide resolved

- **Feature: Add kafka instrumentation for the rdkafka and ruby-kafka gems**
tannalynn marked this conversation as resolved.
Show resolved Hide resolved

The agent now has instrumentation for both the rdkafka and ruby-kafka gems. The agent will now record transactions and message broker segments for produce and consume calls made using these gems. [PR#2824](https://github.com/newrelic/newrelic-ruby-agent/pull/2824) [PR#2842](https://github.com/newrelic/newrelic-ruby-agent/pull/2842)
tannalynn marked this conversation as resolved.
Show resolved Hide resolved

- **Feature: Add instrumentation for the rdkafka gem**

The agent now has instrumentation for the rdkafka gem and will record message broker segments for produce and consume calls made using this gem. [PR#2824](https://github.com/newrelic/newrelic-ruby-agent/pull/2824)


## v9.13.0
Expand Down
8 changes: 8 additions & 0 deletions lib/new_relic/agent/configuration/default_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,14 @@ def self.enforce_fallback(allowed_values: nil, fallback: nil)
:allowed_from_server => false,
:description => 'Controls auto-instrumentation of bunny at start-up. May be one of: `auto`, `prepend`, `chain`, `disabled`.'
},
:'instrumentation.ruby_kafka' => {
:default => 'auto',
:public => true,
:type => String,
:dynamic_name => true,
:allowed_from_server => false,
:description => 'Controls auto-instrumentation of the ruby-kafka library at start-up. May be one of `auto`, `prepend`, `chain`, `disabled`.'
},
:'instrumentation.opensearch' => {
:default => 'auto',
:documentation_default => 'auto',
Expand Down
27 changes: 27 additions & 0 deletions lib/new_relic/agent/instrumentation/ruby_kafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# This file is distributed under New Relic's license terms.
# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details.
# frozen_string_literal: true

require_relative 'ruby_kafka/instrumentation'
require_relative 'ruby_kafka/chain'
require_relative 'ruby_kafka/prepend'

DependencyDetection.defer do
named :'ruby_kafka'

depends_on do
defined?(Kafka)
end

executes do
NewRelic::Agent.logger.info('Installing ruby-kafka instrumentation')

if use_prepend?
prepend_instrument Kafka::Producer, NewRelic::Agent::Instrumentation::RubyKafkaProducer::Prepend
prepend_instrument Kafka::Consumer, NewRelic::Agent::Instrumentation::RubyKafkaConsumer::Prepend
prepend_instrument Kafka::Client, NewRelic::Agent::Instrumentation::RubyKafkaClient::Prepend
else
chain_instrument NewRelic::Agent::Instrumentation::RubyKafka::Chain
end
end
end
55 changes: 55 additions & 0 deletions lib/new_relic/agent/instrumentation/ruby_kafka/chain.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# This file is distributed under New Relic's license terms.
# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details.
# frozen_string_literal: true

module NewRelic::Agent::Instrumentation
module RubyKafka::Chain
def self.instrument!
::Kafka::Producer.class_eval do
include NewRelic::Agent::Instrumentation::RubyKafka

alias_method(:produce_without_new_relic, :produce)

def produce(value, **kwargs)
produce_with_new_relic(value, **kwargs) do |headers|
kwargs[:headers] = headers
produce_without_new_relic(value, **kwargs)
end
end
end

::Kafka::Consumer.class_eval do
include NewRelic::Agent::Instrumentation::RubyKafka

alias_method(:each_message_without_new_relic, :each_message)

def each_message(*args)
each_message_without_new_relic(*args) do |message|
each_message_with_new_relic(message) do
yield(message)
end
end
end
end

::Kafka::Client.class_eval do
include NewRelic::Agent::Instrumentation::RubyKafkaConfig

alias_method(:producer_without_new_relic, :producer)
alias_method(:consumer_without_new_relic, :consumer)

def producer(**kwargs)
producer_without_new_relic(**kwargs).tap do |producer|
set_nr_config(producer)
end
end

def consumer(**kwargs)
consumer_without_new_relic(**kwargs).tap do |consumer|
set_nr_config(consumer)
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# This file is distributed under New Relic's license terms.
# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details.
# frozen_string_literal: true

require 'new_relic/agent/messaging'

module NewRelic::Agent::Instrumentation
module RubyKafka
MESSAGING_LIBRARY = 'Kafka'
PRODUCE = 'Produce'
CONSUME = 'Consume'

INSTRUMENTATION_NAME = 'ruby-kafka'

def produce_with_new_relic(value, **kwargs)
NewRelic::Agent.record_instrumentation_invocation(INSTRUMENTATION_NAME)

topic_name = kwargs[:topic]
fallwith marked this conversation as resolved.
Show resolved Hide resolved
segment = NewRelic::Agent::Tracer.start_message_broker_segment(
action: :produce,
library: MESSAGING_LIBRARY,
destination_type: :topic,
destination_name: topic_name
)
create_kafka_metrics(action: PRODUCE, topic: topic_name)

headers = kwargs[:headers] || {}
::NewRelic::Agent::DistributedTracing.insert_distributed_trace_headers(headers)

NewRelic::Agent::Tracer.capture_segment_error(segment) { yield(headers) }
ensure
segment&.finish
end

def each_message_with_new_relic(message)
NewRelic::Agent.record_instrumentation_invocation(INSTRUMENTATION_NAME)

headers = message&.headers || {}
topic_name = message&.topic

NewRelic::Agent::Messaging.wrap_message_broker_consume_transaction(
library: MESSAGING_LIBRARY,
destination_type: :topic,
destination_name: topic_name,
headers: headers,
action: :consume
) do
create_kafka_metrics(action: CONSUME, topic: topic_name)
yield
end
end

def create_kafka_metrics(action:, topic:)
@nr_config.each do |seed_broker|
host = "#{seed_broker&.host}:#{seed_broker&.port}"
NewRelic::Agent.record_metric("MessageBroker/Kafka/Nodes/#{host}/#{action}/#{topic}", 1)
NewRelic::Agent.record_metric("MessageBroker/Kafka/Nodes/#{host}", 1)
end
end
end

module RubyKafkaConfig
def set_nr_config(producer_or_consumer)
producer_or_consumer.instance_variable_set(:@nr_config, @seed_brokers)
end
end
end
50 changes: 50 additions & 0 deletions lib/new_relic/agent/instrumentation/ruby_kafka/prepend.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# This file is distributed under New Relic's license terms.
# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details.
# frozen_string_literal: true

module NewRelic::Agent::Instrumentation
module RubyKafkaProducer
module Prepend
include NewRelic::Agent::Instrumentation::RubyKafka

def produce(value, **kwargs)
produce_with_new_relic(value, **kwargs) do |headers|
kwargs[:headers] = headers
super
end
end
end
end

module RubyKafkaConsumer
module Prepend
include NewRelic::Agent::Instrumentation::RubyKafka

def each_message(*args)
super do |message|
each_message_with_new_relic(message) do
yield(message)
end
end
end
end
end

module RubyKafkaClient
module Prepend
include NewRelic::Agent::Instrumentation::RubyKafkaConfig

def producer(**kwargs)
super.tap do |producer|
set_nr_config(producer)
end
end

def consumer(**kwargs)
super.tap do |consumer|
set_nr_config(consumer)
end
end
end
end
end
9 changes: 9 additions & 0 deletions test/multiverse/suites/ruby_kafka/Envfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This file is distributed under New Relic's license terms.
# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details.
# frozen_string_literal: true

instrumentation_methods :chain, :prepend

gemfile <<~RB
tannalynn marked this conversation as resolved.
Show resolved Hide resolved
gem 'ruby-kafka'
RB
19 changes: 19 additions & 0 deletions test/multiverse/suites/ruby_kafka/config/newrelic.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
development:
error_collector:
enabled: true
apdex_t: 0.5
monitor_mode: true
license_key: bootstrap_newrelic_admin_license_key_000
instrumentation:
ruby_kafka: <%= $instrumentation_method %>
app_name: test
log_level: debug
host: 127.0.0.1
api_host: 127.0.0.1
transaction_trace:
record_sql: obfuscated
enabled: true
stack_trace_threshold: 0.5
transaction_threshold: 1.0
capture_params: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# This file is distributed under New Relic's license terms.
# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details.
# frozen_string_literal: true

class RubyKafkaInstrumentationTest < Minitest::Test
def setup
@topic = 'ruby-test-topic' + Time.now.to_i.to_s
@stats_engine = NewRelic::Agent.instance.stats_engine
end

def teardown
harvest_span_events!
harvest_transaction_events!
NewRelic::Agent.instance.stats_engine.clear_stats
mocha_teardown
end

def test_produce_creates_span_metrics
in_transaction do |txn|
produce_message
end

spans = harvest_span_events!
span = spans[1][0]

assert_equal "MessageBroker/Kafka/Topic/Produce/Named/#{@topic}", span[0]['name']
assert_metrics_recorded "MessageBroker/Kafka/Nodes/#{host}"
assert_metrics_recorded "MessageBroker/Kafka/Nodes/#{host}/Produce/#{@topic}"
end

def test_consume_creates_span_metrics
produce_message
harvest_span_events!

consumer = config.consumer(group_id: 'ruby-test')
consumer.subscribe(@topic)
consumer.each_message do |message|
# get 1 message and leave
break
end

spans = harvest_span_events!
span = spans[1][0]

assert_equal "OtherTransaction/Message/Kafka/Topic/Consume/Named/#{@topic}", span[0]['name']
assert_metrics_recorded "MessageBroker/Kafka/Nodes/#{host}"
assert_metrics_recorded "MessageBroker/Kafka/Nodes/#{host}/Consume/#{@topic}"
end

def test_rdkafka_distributed_tracing
NewRelic::Agent.agent.stub :connected?, true do
with_config(account_id: '190', primary_application_id: '46954', trusted_account_key: 'trust_this!') do
in_transaction('first_txn_for_dt') do |txn|
produce_message
end
end
first_txn = harvest_transaction_events![1]

consumer = config.consumer(group_id: 'ruby-test')
consumer.subscribe(@topic)
consumer.each_message do |message|
# get 1 message and leave
break
end
txn = harvest_transaction_events![1]

assert_metrics_recorded 'Supportability/DistributedTrace/CreatePayload/Success'
assert_equal txn[0][0]['traceId'], first_txn[0][0]['traceId']
assert_equal txn[0][0]['parentId'], first_txn[0][0]['guid']
end
end

def host
'127.0.0.1:9092'
end

def config
Kafka.new([host], client_id: 'ruby-test')
end

def produce_message(producer = config.producer)
producer.produce(
'Payload 1',
topic: @topic,
key: 'Key 1'
)
producer.deliver_messages
producer.shutdown
end
end
Loading