kafka_ex

Use iolists when building packets

Open joshuawscott opened this issue 6 years ago • 9 comments

#290 / #231 implemented iolists / iodata for building produce protocol messages. This gives a significant speedup in those parts of the code, especially with larger messages and message sets.

We should use a similar strategy for the other protocol message modules.
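For readers unfamiliar with the technique, here is a minimal sketch of the difference. The module and function names are hypothetical and are not part of kafka_ex; the field layout is only illustrative.

```elixir
defmodule IodataSketch do
  # Hypothetical module for illustration only; not kafka_ex code.

  # Binary version: builds one new flat binary, copying key and value into it.
  def encode_concat(key, value) do
    <<byte_size(key)::32-signed>> <> key <> <<byte_size(value)::32-signed>> <> value
  end

  # iodata version: returns a list that references the original binaries.
  # Nothing is copied until the data is written or flattened.
  def encode_iodata(key, value) do
    [<<byte_size(key)::32-signed>>, key, <<byte_size(value)::32-signed>>, value]
  end
end

# Both forms describe the same bytes once the iodata is flattened.
flat = IodataSketch.encode_concat("k", "hello")
nested = IodataSketch.encode_iodata("k", "hello")
true = flat == IO.iodata_to_binary(nested)
```

Functions such as :gen_tcp.send/2 accept iodata directly, so the nested list usually never needs to be flattened by hand.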

joshuawscott May 23 '18 14:05

Hi @joshuawscott, I would like to take this issue. Do you see a need to run benchmarks during the changes, or are you convinced that the benchmark you ran on the referenced issues is enough to go through all modules under kafka_ex/protocol and change them to iodata?

habutre Oct 24 '19 11:10

FWIW the new Kayrock-based client produces iolists for every message.

It may still be worth making these improvements, though, since it may take a while for the new client to become adopted.

I don’t think it’s necessary to do benchmarks. iodata is pretty well known to be more efficient than binaries for this kind of use case. If you want to break up the work, you should be able to do these one message at a time and each one will provide some benefit. I’d start with the most frequently used messages - fetch and offset commit come to mind.

dantswain Oct 24 '19 12:10

Hey @dantswain, I was thinking exactly about breaking the work up into smaller pieces, file by file under protocol, so that I can get used to the code with each change and also get quick feedback.

Thanks for that initial guidance.

habutre Oct 24 '19 12:10

@habutre I don't see the need for benchmarking; iolists are almost certainly going to be faster.

I think I looked at this before and didn't find any binary concatenations, so I never did anything with it. Feel free to take another look, I definitely could have missed things

joshuawscott Oct 24 '19 15:10

To be honest @joshuawscott, that was my first impression too: at first look I didn't find any place with binary concatenation. I will focus on finding one; otherwise, closing this issue would be the action to take.

habutre Oct 24 '19 15:10

Hi guys, I finished reviewing this issue and, as @joshuawscott said, no binary concatenation was found. Even so, I have applied some changes and would really appreciate your review of #380.

I got a bit stuck on the topic creation request and would appreciate it if someone could help me understand it.

The function KafkaEx.Protocol.CreateTopics.encode_replica_assignment/1 does this concatenation, which does not seem to work: (<<replica_assignment.partition::32-signed>> <> replica_assignment.replicas), since replica_assignment.replicas is a list. Is topic management currently supported/used? If yes, should the list be encoded as <<list_size, replica1_id, ..., replicaN_id>>? I didn't find implementation guidance in a quick look.

habutre Oct 29 '19 20:10

@dantswain you've been looking at the protocol stuff more, do you know the answer to this?

joshuawscott Nov 10 '19 17:11

@habutre Sorry it took me so long to get to this. The replica assignment encoding looks correct to me, but it is written in a way that's a little hard to follow. The create topic v0 schema is (https://kafka.apache.org/protocol#The_Messages_CreateTopics):

CreateTopics Request (Version: 0) => [topics] timeout_ms 
  topics => name num_partitions replication_factor [assignments] [configs] 
    name => STRING
    num_partitions => INT32
    replication_factor => INT16
    assignments => partition_index [broker_ids] 
      partition_index => INT32
      broker_ids => INT32
    configs => name value 
      name => STRING
      value => NULLABLE_STRING
  timeout_ms => INT32

The encode_replica_assignments/1 function looks like this:

  defp encode_replica_assignments(replica_assignments) do
    replica_assignments |> map_encode(&encode_replica_assignment/1)
  end

  defp encode_replica_assignment(replica_assignment) do
    (<<replica_assignment.partition::32-signed>> <> replica_assignment.replicas)
    |> map_encode(&<<&1::32-signed>>)
  end

Which is using the KafkaEx.Protocol.Common.map_encode/2 function:

  def map_encode(elems, function) do
    if nil == elems or [] == elems do
      <<0::32-signed>>
    else
      <<length(elems)::32-signed>> <>
        (elems
         |> Enum.map(function)
         |> Enum.reduce(&(&1 <> &2)))
    end
  end

So assuming replica_assignments is a list, we should get a call to map_encode for that, which will encode the length of the list and then call encode_replica_assignment/1 for each of the elements, which in turn encodes the partition id followed by map_encode called on the list of assigned broker ids. The map_encode function definitely makes it sort of hard to follow.
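To make the intended byte layout concrete, here is a small worked example for a single assignment, following the `assignments => partition_index [broker_ids]` schema above. The module is a hypothetical stand-in written only to show the expected bytes; it is not the library's implementation.

```elixir
defmodule ReplicaAssignmentLayout do
  # Hypothetical helper for illustration; not part of kafka_ex.

  # One assignment on the wire:
  #   partition_index (INT32), array length (INT32), then each broker id (INT32).
  def encode(partition, broker_ids) do
    ids = for id <- broker_ids, into: <<>>, do: <<id::32-signed>>
    <<partition::32-signed, length(broker_ids)::32-signed, ids::binary>>
  end
end

# Partition 0 assigned to brokers 1 and 2:
# 4 bytes partition, 4 bytes count, then 4 bytes per broker id.
<<0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2>> =
  ReplicaAssignmentLayout.encode(0, [1, 2])
```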

The CreateTopics module definitely does some binary concatenation. The map_encode function does too, although it looks to be used in only a couple of places. PRs to clean those up are welcome, though thankfully they're not functions that should get called super frequently.
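As a starting point for such a cleanup, here is a minimal sketch of an iodata-returning variant. The module name is hypothetical, and callers would need to accept iodata rather than a flat binary. One difference to be aware of: as quoted above, Enum.reduce(&(&1 <> &2)) puts each later element in front of the accumulator, while this sketch keeps elements in list order, so the two are not byte-identical for lists with more than one element.

```elixir
defmodule MapEncodeSketch do
  # Hypothetical iodata variant of map_encode/2; not the library's code.

  # Empty or missing array: just a zero INT32 length.
  def map_encode(nil, _function), do: <<0::32-signed>>
  def map_encode([], _function), do: <<0::32-signed>>

  # Non-empty array: length prefix followed by the encoded elements,
  # kept as a nested list instead of being concatenated.
  def map_encode(elems, function) when is_list(elems) do
    [<<length(elems)::32-signed>> | Enum.map(elems, function)]
  end

  # Partition id followed by the length-prefixed list of broker ids.
  # The map shape (:partition, :replicas) mirrors the fields used above.
  def encode_replica_assignment(%{partition: partition, replicas: replicas}) do
    [<<partition::32-signed>>, map_encode(replicas, &<<&1::32-signed>>)]
  end
end

iodata = MapEncodeSketch.encode_replica_assignment(%{partition: 0, replicas: [1, 2]})
true = IO.iodata_to_binary(iodata) == <<0::32, 2::32, 1::32, 2::32>>
```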

dantswain Nov 24 '19 16:11

Hi @dantswain, sorry now for my own long delay. Following the docs to provide the necessary structs to create a topic worked as expected. Thank you for your patience.

Do you believe this issue can be closed, since there aren't any binary concats anymore?

PS: during my tests I figured out that when creating a topic, one has to choose between providing a ReplicaAssignment or providing the number of partitions and the replication factor. Is it worth documenting somewhere? It could be another small, low-priority issue.

habutre Jan 13 '20 21:01

@joshuawscott I think we can close this issue

Argonus May 09 '23 15:05

Maybe next time we could mention "closes #ISSUE_ID" in the PR description so that issues are closed automatically after the merge.

habutre Jun 08 '23 02:06