brod

Create wiki with usage examples

Open • id opened this issue 9 years ago • 10 comments

  • consumer in an application
  • simple console consumer
  • simple file consumer
  • usage in elixir apps

id • Oct 10 '16

That would be really helpful! I've incorporated brod as a producer into an Elixir app, and I'm now working on using it as a downstream consumer. I'd be happy to help write or review the examples.

sdball • Oct 14 '16

@sdball I've created the wiki; we will start adding content soon.

id • Oct 14 '16

Here's a rough first pass at converting the "koc" demo group subscriber into Elixir. For the most part, I simply converted the Erlang syntax to Elixir. The biggest change is that I couldn't get the keyfind logic to work, so I changed spawn_message_handlers to generate a map keyed by topic-partition.

```elixir
defmodule GroupSubscriberDemo do
  @behaviour :brod_group_subscriber
  @produce_delay_seconds 5

  require Logger
  require Record
  import Record, only: [defrecord: 2, extract: 2]
  # Import the kafka_message record definition from brod's header file.
  defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")

  def bootstrap do
    bootstrap(@produce_delay_seconds)
  end

  def bootstrap(delay_seconds) do
    kafka_hosts = [kafka: 9092]
    topic = "brod-demo-group-subscriber-koc"
    {:ok, _} = Application.ensure_all_started(:brod)
    group_id = "brod-demo-group-subscriber-koc-consumer-group"
    topic_set = [topic]
    member_clients = [:"brod-demo-group-subscriber-koc-client-1", :"brod-demo-group-subscriber-koc-client-2"]
    # Start two group members, each with its own client.
    :ok = bootstrap_subscribers(member_clients, kafka_hosts, group_id, topic_set)
    # Start a separate client and producer that keep writing test messages.
    producer_client_id = :brod_demo_group_subscriber_koc
    :ok = :brod.start_client(kafka_hosts, producer_client_id, _client_config=[])
    :ok = :brod.start_producer(producer_client_id, topic, _producer_config=[])
    {:ok, partition_count} = :brod.get_partitions_count(producer_client_id, topic)
    :ok = spawn_producers(producer_client_id, topic, delay_seconds, partition_count)
    :ok
  end

  # brod_group_subscriber callback. It runs in the group subscriber process,
  # so self() inside spawn_message_handlers is the subscriber pid used for acks.
  def init(_group_id, _callback_init_args = {client_id, topics}) do
    handlers = spawn_message_handlers(client_id, topics)
    {:ok, %{handlers: handlers}}
  end

  # brod_group_subscriber callback: forward each message to the handler for its
  # topic-partition. Returning {:ok, state} (rather than {:ok, :ack, state})
  # leaves the ack to the handler process.
  def handle_message(topic, partition, message, %{handlers: handlers} = state) do
    pid = handlers["#{topic}-#{partition}"]
    send(pid, message)
    {:ok, state}
  end

  def bootstrap_subscribers([], _kafka_hosts, _group_id, _topics), do: :ok
  def bootstrap_subscribers([client_id | rest], kafka_hosts, group_id, topics) do
    :ok = :brod.start_client(kafka_hosts, client_id, _client_config=[])
    # Acked offsets are committed to Kafka every 5 seconds.
    group_config = [offset_commit_policy: :commit_to_kafka_v2, offset_commit_interval_seconds: 5]
    {:ok, _subscriber} = :brod.start_link_group_subscriber(
      client_id, group_id, topics, group_config,
      _consumer_config = [begin_offset: :earliest],
      _callback_module = __MODULE__,
      _callback_init_args = {client_id, topics})
    bootstrap_subscribers(rest, kafka_hosts, group_id, topics)
  end

  # Spawn one producer loop per partition.
  def spawn_producers(client_id, topic, delay_seconds, partition) when is_integer(partition) do
    partitions = :lists.seq(0, partition - 1)
    spawn_producers(client_id, topic, delay_seconds, partitions)
  end
  def spawn_producers(client_id, topic, delay_seconds, [partition | partitions]) do
    spawn_link(fn -> producer_loop(client_id, topic, partition, delay_seconds, 0) end)
    spawn_producers(client_id, topic, delay_seconds, partitions)
  end
  def spawn_producers(_client_id, _topic, _delay_seconds, []), do: :ok

  # Produce an increasing sequence number every delay_seconds.
  def producer_loop(client_id, topic, partition, delay_seconds, seq_no) do
    kafka_value = "#{seq_no}"
    :ok = :brod.produce_sync(client_id, topic, partition, _key="", kafka_value)
    :timer.sleep(:timer.seconds(delay_seconds))
    producer_loop(client_id, topic, partition, delay_seconds, seq_no + 1)
  end

  # Spawn one handler process per topic-partition and return a map
  # of "topic-partition" => handler pid.
  def spawn_message_handlers(_client_id, []), do: %{}
  def spawn_message_handlers(client_id, [topic | rest]) do
    {:ok, partition_count} = :brod.get_partitions_count(client_id, topic)
    handlers = Enum.reduce(:lists.seq(0, partition_count - 1), %{}, fn partition, acc ->
      handler_pid = spawn_link(__MODULE__, :message_handler_loop, [topic, partition, self()])
      Map.put(acc, "#{topic}-#{partition}", handler_pid)
    end)
    Map.merge(handlers, spawn_message_handlers(client_id, rest))
  end

  def message_handler_loop(topic, partition, subscriber_pid) do
    # Block until the next message for this topic-partition arrives.
    receive do
      msg ->
        %{offset: offset, value: value} = Enum.into(kafka_message(msg), %{})
        Logger.info("#{inspect(self())} #{topic}-#{partition} Offset: #{offset}, Value: #{value}")
        # Ack the offset so the group subscriber can commit it to Kafka.
        :brod_group_subscriber.ack(subscriber_pid, topic, partition, offset)
        message_handler_loop(topic, partition, subscriber_pid)
    end
  end
end
```
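For comparison, here is a small sketch of how a keyfind-style lookup could look in Elixir. It assumes the handlers are kept as a list of `{{topic, partition}, pid}` tuples; the module and function names are hypothetical. One possible pitfall when porting is that Elixir's `List.keyfind/3` takes a zero-based tuple position, while Erlang's `lists:keyfind/3` is one-based. A map keyed by a `{topic, partition}` tuple is shown as an alternative to building a string key.

```elixir
defmodule HandlerLookup do
  # Hypothetical helpers, for illustration only.

  # List-based lookup. Each element is assumed to be {{topic, partition}, pid}.
  # Elixir's List.keyfind/3 uses position 0 for the first tuple element,
  # where Erlang's lists:keyfind/3 would use 1.
  def find_in_list(handlers, topic, partition) do
    case List.keyfind(handlers, {topic, partition}, 0) do
      {_key, pid} -> {:ok, pid}
      nil -> :error
    end
  end

  # Map-based lookup keyed by a {topic, partition} tuple instead of a string.
  # Map.fetch/2 returns {:ok, value} or :error, matching find_in_list/3.
  def find_in_map(handlers, topic, partition) do
    Map.fetch(handlers, {topic, partition})
  end
end
```

Tuple keys avoid string interpolation on every message and cannot collide the way `"a-1" <> "-2"`-style string keys might if a topic name itself contains a dash.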

sdball • Oct 18 '16

Thanks a lot, @sdball! I'll put this into the wiki later today.

id • Oct 18 '16

@sdball https://github.com/klarna/brod/wiki/Brod-demo-in-Elixir

I updated the code a bit.

id • Oct 18 '16

Thanks for the usage example -- being new to both Elixir and Erlang, it helps a lot.

If I understand correctly, brod calls the handle_message callback for every message, regardless of topic and partition. If so, doesn't the group subscriber process become a bottleneck? I understand that the callback hands off the actual message processing very quickly; still, if there are multiple Erlang nodes, how can I scale out properly (assuming a single topic and multiple partitions)?

kevinbader • Mar 03 '17

@kevinbader I have to admit that we started with an implementation that was too simple, and it turned out to be unsuitable for many common use cases.

In the current version, you can implement the group member behaviour (brod_group_member) to have the assignments (topic-partitions) distributed to different processes. Here is an example: https://github.com/klarna/brucke/blob/master/src/brucke_member.erl

We are somewhat stuck with this implementation in the current version. We may add a brod_group_member2.erl in the next release that spawns as many brod_group_subscriber2 processes as needed upon receiving the assignments. However, that may make callback state management more complex, because there would then be many different callback states.

zmstone • Mar 03 '17

Sorry, I didn't really answer the 'how to scale out' question.

Subscribers on different Erlang nodes can all subscribe to the same topic. Each of them should be assigned a different, exclusive set of partitions, with the partitions distributed evenly among them.

There is no restriction on which subscriber subscribes to which topic. Subscribers can join the group with whatever topic names they wish to receive data from.

zmstone • Mar 03 '17

@zmstone Thank you very much for your quick reply! Reading the brucke source code will hopefully give me a better understanding of how everything works together. Regarding scaling out, I understand that every node could spawn a subscriber that listens to a specific topic-partition (brod_consumer?). Is the group subscriber feature meant to handle or assist with failover when one of the nodes goes down, or does that have to be implemented separately? I assumed that reassigning partitions among the members of a consumer group would handle that case, but for that to work there would have to be some kind of communication between the brod instances on the nodes, right? I'm sorry if the question makes little sense... I guess it shows that I've yet to study OTP :)

kevinbader • Mar 03 '17

brod_consumer is the per-topic-partition poller: it fetches message sets from Kafka and sends each message set directly to the subscriber process. If your use case allows static load distribution and does not require offsets to be committed to Kafka, you can choose to spawn workers in your application and have them subscribe to brod_consumer directly.
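As a rough illustration of static load distribution, here is a sketch that assumes every node knows its own 0-based index and the total node count (both hypothetical configuration values), so each node always owns the same fixed subset of partitions. The brod steps a per-partition worker would then perform (`:brod.start_consumer/3`, `:brod.subscribe/5`, `:brod.consume_ack/2`) are only described in comments, since they need a running client and Kafka cluster.

```elixir
defmodule StaticAssignment do
  # Hypothetical helper for static load distribution: node `index`
  # (0-based) out of `node_count` nodes always owns the same partitions.
  # There is no consumer group here, so nothing is reassigned if a node
  # goes down, and offsets are not committed to Kafka.
  def partitions_for(index, node_count, partition_count)
      when is_integer(index) and index >= 0 and index < node_count do
    # The explicit `//1` step keeps the range empty when partition_count is 0.
    for p <- 0..(partition_count - 1)//1, rem(p, node_count) == index, do: p
  end

  # For each owned partition, a worker process would then (not shown here):
  #   1. make sure the topic consumer is started, e.g. :brod.start_consumer(client, topic, [])
  #   2. call :brod.subscribe(client, self(), topic, partition, [])
  #   3. receive {consumer_pid, message_set} and ack with :brod.consume_ack(consumer_pid, offset)
end
```

For example, with 3 nodes and 8 partitions, node 0 owns partitions 0, 3 and 6, and node 2 owns partitions 2 and 5. The `//1` range step needs Elixir 1.12 or later.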

The consumer group provides three main features:

  1. Consumed (acked) offsets are persisted in Kafka
  2. Load distribution
  3. Less disruption during (rolling) consumer upgrades (because when a member leaves, its assignments are redistributed to the other members)

The consumer group protocol is part of the Kafka protocol spec, and it is rather complicated to explain in a short reply. In summary: there is no direct communication between different brod instances. All member (brod_group_coordinator) processes talk to a designated Kafka broker (the group coordinator) to share state. Kafka elects one of the brod_group_coordinator processes as the leader, and the leader then distributes the topic-partitions among the members.
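To make the leader's job more concrete, here is a simplified round-robin assignment in Elixir. It is an illustration only, not the algorithm brod or Kafka actually uses; member ids and topic-partitions are plain terms, and the module name is hypothetical. Re-running it with fewer members shows how a departed member's partitions end up with the remaining members.

```elixir
defmodule AssignmentSketch do
  # Simplified round-robin assignment for illustration only;
  # this is NOT brod's or Kafka's actual assignment algorithm.

  # No members: nothing to assign.
  def round_robin([], _topic_partitions), do: %{}

  # members: list of member ids; topic_partitions: list of {topic, partition}.
  # Returns %{member_id => [{topic, partition}, ...]} with each member's
  # partitions in ascending order (they are prepended, then reversed).
  def round_robin(members, topic_partitions) do
    sorted_members = members |> Enum.sort() |> List.to_tuple()
    count = tuple_size(sorted_members)
    empty = Map.new(members, fn m -> {m, []} end)

    topic_partitions
    |> Enum.sort()
    |> Enum.with_index()
    |> Enum.reduce(empty, fn {tp, i}, acc ->
      # The i-th topic-partition goes to member number (i mod member count).
      member = elem(sorted_members, rem(i, count))
      Map.update!(acc, member, fn tps -> [tp | tps] end)
    end)
    |> Map.new(fn {member, tps} -> {member, Enum.reverse(tps)} end)
  end
end
```

With members `"a"` and `"b"` and partitions 0 to 2 of topic `"t"`, member `"a"` gets partitions 0 and 2 and `"b"` gets partition 1; calling it again with only `"b"` assigns all three partitions to `"b"`.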

brod_group_subscriber is a process that subscribes to ALL the assigned topic-partitions and receives message sets from the brod_consumer processes. It may hand off individual messages to different worker processes, but the overhead of passing individual messages (rather than message sets) around can become a bottleneck. This is the main reason we wanted a brod_group_subscriber2.

In brucke, we implemented a consumer group member that does not subscribe to any brod_consumer process itself. Instead, it spawns one subscriber for each assigned topic-partition. The subscribers then subscribe to brod_consumer themselves and get message sets (not individual messages) delivered to their mailboxes.
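The per-partition design can be simulated without Kafka. In this sketch, plain processes stand in for brod_consumer and the per-partition subscribers, a batch is a list of `{offset, value}` tuples, and all names are hypothetical. Each worker receives whole batches in its own mailbox and reports a single ack for the last offset, instead of every message being handed off individually through one shared process.

```elixir
defmodule PerPartitionSketch do
  # Simulation only: no brod or Kafka calls are made.

  # Spawn one worker per assigned {topic, partition}; returns %{tp => pid}.
  def start_workers(assignments, report_to) do
    Map.new(assignments, fn tp ->
      {tp, spawn_link(fn -> worker_loop(tp, report_to) end)}
    end)
  end

  # Send a whole batch straight to the worker that owns the partition,
  # similar to how brod_consumer delivers a message set to its subscriber.
  def deliver(workers, tp, batch) do
    send(Map.fetch!(workers, tp), {:batch, batch})
    :ok
  end

  defp worker_loop(tp, report_to) do
    receive do
      {:batch, []} ->
        worker_loop(tp, report_to)

      {:batch, batch} ->
        # Per-message work happens here, inside the worker's own process.
        Enum.each(batch, &process/1)
        # Ack once per batch by reporting only the last offset.
        {last_offset, _value} = List.last(batch)
        send(report_to, {:acked, tp, last_offset})
        worker_loop(tp, report_to)
    end
  end

  # Placeholder for real message handling.
  defp process({_offset, _value}), do: :ok
end
```

Because each partition has its own process, a slow partition only delays its own worker, and the per-message work is spread across as many processes as there are assigned partitions.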

zmstone • Mar 03 '17