-
Notifications
You must be signed in to change notification settings - Fork 203
Brod group subscriber demo in Elixir
Zaiming Shi edited this page Mar 17, 2017
·
5 revisions
Original page author: @sdball.
This demo assumes that you have Kafka running on localhost:9092 and that the topic "brod-demo-group-subscriber-koc" exists with at least 2 partitions (so that the partition assignment between group members is visible in the demo).
Create a new Elixir project:
$ mix new brod_demo && cd brod_demo
Add brod to the dependencies in mix.exs:
## Project dependencies: the demo only needs brod.
defp deps, do: [{:brod, "~> 2.3"}]
Add lib/brod_demo.ex:
defmodule BrodDemo do
  ## Kafka endpoints and the topic shared by the demo subscribers and producers.
  @kafka_hosts [localhost: 9092]
  @topic "brod-demo-group-subscriber-koc"
  ## Delay (in seconds) handed to the producers when the caller gives none.
  @produce_delay_seconds 2

  ## Bootstrap the whole demo: start the :brod application, then the group
  ## subscribers, then the per-partition sequence number producers.
  ##
  ## `delay_seconds` is passed through to BrodDemoProducers.bootstrap/3 and
  ## defaults to @produce_delay_seconds, so both bootstrap/0 and bootstrap/1
  ## are available. Returns :ok; any failing step raises a MatchError.
  def bootstrap(delay_seconds \\ @produce_delay_seconds) do
    {:ok, _started_apps} = Application.ensure_all_started(:brod)
    :ok = BrodDemoSubscriber.bootstrap(@kafka_hosts, @topic)
    :ok = BrodDemoProducers.bootstrap(@kafka_hosts, @topic, delay_seconds)
  end
end
## The brod_group_subscriber implementation.
defmodule BrodDemoSubscriber do
  @moduledoc """
  Demo implementation of the `:brod_group_subscriber` behaviour.

  `bootstrap/2` starts two group members in the local node. Each member
  spawns one linked message handler process per topic-partition; the handler
  logs every message it receives and then acks its offset back to the
  group subscriber.
  """

  @behaviour :brod_group_subscriber

  require Logger
  require Record
  import Record, only: [defrecord: 2, extract: 2]

  defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")

  ## API for demo

  @doc """
  Starts the demo group members for `topic` using the brokers in `kafka_hosts`.

  One brod client and one group subscriber are started per member.
  Returns `:ok`; a failing step raises a `MatchError`.
  """
  def bootstrap(kafka_hosts, topic) do
    ## A group ID is to be shared between the members (which often run in
    ## different Erlang nodes or even hosts).
    group_id = "brod_demo_group_subscriber_koc-consumer-group"

    ## Different members may subscribe to identical or different sets of topics.
    ## In the assignments, a member receives only the partitions from the
    ## subscribed topic set.
    topic_set = [topic]

    ## In this demo, we spawn two members in the same Erlang node.
    member_clients = [
      :brod_demo_group_subscriber_koc_client_1,
      :brod_demo_group_subscriber_koc_client_2
    ]

    :ok = bootstrap_subscribers(member_clients, kafka_hosts, group_id, topic_set)
  end

  @doc """
  `:brod_group_subscriber` callback.

  Spawns one message handler per topic-partition and keeps the resulting
  `"topic-partition" => pid` map in the callback state.
  """
  def init(_group_id, _callback_init_args = {client_id, topics}) do
    ## For demo, spawn one message handler per topic-partition.
    ## Depending on the use case:
    ## It might be enough to handle the message locally in the subscriber process
    ## without dispatching to handlers, i.e. return {:ok, :ack, callback_state}
    ## from handle_message/4.
    ## Or there could be a pool of handlers if the messages can be processed
    ## in arbitrary order.
    handlers = spawn_message_handlers(client_id, topics)
    {:ok, %{handlers: handlers}}
  end

  @doc """
  `:brod_group_subscriber` callback.

  Forwards `message` to the handler process of `topic`/`partition` and
  returns `{:ok, state}` without acking; the handler acks asynchronously.
  Raises a `KeyError` when no handler is registered for the partition.
  """
  def handle_message(topic, partition, message, %{handlers: handlers} = state) do
    ## Assertive lookup: a missing handler is a bug, so fail with a clear
    ## KeyError instead of crashing later in send/2 with a nil destination.
    pid = Map.fetch!(handlers, handler_key(topic, partition))
    ## send the message to message handler process for async processing
    send(pid, message)
    ## or return {:ok, :ack, state} in case message can be handled synchronously here
    {:ok, state}
  end

  ## API for internal use

  @doc false
  ## Receive loop of a per-partition message handler: log each message and ack
  ## its offset to `subscriber_pid`.
  def message_handler_loop(topic, partition, subscriber_pid) do
    receive do
      msg ->
        ## Anything that is not a kafka_message record crashes the handler here.
        kafka_message(offset: offset, value: value) = msg
        Logger.info("#{inspect(self())} #{topic}-#{partition} Offset: #{offset}, Value: #{value}")
        ## send the async ack to group subscriber
        ## the offset will be eventually committed to kafka
        :brod_group_subscriber.ack(subscriber_pid, topic, partition, offset)
        __MODULE__.message_handler_loop(topic, partition, subscriber_pid)
    after
      ## Idle for a second: simply re-enter the loop through a fully qualified
      ## call (presumably so that a reloaded module version gets picked up).
      1000 ->
        __MODULE__.message_handler_loop(topic, partition, subscriber_pid)
    end
  end

  ## Start one brod client plus one linked group subscriber per member client id.
  defp bootstrap_subscribers(client_ids, kafka_hosts, group_id, topics) do
    group_config = [
      offset_commit_policy: :commit_to_kafka_v2,
      offset_commit_interval_seconds: 5,
      rejoin_delay_seconds: 2
    ]

    Enum.each(client_ids, fn client_id ->
      :ok = :brod.start_client(kafka_hosts, client_id, _client_config = [])

      {:ok, _subscriber} =
        :brod.start_link_group_subscriber(
          client_id,
          group_id,
          topics,
          group_config,
          _consumer_config = [begin_offset: :earliest],
          _callback_module = __MODULE__,
          _callback_init_args = {client_id, topics}
        )
    end)
  end

  ## Spawn a linked handler for every partition of every topic and return a
  ## map from "topic-partition" to the handler pid.
  defp spawn_message_handlers(client_id, topics) do
    ## Called from init/2, so self() is the process running the callback;
    ## handlers ack back to this pid.
    subscriber_pid = self()

    for topic <- topics,
        partition <- partitions_of(client_id, topic),
        into: %{} do
      handler_pid =
        spawn_link(__MODULE__, :message_handler_loop, [topic, partition, subscriber_pid])

      {handler_key(topic, partition), handler_pid}
    end
  end

  ## Partition numbers 0..count-1 of `topic`; :lists.seq/2 yields [] for count 0.
  defp partitions_of(client_id, topic) do
    {:ok, partition_count} = :brod.get_partitions_count(client_id, topic)
    :lists.seq(0, partition_count - 1)
  end

  ## Key under which the handler of a topic-partition is stored.
  defp handler_key(topic, partition), do: "#{topic}-#{partition}"
end
## Producers for demo
defmodule BrodDemoProducers do
  ## Start a dedicated brod client and producer for `topic`, then spawn one
  ## linked process per partition that feeds kafka with sequence numbers,
  ## pausing `delay_seconds` between two messages. Returns :ok.
  def bootstrap(kafka_hosts, topic, delay_seconds) do
    client = :brod_demo_group_subscriber_koc_producer_client
    :ok = :brod.start_client(kafka_hosts, client, _client_config = [])
    :ok = :brod.start_producer(client, topic, _producer_config = [])
    {:ok, partition_count} = :brod.get_partitions_count(client, topic)

    ## :lists.seq/2 yields [] for a zero partition count, so nothing is spawned.
    partition_numbers = :lists.seq(0, partition_count - 1)

    Enum.each(partition_numbers, fn partition ->
      spawn_link(fn -> producer_loop(client, topic, partition, delay_seconds, 0) end)
    end)
  end

  ## Endless loop of one partition producer: synchronously produce the current
  ## sequence number as the message value, sleep, then go on with the next one.
  defp producer_loop(client_id, topic, partition, delay_seconds, seq_no) do
    payload = to_string(seq_no)
    :ok = :brod.produce_sync(client_id, topic, partition, _key = "", payload)

    delay_seconds
    |> :timer.seconds()
    |> :timer.sleep()

    producer_loop(client_id, topic, partition, delay_seconds, seq_no + 1)
  end
end
Build and run:
$ mix deps.get && mix compile
$ iex -S mix
iex(1)> BrodDemo.bootstrap