2 changes: 0 additions & 2 deletions Gemfile
@@ -2,5 +2,3 @@ source 'https://rubygems.org'

# Specify your gem's dependencies in fluent-plugin-kafka.gemspec
gemspec

gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA']
30 changes: 30 additions & 0 deletions README.md
@@ -61,6 +61,36 @@ See [Authentication using SASL](https://github.com/zendesk/ruby-kafka#authentica
Set username, password, scram_mechanism and sasl_over_ssl for SASL/Plain or Scram authentication.
See [Authentication using SASL](https://github.com/zendesk/ruby-kafka#authentication-using-sasl) for more details.

##### with MSK IAM Authentication (only for `rdkafka2` output type)
Authentication and authorization with an MSK cluster are facilitated through a base64-encoded signed URL, which is generated by the [aws-msk-iam-sasl-signer-ruby](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby) library.
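
For reference, generating such a token directly with the signer gem looks roughly like the following. This is a minimal sketch: the region value is a placeholder, and the accessor names mirror how the plugin code in this PR uses the library.

```ruby
# Minimal sketch: generate an MSK IAM auth token with the
# aws-msk-iam-sasl-signer gem (the region is a placeholder).
require 'aws-msk-iam-sasl-signer'

signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: 'us-east-1')
token  = signer.generate_auth_token

token.token              # base64-encoded signed URL, used as the OAUTHBEARER token
token.expiration_time_ms # expiration in milliseconds, passed as lifetime_ms to rdkafka
```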

**Configuration Example**
To enable this feature, configure your Fluentd output as follows:

```
<match *>
@type rdkafka2
# Kafka brokers to connect to (typically on port 9098 or 9198 for IAM authentication)
brokers <broker_addresses>
# Topic to write events to
topic_key test-topic-1
default_topic test-topic-1

# AWS Region (required)
aws_msk_region us-east-1

# Use a shared producer for the connection (required)
share_producer true

# MSK IAM authentication settings (required)
rdkafka_options {
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "OAUTHBEARER"
}
</match>
```
With this configuration, Fluentd will handle the token refresh and manage the connection to your MSK cluster using AWS IAM authentication.
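
Under the hood, the plugin registers an OAUTHBEARER token refresh callback with rdkafka and feeds it tokens produced by the signer. The following is a condensed standalone sketch of that mechanism, not the plugin's exact code; the broker address and region are placeholders.

```ruby
# Condensed sketch of the token refresh mechanism outside Fluentd
# (broker address and region are placeholders).
require 'rdkafka'
require 'aws-msk-iam-sasl-signer'

producer = nil

set_token = lambda do
  token = AwsMskIamSaslSigner::MSKTokenProvider.new(region: 'us-east-1').generate_auth_token
  producer.oauthbearer_set_token(
    token: token.token,
    lifetime_ms: token.expiration_time_ms,
    principal_name: 'kafka-cluster'
  )
end

# librdkafka invokes this callback whenever the current token needs refreshing.
Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |_config, _client_name|
  # The callback can fire before the producer handle exists; skip until it does.
  set_token.call if producer
end

rdkafka_config = Rdkafka::Config.new({
  :"bootstrap.servers" => 'b-1.example.kafka.us-east-1.amazonaws.com:9098',
  :"security.protocol" => 'sasl_ssl',
  :"sasl.mechanisms"   => 'OAUTHBEARER'
})
producer = rdkafka_config.producer

set_token.call # as in the plugin, set the initial token right after creating the producer
```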

### Input plugin (@type 'kafka')

Consume events by single consumer.
7 changes: 7 additions & 0 deletions fluent-plugin-kafka.gemspec
@@ -23,6 +23,13 @@ Gem::Specification.new do |gem|
# gems that aren't default gems as of Ruby 3.4
gem.add_dependency("bigdecimal", ["~> 3.1"])

if ENV['USE_RDKAFKA']
gem.add_dependency 'rdkafka', [ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE']]
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0')
gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1'
end
end

gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", "~> 1.0"
39 changes: 39 additions & 0 deletions lib/fluent/plugin/out_rdkafka2.rb
@@ -20,6 +20,10 @@
raise "unable to patch rdkafka."
end

if Gem::Version.create(RUBY_VERSION) >= Gem::Version.create('3.0')
require 'aws-msk-iam-sasl-signer'
end

module Fluent::Plugin
class Fluent::Rdkafka2Output < Output
Fluent::Plugin.register_output('rdkafka2', self)
@@ -100,6 +100,7 @@ class Fluent::Rdkafka2Output < Output
config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name'
config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"],
:desc => 'Handle some of the error codes should be unrecoverable if specified'
config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK'

config_section :buffer do
config_set_default :chunk_keys, ["topic"]
@@ -209,6 +214,10 @@ def add(level, message = nil)
config = build_config
@rdkafka = Rdkafka::Config.new(config)

if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER"
Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
end

if @default_topic.nil?
if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
@@ -296,9 +305,39 @@ def build_config
config
end

def refresh_token(_config, _client_name)
log.info("+--- Refreshing token")
client = get_producer
# This will happen once upon initialization and is expected to fail, as the producer isn't set yet.
# We will set the token manually after creation; after that, this refresh method will work.
unless client
log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)")
return
end
signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region)
token = signer.generate_auth_token

if token
client.oauthbearer_set_token(
token: token.token,
lifetime_ms: token.expiration_time_ms,
principal_name: "kafka-cluster"
)
else
client.oauthbearer_set_token_failure(
"Failed to generate token."
)
end
end

def start
if @share_producer
@shared_producer = @rdkafka.producer
log.info("Created shared producer")
if @aws_msk_region
refresh_token(nil, nil)
log.info("Set initial token for shared producer")
end
else
@producers = {}
@producers_mutex = Mutex.new