diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bfa3fb77e..439c6ff2a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,12 @@ ## dev -Version adds instrumentation for the rdkafka gem and fixes a JRuby bug in the configuration manager. -- **Feature: Add instrumentation for the rdkafka gem** +Version adds Apache Kafka instrumentation for the rdkafka and ruby-kafka gems and fixes a JRuby bug in the configuration manager. - The agent now has instrumentation for the rdkafka gem and will record message broker segments for produce and consume calls made using this gem. [PR#2824](https://github.com/newrelic/newrelic-ruby-agent/pull/2824) +- **Feature: Add Apache Kafka instrumentation for the rdkafka and ruby-kafka gems** + + The agent now has instrumentation for both the rdkafka and ruby-kafka gems. The agent will record transactions and message broker segments for produce and consume calls made using these gems. [PR#2824](https://github.com/newrelic/newrelic-ruby-agent/pull/2824) [PR#2842](https://github.com/newrelic/newrelic-ruby-agent/pull/2842) - **Bugfix: Jruby not saving configuration values correctly in configuration manager** diff --git a/lib/new_relic/agent/configuration/default_source.rb b/lib/new_relic/agent/configuration/default_source.rb index 08e028e939..ca417f8785 100644 --- a/lib/new_relic/agent/configuration/default_source.rb +++ b/lib/new_relic/agent/configuration/default_source.rb @@ -1463,6 +1463,14 @@ def self.enforce_fallback(allowed_values: nil, fallback: nil) :allowed_from_server => false, :description => 'Controls auto-instrumentation of bunny at start-up. May be one of: `auto`, `prepend`, `chain`, `disabled`.' }, + :'instrumentation.ruby_kafka' => { + :default => 'auto', + :public => true, + :type => String, + :dynamic_name => true, + :allowed_from_server => false, + :description => 'Controls auto-instrumentation of the ruby-kafka library at start-up. May be one of `auto`, `prepend`, `chain`, `disabled`.' + }, :'instrumentation.opensearch' => { :default => 'auto', :documentation_default => 'auto', diff --git a/lib/new_relic/agent/instrumentation/ruby_kafka.rb b/lib/new_relic/agent/instrumentation/ruby_kafka.rb new file mode 100644 index 0000000000..6ab2c88654 --- /dev/null +++ b/lib/new_relic/agent/instrumentation/ruby_kafka.rb @@ -0,0 +1,27 @@ +# This file is distributed under New Relic's license terms. +# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details. +# frozen_string_literal: true + +require_relative 'ruby_kafka/instrumentation' +require_relative 'ruby_kafka/chain' +require_relative 'ruby_kafka/prepend' + +DependencyDetection.defer do + named :'ruby_kafka' + + depends_on do + defined?(Kafka) + end + + executes do + NewRelic::Agent.logger.info('Installing ruby-kafka instrumentation') + + if use_prepend? + prepend_instrument Kafka::Producer, NewRelic::Agent::Instrumentation::RubyKafkaProducer::Prepend + prepend_instrument Kafka::Consumer, NewRelic::Agent::Instrumentation::RubyKafkaConsumer::Prepend + prepend_instrument Kafka::Client, NewRelic::Agent::Instrumentation::RubyKafkaClient::Prepend + else + chain_instrument NewRelic::Agent::Instrumentation::RubyKafka::Chain + end + end +end diff --git a/lib/new_relic/agent/instrumentation/ruby_kafka/chain.rb b/lib/new_relic/agent/instrumentation/ruby_kafka/chain.rb new file mode 100644 index 0000000000..896245a709 --- /dev/null +++ b/lib/new_relic/agent/instrumentation/ruby_kafka/chain.rb @@ -0,0 +1,55 @@ +# This file is distributed under New Relic's license terms. +# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details. +# frozen_string_literal: true + +module NewRelic::Agent::Instrumentation + module RubyKafka::Chain + def self.instrument! + ::Kafka::Producer.class_eval do + include NewRelic::Agent::Instrumentation::RubyKafka + + alias_method(:produce_without_new_relic, :produce) + + def produce(value, **kwargs) + produce_with_new_relic(value, **kwargs) do |headers| + kwargs[:headers] = headers + produce_without_new_relic(value, **kwargs) + end + end + end + + ::Kafka::Consumer.class_eval do + include NewRelic::Agent::Instrumentation::RubyKafka + + alias_method(:each_message_without_new_relic, :each_message) + + def each_message(*args) + each_message_without_new_relic(*args) do |message| + each_message_with_new_relic(message) do + yield(message) + end + end + end + end + + ::Kafka::Client.class_eval do + include NewRelic::Agent::Instrumentation::RubyKafkaConfig + + alias_method(:producer_without_new_relic, :producer) + alias_method(:consumer_without_new_relic, :consumer) + + def producer(**kwargs) + producer_without_new_relic(**kwargs).tap do |producer| + set_nr_config(producer) + end + end + + def consumer(**kwargs) + consumer_without_new_relic(**kwargs).tap do |consumer| + set_nr_config(consumer) + end + end + end + end + end +end diff --git a/lib/new_relic/agent/instrumentation/ruby_kafka/instrumentation.rb b/lib/new_relic/agent/instrumentation/ruby_kafka/instrumentation.rb new file mode 100644 index 0000000000..389fa3cb86 --- /dev/null +++ b/lib/new_relic/agent/instrumentation/ruby_kafka/instrumentation.rb @@ -0,0 +1,67 @@ +# This file is distributed under New Relic's license terms. +# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details. +# frozen_string_literal: true + +require 'new_relic/agent/messaging' + +module NewRelic::Agent::Instrumentation + module RubyKafka + MESSAGING_LIBRARY = 'Kafka' + PRODUCE = 'Produce' + CONSUME = 'Consume' + + INSTRUMENTATION_NAME = 'ruby-kafka' + + def produce_with_new_relic(value, **kwargs) + NewRelic::Agent.record_instrumentation_invocation(INSTRUMENTATION_NAME) + + topic_name = kwargs[:topic] + segment = NewRelic::Agent::Tracer.start_message_broker_segment( + action: :produce, + library: MESSAGING_LIBRARY, + destination_type: :topic, + destination_name: topic_name + ) + create_kafka_metrics(action: PRODUCE, topic: topic_name) + + headers = kwargs[:headers] || {} + ::NewRelic::Agent::DistributedTracing.insert_distributed_trace_headers(headers) + + NewRelic::Agent::Tracer.capture_segment_error(segment) { yield(headers) } + ensure + segment&.finish + end + + def each_message_with_new_relic(message) + NewRelic::Agent.record_instrumentation_invocation(INSTRUMENTATION_NAME) + + headers = message&.headers || {} + topic_name = message&.topic + + NewRelic::Agent::Messaging.wrap_message_broker_consume_transaction( + library: MESSAGING_LIBRARY, + destination_type: :topic, + destination_name: topic_name, + headers: headers, + action: :consume + ) do + create_kafka_metrics(action: CONSUME, topic: topic_name) + yield + end + end + + def create_kafka_metrics(action:, topic:) + @nr_config.each do |seed_broker| + host = "#{seed_broker&.host}:#{seed_broker&.port}" + NewRelic::Agent.record_metric("MessageBroker/Kafka/Nodes/#{host}/#{action}/#{topic}", 1) + NewRelic::Agent.record_metric("MessageBroker/Kafka/Nodes/#{host}", 1) + end + end + end + + module RubyKafkaConfig + def set_nr_config(producer_or_consumer) + producer_or_consumer.instance_variable_set(:@nr_config, @seed_brokers) + end + end +end diff --git a/lib/new_relic/agent/instrumentation/ruby_kafka/prepend.rb b/lib/new_relic/agent/instrumentation/ruby_kafka/prepend.rb new file mode 100644 index 0000000000..66cec78a33 --- /dev/null +++ b/lib/new_relic/agent/instrumentation/ruby_kafka/prepend.rb @@ -0,0 +1,50 @@ +# This file is distributed under New Relic's license terms. +# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details. +# frozen_string_literal: true + +module NewRelic::Agent::Instrumentation + module RubyKafkaProducer + module Prepend + include NewRelic::Agent::Instrumentation::RubyKafka + + def produce(value, **kwargs) + produce_with_new_relic(value, **kwargs) do |headers| + kwargs[:headers] = headers + super + end + end + end + end + + module RubyKafkaConsumer + module Prepend + include NewRelic::Agent::Instrumentation::RubyKafka + + def each_message(*args) + super do |message| + each_message_with_new_relic(message) do + yield(message) + end + end + end + end + end + + module RubyKafkaClient + module Prepend + include NewRelic::Agent::Instrumentation::RubyKafkaConfig + + def producer(**kwargs) + super.tap do |producer| + set_nr_config(producer) + end + end + + def consumer(**kwargs) + super.tap do |consumer| + set_nr_config(consumer) + end + end + end + end +end diff --git a/test/multiverse/suites/ruby_kafka/Envfile b/test/multiverse/suites/ruby_kafka/Envfile new file mode 100644 index 0000000000..c8fa620dfa --- /dev/null +++ b/test/multiverse/suites/ruby_kafka/Envfile @@ -0,0 +1,14 @@ +# This file is distributed under New Relic's license terms. +# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details. +# frozen_string_literal: true + +suite_condition('Skip in CI on newer ruby versions') do + # will run locally OR on CI for ruby < 3.4.0 + !ENV['CI'] || RUBY_VERSION < '3.4.0' +end + +instrumentation_methods :chain, :prepend + +gemfile <<~RB + gem 'ruby-kafka' +RB diff --git a/test/multiverse/suites/ruby_kafka/config/newrelic.yml b/test/multiverse/suites/ruby_kafka/config/newrelic.yml new file mode 100644 index 0000000000..09a10a4f2d --- /dev/null +++ b/test/multiverse/suites/ruby_kafka/config/newrelic.yml @@ -0,0 +1,19 @@ +--- +development: + error_collector: + enabled: true + apdex_t: 0.5 + monitor_mode: true + license_key: bootstrap_newrelic_admin_license_key_000 + instrumentation: + ruby_kafka: <%= $instrumentation_method %> + app_name: test + log_level: debug + host: 127.0.0.1 + api_host: 127.0.0.1 + transaction_trace: + record_sql: obfuscated + enabled: true + stack_trace_threshold: 0.5 + transaction_threshold: 1.0 + capture_params: false diff --git a/test/multiverse/suites/ruby_kafka/ruby_kafka_instrumentation_test.rb b/test/multiverse/suites/ruby_kafka/ruby_kafka_instrumentation_test.rb new file mode 100644 index 0000000000..1e49a13370 --- /dev/null +++ b/test/multiverse/suites/ruby_kafka/ruby_kafka_instrumentation_test.rb @@ -0,0 +1,90 @@ +# This file is distributed under New Relic's license terms. +# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details. +# frozen_string_literal: true + +class RubyKafkaInstrumentationTest < Minitest::Test + def setup + @topic = 'ruby-test-topic' + Time.now.to_i.to_s + @stats_engine = NewRelic::Agent.instance.stats_engine + end + + def teardown + harvest_span_events! + harvest_transaction_events! + NewRelic::Agent.instance.stats_engine.clear_stats + mocha_teardown + end + + def test_produce_creates_span_metrics + in_transaction do |txn| + produce_message + end + + spans = harvest_span_events! + span = spans[1][0] + + assert_equal "MessageBroker/Kafka/Topic/Produce/Named/#{@topic}", span[0]['name'] + assert_metrics_recorded "MessageBroker/Kafka/Nodes/#{host}" + assert_metrics_recorded "MessageBroker/Kafka/Nodes/#{host}/Produce/#{@topic}" + end + + def test_consume_creates_span_metrics + produce_message + harvest_span_events! + + consumer = config.consumer(group_id: 'ruby-test') + consumer.subscribe(@topic) + consumer.each_message do |message| + # get 1 message and leave + break + end + + spans = harvest_span_events! + span = spans[1][0] + + assert_equal "OtherTransaction/Message/Kafka/Topic/Consume/Named/#{@topic}", span[0]['name'] + assert_metrics_recorded "MessageBroker/Kafka/Nodes/#{host}" + assert_metrics_recorded "MessageBroker/Kafka/Nodes/#{host}/Consume/#{@topic}" + end + + def test_rdkafka_distributed_tracing + NewRelic::Agent.agent.stub :connected?, true do + with_config(account_id: '190', primary_application_id: '46954', trusted_account_key: 'trust_this!') do + in_transaction('first_txn_for_dt') do |txn| + produce_message + end + end + first_txn = harvest_transaction_events![1] + + consumer = config.consumer(group_id: 'ruby-test') + consumer.subscribe(@topic) + consumer.each_message do |message| + # get 1 message and leave + break + end + txn = harvest_transaction_events![1] + + assert_metrics_recorded 'Supportability/DistributedTrace/CreatePayload/Success' + assert_equal txn[0][0]['traceId'], first_txn[0][0]['traceId'] + assert_equal txn[0][0]['parentId'], first_txn[0][0]['guid'] + end + end + + def host + '127.0.0.1:9092' + end + + def config + Kafka.new([host], client_id: 'ruby-test') + end + + def produce_message(producer = config.producer) + producer.produce( + 'Payload 1', + topic: @topic, + key: 'Key 1' + ) + producer.deliver_messages + producer.shutdown + end +end