crafka


Detailed documentation: https://crystaldoc.info/github/BT-OpenSource/crafka/main/index.html

Installation

Add this to your application's shard.yml, then run shards install:

dependencies:
  crafka:
    github: bt-opensource/crafka

Usage

require "crafka"

Producing

producer = Kafka::Producer.new({"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4"})
producer.produce(topic: "topic_name", payload: "my message".to_slice)

# Optionally
producer.poll # Serves queued callbacks
producer.flush # Wait for outstanding produce requests to complete

All available args to #produce: topic, payload, key, timestamp.

Auto Polling

librdkafka recommends that rd_kafka_poll is called at regular intervals to serve queued callbacks. This functionality is built in to Crafka.

By default, after each #produce a Kafka::Producer calls poll if it has not polled in the last 5 seconds.

You can configure this with the poll_interval argument:

producer = Kafka::Producer.new(
  {"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4"},
  poll_interval: 30
)

To disable auto polling, set poll_interval to 0.
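
The throttling rule described above can be sketched in plain Crystal. This is only an illustration of the idea, not Crafka's actual source: the PollThrottle class, its after_produce method, and the choice to start the clock at construction time are all assumptions made for the example.

```crystal
# Hypothetical stand-in for the auto-polling rule; not part of Crafka.
class PollThrottle
  # Number of times the throttle decided a poll was due.
  getter polls = 0

  # interval_seconds: minimum gap between polls; 0 disables auto polling.
  def initialize(@interval_seconds : Int32 = 5)
    # Assumption: the clock starts when the throttle is created.
    @last_poll = Time.monotonic
  end

  # Call after every produce. Returns true when a poll is due.
  # `now` is injectable so the rule can be exercised without sleeping.
  def after_produce(now : Time::Span = Time.monotonic) : Bool
    return false if @interval_seconds == 0
    return false if now - @last_poll < @interval_seconds.seconds
    @last_poll = now
    @polls += 1 # a real producer would call poll here
    true
  end
end

throttle = PollThrottle.new(5)
start = Time.monotonic
throttle.after_produce(start + 1.second)  # too soon, no poll
throttle.after_produce(start + 6.seconds) # interval elapsed, poll is due
```

The point of the design is that polling cost is paid at most once per interval no matter how often #produce is called, while an interval of 0 short-circuits the check entirely.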

Debug Statistics

To capture librdkafka's statistics, pass a stats_path argument to Kafka::Producer.new giving the path of the file to write them to.

Also set statistics.interval.ms in your producer config; librdkafka emits no statistics while it is left at its default of 0.

producer = Kafka::Producer.new(
  {"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4", "statistics.interval.ms" => "5000"},
  stats_path: "/some/directory/librdkafka_stats.json"
)

Consuming

consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4", "group.id" => "consumer_group_name"})
consumer.subscribe("topic_name")
consumer.each do |message|
  # message is an instance of Kafka::Message
  puts "#{String.new(message.topic)} -> #{String.new(message.payload)}"
end
consumer.close

Subscribing to multiple topics

consumer.subscribe("topic_name", "another_topic", "more_and_more")

consumer.subscribe("^starts_with") # a name beginning with ^ is treated as a regex and can match several topics
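
To get a feel for which topics a pattern such as "^starts_with" would pick up, the matching can be approximated locally with Crystal's own Regex. This is a rough sketch only: the topic names are made up, and the real matching is done by librdkafka against the topics that exist on the broker, using its own regex engine rather than Crystal's.

```crystal
# Hypothetical topic names, used only to illustrate the pattern.
topics = ["starts_with_orders", "starts_with_events", "other_topic"]

# The same pattern string that would be passed to consumer.subscribe.
pattern = Regex.new("^starts_with")

# Keep only the topic names the pattern matches.
matched = topics.select { |name| pattern.matches?(name) }
p matched
```

Here the two topics beginning with starts_with are selected and other_topic is left out.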

Development

Running Tests

make setup
crystal spec

Releasing

  1. Update shard.yml and src/crafka.cr with the new version number
  2. Update CHANGELOG.md with the changes
  3. Commit and tag the commit

Credits

Originally forked from: https://github.com/CloudKarafka/kafka.cr