brod
Create wiki with usage examples
- consumer in an application
- simple console consumer
- simple file consumer
- usage in elixir apps
That would be really helpful! I've incorporated brod as a producer into an Elixir app and I'm now working on using it in a downstream consumer. I'd be happy to help write or review the examples.
@sdball I've created the Wiki, we will start adding stuff soon.
Here's a rough first pass at converting the demo group subscriber koc into Elixir. For the most part I simply converted Erlang to Elixir syntax. The biggest change is that I couldn't get the keyfind logic to work so I changed spawn_message_handlers to generate a map keyed by topic-partition.
defmodule GroupSubscriberDemo do
  @behaviour :brod_group_subscriber
  @produce_delay_seconds 5

  require Logger
  require Record
  import Record, only: [defrecord: 2, extract: 2]
  # Pull the kafka_message record definition from brod's header file.
  defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")

  def bootstrap do
    bootstrap(@produce_delay_seconds)
  end

  def bootstrap(delay_seconds) do
    kafka_hosts = [kafka: 9092]
    topic = "brod-demo-group-subscriber-koc"
    {:ok, _} = Application.ensure_all_started(:brod)
    group_id = "brod-demo-group-subscriber-koc-consumer-group"
    topic_set = [topic]
    member_clients = [:"brod-demo-group-subscriber-koc-client-1", :"brod-demo-group-subscriber-koc-client-2"]
    :ok = bootstrap_subscribers(member_clients, kafka_hosts, group_id, topic_set)
    producer_client_id = :brod_demo_group_subscriber_koc
    :ok = :brod.start_client(kafka_hosts, producer_client_id, _client_config = [])
    :ok = :brod.start_producer(producer_client_id, topic, _producer_config = [])
    {:ok, partition_count} = :brod.get_partitions_count(producer_client_id, topic)
    :ok = spawn_producers(producer_client_id, topic, delay_seconds, partition_count)
    :ok
  end

  # brod_group_subscriber callback: one handler process per topic-partition.
  def init(_group_id, _callback_init_args = {client_id, topics}) do
    handlers = spawn_message_handlers(client_id, topics)
    {:ok, %{handlers: handlers}}
  end

  # brod_group_subscriber callback: forward the message to its handler.
  def handle_message(topic, partition, message, %{handlers: handlers} = state) do
    pid = handlers["#{topic}-#{partition}"]
    send(pid, message)
    {:ok, state}
  end

  def bootstrap_subscribers([], _kafka_hosts, _group_id, _topics), do: :ok

  def bootstrap_subscribers([client_id | rest], kafka_hosts, group_id, topics) do
    :ok = :brod.start_client(kafka_hosts, client_id, _client_config = [])
    group_config = [offset_commit_policy: :commit_to_kafka_v2, offset_commit_interval_seconds: 5]

    {:ok, _subscriber} =
      :brod.start_link_group_subscriber(
        client_id, group_id, topics, group_config,
        _consumer_config = [begin_offset: :earliest],
        _callback_module = __MODULE__,
        _callback_init_args = {client_id, topics})

    bootstrap_subscribers(rest, kafka_hosts, group_id, topics)
  end

  def spawn_producers(client_id, topic, delay_seconds, partition) when is_integer(partition) do
    partitions = :lists.seq(0, partition - 1)
    spawn_producers(client_id, topic, delay_seconds, partitions)
  end

  def spawn_producers(client_id, topic, delay_seconds, [partition | partitions]) do
    spawn_link(fn -> producer_loop(client_id, topic, partition, delay_seconds, 0) end)
    spawn_producers(client_id, topic, delay_seconds, partitions)
  end

  def spawn_producers(_client_id, _topic, _delay_seconds, []), do: :ok

  # Produce an increasing sequence number to one partition, forever.
  def producer_loop(client_id, topic, partition, delay_seconds, seq_no) do
    kafka_value = "#{seq_no}"
    :ok = :brod.produce_sync(client_id, topic, partition, _key = "", kafka_value)
    :timer.sleep(:timer.seconds(delay_seconds))
    producer_loop(client_id, topic, partition, delay_seconds, seq_no + 1)
  end

  def spawn_message_handlers(_client_id, []), do: %{}

  def spawn_message_handlers(client_id, [topic | rest]) do
    {:ok, partition_count} = :brod.get_partitions_count(client_id, topic)

    # Build a map of "topic-partition" => handler pid.
    handlers =
      Enum.reduce(:lists.seq(0, partition_count - 1), %{}, fn partition, acc ->
        handler_pid = spawn_link(__MODULE__, :message_handler_loop, [topic, partition, self()])
        Map.put(acc, "#{topic}-#{partition}", handler_pid)
      end)

    Map.merge(handlers, spawn_message_handlers(client_id, rest))
  end

  def message_handler_loop(topic, partition, subscriber_pid) do
    receive do
      msg ->
        %{offset: offset, value: value} = Enum.into(kafka_message(msg), %{})
        Logger.info("#{inspect(self())} #{topic}-#{partition} Offset: #{offset}, Value: #{value}")
        # Ack so the group subscriber can commit the offset.
        :brod_group_subscriber.ack(subscriber_pid, topic, partition, offset)
        message_handler_loop(topic, partition, subscriber_pid)
    after
      1000 ->
        message_handler_loop(topic, partition, subscriber_pid)
    end
  end
end
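For anyone who only wants the lookup-and-forward part of the demo, here is a minimal sketch of it in plain Elixir, with no brod or Kafka involved. It keys the handler map by a `{topic, partition}` tuple rather than an interpolated string, which is a design choice of this sketch and not what the demo above does. The module name `HandlerMapSketch`, its functions, and the `{:handled, ...}` reply message are all hypothetical and exist only for illustration.

```elixir
defmodule HandlerMapSketch do
  # Hypothetical module, not part of brod. It only illustrates dispatching
  # messages to per-partition handler processes through a map.

  # Spawn one handler per {topic, partition} and return a map keyed by that tuple.
  # `topic_partition_counts` is a list of {topic, partition_count} pairs.
  def spawn_handlers(topic_partition_counts, reply_to) do
    for {topic, count} <- topic_partition_counts,
        partition <- :lists.seq(0, count - 1),
        into: %{} do
      pid = spawn_link(fn -> handler_loop(topic, partition, reply_to) end)
      {{topic, partition}, pid}
    end
  end

  # Forward a message to the handler that owns its topic-partition.
  def dispatch(handlers, topic, partition, message) do
    case Map.fetch(handlers, {topic, partition}) do
      {:ok, pid} ->
        send(pid, message)
        :ok

      :error ->
        {:error, :no_handler}
    end
  end

  # Each handler reports what it received back to `reply_to`.
  defp handler_loop(topic, partition, reply_to) do
    receive do
      message ->
        send(reply_to, {:handled, topic, partition, message})
        handler_loop(topic, partition, reply_to)
    end
  end
end
```

Calling `HandlerMapSketch.spawn_handlers([{"events", 2}], self())` gives a two-entry map, and `HandlerMapSketch.dispatch(handlers, "events", 1, :hello)` forwards `:hello` to the handler for partition 1. Using `Map.fetch/2` instead of `handlers[key]` makes the missing-handler case explicit instead of sending to `nil`.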
Thanks a lot @sdball ! I'll put this into the wiki later today.
@sdball https://github.com/klarna/brod/wiki/Brod-demo-in-Elixir
I updated the code a bit.
Thanks for the usage example -- being new to both Elixir and Erlang, it helps a lot.
If I understand correctly, the handle_message callback is called by Brod for every message, regardless of topic and partition. If so, doesn't the group subscriber process become a bottleneck? I understand that the callback hands off the actual message processing very quickly; still, if there are multiple Erlang nodes, how can I scale out properly (assuming a single topic and multiple partitions)?
@kevinbader I have to admit that we started with an overly simple implementation, which turned out to be unsuitable for many common use cases.
In the current version, you can implement the group member behaviour yourself so that the assignments (topic-partitions) are distributed to different processes. Here is an example: https://github.com/klarna/brucke/blob/master/src/brucke_member.erl
We are more or less stuck with this implementation in the current version.
One option is to add a brod_group_member2.erl in the next release and spawn as many brod_group_subscriber2 processes as needed upon receiving the assignments.
That may, however, make callback state management more complex, because there would then be many different callback states.
Sorry, didn't really answer the 'how to scale out' question.
Subscribers on different Erlang nodes can all subscribe to the same topic. Each partition is assigned to exactly one of them, and the partitions are distributed evenly.
There is no restriction on who subscribes to which topic. Subscribers can join the group with any topic name they wish to receive data from.
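To make "exclusive and evenly distributed" concrete, here is a rough sketch of a round-robin assignment in plain Elixir. It is illustrative only: the real assignment is negotiated through Kafka's consumer group protocol, and this is not brod's code. The module `AssignmentSketch`, the function `assign/2`, and the member names are assumptions made up for the example.

```elixir
defmodule AssignmentSketch do
  # Hypothetical helper, not part of brod. Deals topic-partitions out to
  # members one at a time, so each partition has exactly one owner and
  # member loads differ by at most one partition.
  def assign(members, topic_partitions) when members != [] do
    members = Enum.sort(members)
    member_count = length(members)
    empty = Map.new(members, fn member -> {member, []} end)

    topic_partitions
    |> Enum.sort()
    |> Enum.with_index()
    |> Enum.reduce(empty, fn {topic_partition, index}, acc ->
      member = Enum.at(members, rem(index, member_count))
      Map.update!(acc, member, fn owned -> owned ++ [topic_partition] end)
    end)
  end
end
```

With members `["node_a", "node_b"]` and four partitions of a topic `"events"`, `node_a` ends up with partitions 0 and 2 and `node_b` with 1 and 3. Re-running `assign/2` with only `["node_a"]` hands all four partitions to the remaining member, which is the effect a rebalance has when a member leaves the group.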
@zmstone Thank you very much for your quick reply! By reading the brucke source code I hopefully get a better understanding about how everything works together.
Regarding scaling out, I understand that it'd be possible for every node to spawn a subscriber that listens to a specific topic-partition (brod_consumer?). Is the group subscriber feature meant to handle or assist with failover in case one of the nodes goes down, or is that something that has to be implemented separately? I assumed that the repartitioning between the members of a consumer group could handle that case, but for that to work there has to be some kind of communication between the Brod instances of the nodes, right?
I'm sorry if the question makes little sense... I guess it shows that I've yet to study OTP :)
brod_consumer is the per-topic-partition poller. It fetches message sets from Kafka and sends each message set directly to the subscriber process. If your use case allows static load distribution and does not require offsets to be committed to Kafka, you can choose to spawn workers in your application and have them subscribe to brod_consumer directly.
Consumer group provides you three main features:
- Consumed (acked) offsets are persisted in kafka
- Load distribution
- Less disruption for consumer (rolling) upgrade (because when a member leaves, its assignments will be re-distributed to other members)
The consumer group protocol is part of the Kafka protocol spec, and it is too complicated to explain in a short reply.
But in summary: there is no direct communication between different brod instances.
All member (brod_group_coordinator) processes talk to an elected Kafka broker to share state.
Kafka elects one of the brod_group_coordinator processes as the leader,
and the leader then distributes the topic-partitions among the members.
brod_group_subscriber is a process that subscribes to ALL the assigned topic-partitions and receives message sets from the brod_consumer processes. It may hand off individual messages to different worker processes, but the overhead of passing messages (not message sets) around can become a bottleneck. This is the main reason we wanted a brod_group_subscriber2.
In brucke, we implemented a consumer group member which does not subscribe to any brod_consumer process. Instead, it spawns one subscriber for each assigned topic-partition. The subscribers themselves then subscribe to brod_consumer and get message sets (not individual messages) delivered to their mailboxes.
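As a rough illustration of the one-subscriber-per-partition idea, the sketch below simulates it in plain Elixir without brod. Each worker receives a whole message set in one mailbox message, processes it, and acknowledges only the last offset. The module `PartitionWorkerSketch` and the message shapes `{:message_set, topic, partition, messages}` and `{:ack, topic, partition, offset}` are hypothetical stand-ins, not brod's actual records (those are defined in brod.hrl), and this is not how brucke is implemented internally.

```elixir
defmodule PartitionWorkerSketch do
  # Hypothetical simulation, not part of brod or brucke.
  # `messages` is a non-empty list of {offset, value} tuples.

  # Start one worker for a single topic-partition.
  # `handle_fun` is applied to every {offset, value} in a set.
  def start(topic, partition, ack_to, handle_fun) do
    spawn_link(fn -> loop(topic, partition, ack_to, handle_fun) end)
  end

  defp loop(topic, partition, ack_to, handle_fun) do
    receive do
      # One mailbox message carries the whole set for this partition.
      {:message_set, ^topic, ^partition, [_ | _] = messages} ->
        Enum.each(messages, handle_fun)
        {last_offset, _value} = List.last(messages)
        # A single ack per set instead of one per message.
        send(ack_to, {:ack, topic, partition, last_offset})
        loop(topic, partition, ack_to, handle_fun)

      :stop ->
        :ok
    end
  end
end
```

Sending `{:message_set, "events", 0, [{10, "a"}, {11, "b"}, {12, "c"}]}` to a worker started for `"events"` partition 0 results in one `{:ack, "events", 0, 12}` reply. Three messages cross a process boundary once rather than three times, which is the saving described above.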