cafex
cafex copied to clipboard
Cafex is a pure Elixir implementation of Kafka client
Cafex
Cafex is a pure Elixir implementation of Kafka client with ZooKeeper and Consul intergration.
Cafex support Kafka 0.8 and 0.9 group membership APIs.
Cafex provides all kafka APIs encapsulation, producer implementation and high-level consumer implementation.
Producer
Example
iex> Application.start :cafex
iex> topic_name = "test_topic"
iex> brokers = [{"127.0.0.1", 9092}]
iex> {:ok, producer} = Cafex.start_producer topic_name, client_id: "myproducer",
brokers: brokers,
partitioner: MyPartitioner,
acks: 1,
batch_num: 100,
linger_ms: 10
iex> Cafex.produce producer, "message", key: "key"
iex> Cafex.async_produce producer, "message", key: "key"
Producer options
partitioner
The partitioner for partitioning messages amongst sub-topics.
The default partitioner is Cafex.Partitioner.Random
.
client_id
The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
Default cafex_producer
.
acks
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
Default value is 1
.
batch_num
The number of messages to send in one batch when linger_ms
is not zero.
The producer will wait until either this number of messages are ready to send.
linger_ms
This setting is the same as linger.ms
config in the new official producer configs.
This setting defaults to 0 (i.e. no delay).
NOTE: If
linger_ms
is set to0
, thebatch_num
will not take effect.
Consumer
Example
defmodule MyConsumer do
use Cafex.Consumer
def consume(msg, state) do
# handle the msg
{:ok, state}
end
end
iex> Application.start :cafex
iex> topic_name = "test_topic"
iex> brokers = [{"127.0.0.1", 9092}]
iex> options = [client_id: "myconsumer",
topic: topic_name,
brokers: brokers,
offset_storage: :kafka,
group_manager: :kafka,
lock: :consul,
group_session_timeout: 7000, # ms
auto_commit: true,
auto_commit_interval: 500, # ms
auto_commit_max_buffers: 50,
fetch_wait_time: 100, # ms
fetch_min_bytes: 32 * 1024,
fetch_max_bytes: 64 * 1024,
handler: {MyConsumer, []}]
iex> {:ok, consumer} = Cafex.start_consumer :myconsumer, options
The options
argument of the function start_consumer
can be put in the
config/config.exs
:
config :cafex, :myconsumer,
client_id: "cafex",
topic: "test_topic",
brokers: [
{"192.168.99.100", 9092},
{"192.168.99.101", 9092}
],
offset_storage: :kafka,
group_manager: :kafka,
lock: :consul,
group_session_timeout: 7000, # ms
auto_commit: true,
auto_commit_interval: 500, # ms
auto_commit_max_buffers: 50,
fetch_wait_time: 100, # ms
fetch_min_bytes: 32 * 1024,
fetch_max_bytes: 64 * 1024,
handler: {MyConsumer, []}
By default, cafex will use :kafka
as the offset storage, use the new kafka
group membership API, which was added in the 0.9.x, as the group manager,
and use the :consul
as the worker lock. Make suer your Kafka server is 0.9.x
or above.
But :zookeeper
is another option for these. If you use zookeeper, the starting
options of :erlzk
must be specified under the :zookeeper
key:
config :cafex, :myconsumer,
client_id: "cafex",
topic: "test_topic",
brokers: [...],
offset_storage: :zookeeper,
group_manager: :zookeeper,
lock: :zookeeper,
zookeeper: [
timeout: 5000,
servers: [{"192.168.99.100", 2181}],
chroot: "/cafex"
],
...
TODO
- Support kafka 0.10.x.x
- Add tests