kafka_ex
Use iolists when building packets
#290 / #231 switched to iolists / iodata when building Produce protocol messages. This results in a significant speedup in those parts of the code, especially with larger messages and message sets.
We should use a similar strategy for the other protocol message modules.
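Here is a minimal sketch of the difference. It assumes a Kafka STRING field, which is an INT16 byte length followed by the bytes. The module and function names are hypothetical and are not part of KafkaEx:

```elixir
defmodule StringEncodingSketch do
  # Hypothetical helpers, not KafkaEx functions. A Kafka STRING is an
  # INT16 byte length followed by the bytes.

  # Binary version: `<>` builds a new binary containing both parts.
  def encode_string_binary(str) do
    <<byte_size(str)::16-signed>> <> str
  end

  # iodata version: a list holding the length prefix and `str` as-is,
  # without copying them into a new binary.
  def encode_string_iodata(str) do
    [<<byte_size(str)::16-signed>>, str]
  end
end

bin = StringEncodingSketch.encode_string_binary("my-topic")
iodata = StringEncodingSketch.encode_string_iodata("my-topic")

# Both encodings produce the same bytes once flattened.
IO.iodata_to_binary(iodata) == bin
# => true
```

The flattening step is only there to compare the results. `:gen_tcp.send/2` accepts iodata directly, so an encoder can hand the nested list to the socket as-is.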
Hi @joshuawscott, I would like to take this issue. Do you see a need to run some kind of benchmark during the changes? Or are you convinced that the benchmark you ran on the referenced issues is enough to justify going through all modules under kafka_ex/protocol and changing them to iodata?
FWIW the new Kayrock-based client produces iolists for every message.
It may still be worth making these improvements, though, since it may take a while for the new client to become adopted.
I don’t think it’s necessary to do benchmarks. iodata is pretty well known to be more efficient than binaries for this kind of use case. If you want to break up the work, you should be able to do these one message at a time and each one will provide some benefit. I’d start with the most frequently used messages - fetch and offset commit come to mind.
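iodata also fits protocol encoding well because Kafka prefixes each request with its INT32 byte size. `IO.iodata_length/1` can compute that size from a nested list without first flattening it into a binary. A sketch using a hypothetical `FrameSketch.size_prefixed/1` helper:

```elixir
defmodule FrameSketch do
  # Hypothetical helper, not a KafkaEx function: prefixes an iodata
  # payload with its INT32 byte size. IO.iodata_length/1 walks the
  # nested list without building a flattened binary.
  def size_prefixed(payload) do
    [<<IO.iodata_length(payload)::32-signed>>, payload]
  end
end

# Enum.map/2 over encoded parts already yields valid iodata, so no
# concatenation step is needed to combine them.
payload = Enum.map(["abc", "de"], fn s -> [<<byte_size(s)::16-signed>>, s] end)
frame = FrameSketch.size_prefixed(payload)

IO.iodata_to_binary(frame)
# => <<0, 0, 0, 9, 0, 3, 97, 98, 99, 0, 2, 100, 101>>
```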
Hey @dantswain, I was thinking exactly that: breaking the work up into smaller pieces, file by file under protocol. That way I can get used to the code and get quick feedback on each change.
Thanks for that initial guidance.
@habutre I don't see the need for benchmarking; iolists are almost certainly going to be faster.
I think I looked at this before and didn't find any binary concatenations, so I never did anything with it. Feel free to take another look; I definitely could have missed things.
To be honest, @joshuawscott, that was my first impression: at first glance I didn't find any place where binaries are concatenated. I will look for such cases, and if I don't find any, closing the issue would be the right action.
Hi guys, I finished reviewing this issue and, as @joshuawscott said, found no binary concatenation. Even so, I applied some changes and would really appreciate your review of #380.
I got a bit stuck on the topic creation request and would appreciate it if someone could help me understand it.
The function `KafkaEx.Protocol.CreateTopics.encode_replica_assignment/1` does this concatenation, which doesn't seem to work:
`(<<replica_assignment.partition::32-signed>> <> replica_assignment.replicas)`
The reason is that `replica_assignment.replicas` is a list. Is topic management currently supported and used? If so, should the list be encoded as `<<list_size, replica1_id, ..., replicaN_id>>`?
I didn't find any implementation guidance in a quick look.
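For reference, a Kafka ARRAY is an INT32 element count followed by the elements in order. That matches the `<<list_size, replica1_id, ..., replicaN_id>>` shape, with `list_size` as an INT32. Here is a sketch for a list of INT32 broker ids, using a hypothetical helper that is not part of KafkaEx:

```elixir
defmodule Int32ArraySketch do
  # Hypothetical helper: INT32 element count, then each broker id as an
  # INT32, in list order. Returns iodata.
  def encode(ids) when is_list(ids) do
    [<<length(ids)::32-signed>>, Enum.map(ids, &<<&1::32-signed>>)]
  end
end

Int32ArraySketch.encode([1, 2, 3]) |> IO.iodata_to_binary()
# => <<0, 0, 0, 3, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3>>
```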
@dantswain you've been looking at the protocol stuff more, do you know the answer to this?
@habutre Sorry it took me so long to get to this. The replica assignment encoding looks correct to me, but it's written in a way that's a little hard to follow. The CreateTopics v0 schema is (https://kafka.apache.org/protocol#The_Messages_CreateTopics):
```
CreateTopics Request (Version: 0) => [topics] timeout_ms
  topics => name num_partitions replication_factor [assignments] [configs]
    name => STRING
    num_partitions => INT32
    replication_factor => INT16
    assignments => partition_index [broker_ids]
      partition_index => INT32
      broker_ids => INT32
    configs => name value
      name => STRING
      value => NULLABLE_STRING
  timeout_ms => INT32
```
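The schema maps onto iodata quite directly. The following is a sketch that encodes only the request body, without the request header. The input map keys mirror the schema field names; they are assumptions, not the KafkaEx structs. The module name is hypothetical:

```elixir
defmodule CreateTopicsV0Sketch do
  # Hypothetical encoder for the CreateTopics v0 request body, following
  # the schema field by field. Returns iodata.
  def encode_request(topics, timeout_ms) do
    [array(topics, &topic/1), <<timeout_ms::32-signed>>]
  end

  # topics => name num_partitions replication_factor [assignments] [configs]
  defp topic(t) do
    [
      string(t.name),
      <<t.num_partitions::32-signed, t.replication_factor::16-signed>>,
      array(t.assignments, &assignment/1),
      array(t.configs, &config/1)
    ]
  end

  # assignments => partition_index [broker_ids]
  defp assignment(a) do
    [<<a.partition_index::32-signed>>, array(a.broker_ids, &<<&1::32-signed>>)]
  end

  # configs => name value
  defp config(c), do: [string(c.name), nullable_string(c.value)]

  # ARRAY: INT32 element count, then each element in order
  defp array(elems, fun), do: [<<length(elems)::32-signed>>, Enum.map(elems, fun)]

  # STRING: INT16 byte length, then the bytes
  defp string(s), do: [<<byte_size(s)::16-signed>>, s]

  # NULLABLE_STRING: a length of -1 stands for null
  defp nullable_string(nil), do: <<-1::16-signed>>
  defp nullable_string(s), do: string(s)
end

# Hypothetical topic without a manual replica assignment
topic = %{
  name: "my-topic",
  num_partitions: 3,
  replication_factor: 1,
  assignments: [],
  configs: [
    %{name: "cleanup.policy", value: "compact"},
    %{name: "retention.ms", value: nil}
  ]
}

body = CreateTopicsV0Sketch.encode_request([topic], 5_000)
IO.iodata_length(body)
```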
The `encode_replica_assignments/1` function looks like this:
```elixir
defp encode_replica_assignments(replica_assignments) do
  replica_assignments |> map_encode(&encode_replica_assignment/1)
end

defp encode_replica_assignment(replica_assignment) do
  # INT32 partition id, followed by the INT32 array of broker ids
  <<replica_assignment.partition::32-signed>> <>
    (replica_assignment.replicas |> map_encode(&<<&1::32-signed>>))
end
```
This uses the `KafkaEx.Protocol.Common.map_encode/2` function:
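```elixir
def map_encode(elems, function) do
  if nil == elems or [] == elems do
    # nil or empty array: just an INT32 count of 0
    <<0::32-signed>>
  else
    # INT32 element count, then the encoded elements in list order
    # (Enum.reduce/2 calls fun.(element, acc), so acc <> element keeps order)
    <<length(elems)::32-signed>> <>
      (elems
       |> Enum.map(function)
       |> Enum.reduce(&(&2 <> &1)))
  end
end
```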
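The reduce step deserves a close look. `Enum.reduce/2` uses the first element as the initial accumulator and then calls `fun.(element, acc)` for each remaining element. The argument order inside the concatenation therefore decides the byte order of the encoded elements. That matters for the broker id list, where the first broker listed is the preferred leader. A small check:

```elixir
parts = ["a", "b", "c"]

# The first element seeds the accumulator; each later element arrives as &1
# and the accumulator as &2, so `&1 <> &2` puts each new element in front.
Enum.reduce(parts, &(&1 <> &2))
# => "cba"

# `&2 <> &1` appends each new element to the accumulator instead.
Enum.reduce(parts, &(&2 <> &1))
# => "abc"

# With iodata the mapped list is already the output; no reduce step at all.
IO.iodata_to_binary(parts)
# => "abc"
```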
So, assuming `replica_assignments` is a list, we should get a call to `map_encode` for it. That call encodes the length of the list and then calls `encode_replica_assignment/1` on each element. `encode_replica_assignment/1` in turn encodes the partition id, followed by `map_encode` called on the list of assigned broker ids. The `map_encode` function definitely makes this hard to follow.
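To make the walk-through concrete, here is the byte layout the schema calls for with one hypothetical assignment of partition 0 to brokers 1 and 2. It is written as an Elixir binary literal: five INT32 values, 20 bytes in total:

```elixir
# Expected bytes for replica_assignments = [%{partition: 0, replicas: [1, 2]}]
expected = <<
  # outer map_encode: one replica assignment in the list
  1::32-signed,
  # encode_replica_assignment: partition id 0
  0::32-signed,
  # inner map_encode: two broker ids follow
  2::32-signed,
  # the broker ids themselves, in list order
  1::32-signed,
  2::32-signed
>>

byte_size(expected)
# => 20
```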
The CreateTopics function definitely does some binary concatenation. The `map_encode` function does too, although it appears to be used in only a couple of places. PRs to clean those up are welcome, though thankfully these functions shouldn't be called very frequently.
Hi @dantswain, sorry for my own long delay. I followed the docs to provide the necessary structs, and creating a topic worked as expected. Thank you for your patience.
Do you think this issue can be closed, since there are no binary concatenations left?
PS: during my tests I found that when creating a topic, you must provide either a `ReplicaAssignment` or the number of partitions and the replication factor, not both. Is this worth documenting somewhere? It could be another small, low-priority issue.
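The either/or rule can be expressed as a small check before encoding. This sketch assumes that `num_partitions` and `replication_factor` are sent as -1 whenever a manual assignment is given, which is how the Kafka protocol guide describes these fields. It also assumes that v0 requires explicit positive counts otherwise. The module, function, and map keys are hypothetical, not KafkaEx APIs:

```elixir
defmodule CreateTopicsArgsSketch do
  # Hypothetical client-side check, not a KafkaEx function. Assumes the
  # broker expects num_partitions and replication_factor to be -1 when a
  # manual replica assignment is supplied.

  # Manual assignment: both counts must be left at -1.
  def validate(%{assignments: [_ | _], num_partitions: -1, replication_factor: -1}),
    do: :ok

  def validate(%{assignments: [_ | _]}),
    do: {:error, :counts_must_be_minus_one_with_assignments}

  # No manual assignment (v0): both counts must be given explicitly.
  def validate(%{assignments: [], num_partitions: n, replication_factor: r})
      when n > 0 and r > 0,
      do: :ok

  def validate(_), do: {:error, :need_assignments_or_counts}
end

CreateTopicsArgsSketch.validate(%{assignments: [], num_partitions: 3, replication_factor: 2})
# => :ok
```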
@joshuawscott I think we can close this issue.
Maybe next time we could put `closes #ISSUE_ID` in the PR description so the issue is closed automatically after merge.