Skip to content

Brod group subscriber demo in Elixir

Zaiming Shi edited this page Mar 17, 2017 · 5 revisions

Original page author: @sdball.

This demo assumes that Kafka is running on localhost:9092 and that a topic named "brod-demo-group-subscriber-koc" exists with at least 2 partitions (so that both group members in the demo receive an assignment).

Create a new Elixir project:

$ mix new brod_demo && cd brod_demo

Add brod to the dependencies in mix.exs:

## Project dependencies: the demo only needs brod.
defp deps do
  [
    {:brod, "~> 2.3"}
  ]
end

Add lib/brod_demo.ex:

defmodule BrodDemo do
  @kafka_hosts [localhost: 9092]
  @topic "brod-demo-group-subscriber-koc"
  @produce_delay_seconds 2

  ## Bootstrap the demo: start the brod application, then the group
  ## subscribers, then the per-partition sequence number producers.
  ##
  ## `delay_seconds` is the pause between two messages produced to the same
  ## partition; it defaults to @produce_delay_seconds when omitted.
  ## Returns :ok; any step that fails crashes the caller on the match.
  def bootstrap(delay_seconds \\ @produce_delay_seconds) do
    {:ok, _started_apps} = Application.ensure_all_started(:brod)
    :ok = BrodDemoSubscriber.bootstrap(@kafka_hosts, @topic)
    :ok = BrodDemoProducers.bootstrap(@kafka_hosts, @topic, delay_seconds)
  end
end

## The brod_group_subscriber implementation.
defmodule BrodDemoSubscriber do
  @behaviour :brod_group_subscriber
  require Logger
  require Record
  import Record, only: [defrecord: 2, extract: 2]
  defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")

  ## API for demo
  ## Starts two group members (each with its own brod client) subscribed to
  ## `topic` via the brokers in `kafka_hosts`. Returns :ok.
  def bootstrap(kafka_hosts, topic) do
    ## A group ID is to be shared between the members (which often run in
    ## different Erlang nodes or even hosts).
    group_id = "brod_demo_group_subscriber_koc-consumer-group"
    ## Different members may subscribe to identical or different set of topics.
    ## In the assignments, a member receives only the partitions from the
    ## subscribed topic set.
    topic_set = [topic]
    ## In this demo, we spawn two members in the same Erlang node.
    member_clients = [
      :brod_demo_group_subscriber_koc_client_1,
      :brod_demo_group_subscriber_koc_client_2
    ]

    :ok = bootstrap_subscribers(member_clients, kafka_hosts, group_id, topic_set)
  end

  ## brod_group_subscriber callback
  ## Spawns the message handlers and keeps them in the callback state as a
  ## map of "topic-partition" => handler pid.
  def init(_group_id, _callback_init_args = {client_id, topics}) do
    ## For demo, spawn one message handler per topic-partition.
    ## Depending on the use case:
    ## It might be enough to handle the message locally in the subscriber
    ## process without dispatching to handlers, i.e. return
    ## {:ok, :ack, callback_state} from handle_message/4.
    ## Or there could be a pool of handlers if the messages can be processed
    ## in arbitrary order.
    handlers = spawn_message_handlers(client_id, topics)
    {:ok, %{handlers: handlers}}
  end

  ## brod_group_subscriber callback
  ## Forwards `message` to the handler process of its topic-partition.
  def handle_message(topic, partition, message, %{handlers: handlers} = state) do
    ## Map.fetch!/2 raises a KeyError naming the missing topic-partition,
    ## instead of the opaque error produced by sending to nil.
    pid = Map.fetch!(handlers, handler_key(topic, partition))
    ## send the message to message handler process for async processing
    send(pid, message)
    ## or return {:ok, :ack, state} in case message can be handled synchronously here
    {:ok, state}
  end

  ## API for internal use
  ## Receive loop of one handler process: logs each message and acks its
  ## offset to the group subscriber `subscriber_pid`. Never returns.
  def message_handler_loop(topic, partition, subscriber_pid) do
    receive do
      msg ->
        ## Anything that is not a kafka_message record crashes the handler here.
        kafka_message(offset: offset, value: value) = msg
        Logger.info("#{inspect(self())} #{topic}-#{partition} Offset: #{offset}, Value: #{value}")
        ## send the async ack to group subscriber
        ## the offset will be eventually committed to kafka
        :brod_group_subscriber.ack(subscriber_pid, topic, partition, offset)
        __MODULE__.message_handler_loop(topic, partition, subscriber_pid)
    after
      ## Re-enter through a fully-qualified call even when idle
      ## (presumably so that idle handlers also pick up reloaded code).
      1000 ->
        __MODULE__.message_handler_loop(topic, partition, subscriber_pid)
    end
  end

  ## Start one brod client and one linked group subscriber per client id,
  ## in list order. Returns :ok.
  defp bootstrap_subscribers(client_ids, kafka_hosts, group_id, topics) do
    group_config = [
      offset_commit_policy: :commit_to_kafka_v2,
      offset_commit_interval_seconds: 5,
      rejoin_delay_seconds: 2
    ]

    Enum.each(client_ids, fn client_id ->
      :ok = :brod.start_client(kafka_hosts, client_id, _client_config = [])

      {:ok, _subscriber} =
        :brod.start_link_group_subscriber(
          client_id,
          group_id,
          topics,
          group_config,
          _consumer_config = [begin_offset: :earliest],
          _callback_module = __MODULE__,
          _callback_init_args = {client_id, topics}
        )
    end)
  end

  ## Spawn one linked handler process per partition of every topic and
  ## return them as a map of "topic-partition" => pid.
  defp spawn_message_handlers(client_id, topics) do
    ## Captured once: the handlers ack back to the process that spawns them.
    subscriber_pid = self()

    for topic <- topics,
        partition <- partitions_of(client_id, topic),
        into: %{} do
      handler_pid =
        spawn_link(__MODULE__, :message_handler_loop, [topic, partition, subscriber_pid])

      {handler_key(topic, partition), handler_pid}
    end
  end

  ## List the partition numbers 0..count-1 of `topic` as reported by brod.
  defp partitions_of(client_id, topic) do
    {:ok, partition_count} = :brod.get_partitions_count(client_id, topic)
    :lists.seq(0, partition_count - 1)
  end

  ## Single definition of the handlers-map key, shared by spawn and lookup.
  defp handler_key(topic, partition), do: "#{topic}-#{partition}"
end

## Producers for demo
defmodule BrodDemoProducers do
  ## Start a dedicated producer client for `topic` and spawn one linked
  ## process per partition, each feeding kafka with sequence numbers.
  ## `delay_seconds` is the pause between two messages on one partition.
  ## Returns :ok.
  def bootstrap(kafka_hosts, topic, delay_seconds) do
    client = :brod_demo_group_subscriber_koc_producer_client
    :ok = :brod.start_client(kafka_hosts, client, _client_config = [])
    :ok = :brod.start_producer(client, topic, _producer_config = [])
    {:ok, partition_count} = :brod.get_partitions_count(client, topic)
    :ok = spawn_producers(client, topic, delay_seconds, partition_count)
  end

  ## Spawn a linked producer process for each of partitions 0..count-1.
  defp spawn_producers(client_id, topic, delay_seconds, partition_count)
       when is_integer(partition_count) do
    Enum.each(:lists.seq(0, partition_count - 1), fn partition ->
      spawn_link(fn -> produce_forever(client_id, topic, partition, delay_seconds, 0) end)
    end)
  end

  ## Produce `seq_no` as the message value with an empty key, sleep, then
  ## continue with the next sequence number. Never returns.
  defp produce_forever(client_id, topic, partition, delay_seconds, seq_no) do
    payload = Integer.to_string(seq_no)
    :ok = :brod.produce_sync(client_id, topic, partition, _key = "", payload)
    delay_seconds |> :timer.seconds() |> :timer.sleep()
    produce_forever(client_id, topic, partition, delay_seconds, seq_no + 1)
  end
end

Build and run:

$ mix deps.get && mix compile
$ iex -S mix
iex(1)> BrodDemo.bootstrap
Clone this wiki locally