
Add support for RecordIterator

Open aovestdipaperino opened this issue 10 months ago • 14 comments

I messed up with the repo, so I decided to recreate this pull request from scratch. It basically adds Iterator support for browsing records via the RecordBatchDecoder.
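
The idea can be sketched with a std-only toy (the framing below is invented for illustration; it is not the crate's API or Kafka's wire format): an Iterator that lazily decodes one record at a time from a byte buffer instead of materializing them all up front.

```rust
// Hypothetical stand-ins for illustration only, not the crate's types.
struct Record {
    offset: u64,
    value: Vec<u8>,
}

struct RecordIterator<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> Iterator for RecordIterator<'a> {
    type Item = Record;

    fn next(&mut self) -> Option<Record> {
        // Toy framing: 8-byte big-endian offset, 4-byte length, then the value.
        let header_end = self.pos.checked_add(12)?;
        if header_end > self.buf.len() {
            return None;
        }
        let offset = u64::from_be_bytes(self.buf[self.pos..self.pos + 8].try_into().ok()?);
        let len = u32::from_be_bytes(self.buf[self.pos + 8..header_end].try_into().ok()?) as usize;
        let value_end = header_end.checked_add(len)?;
        if value_end > self.buf.len() {
            return None;
        }
        let value = self.buf[header_end..value_end].to_vec();
        self.pos = value_end;
        Some(Record { offset, value })
    }
}

fn main() {
    // Encode two toy records and iterate over them lazily.
    let mut buf = Vec::new();
    for (i, payload) in [b"foo".as_slice(), b"bar".as_slice()].iter().enumerate() {
        buf.extend_from_slice(&(i as u64).to_be_bytes());
        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
        buf.extend_from_slice(payload);
    }
    let records: Vec<Record> = RecordIterator { buf: &buf, pos: 0 }.collect();
    assert_eq!(records.len(), 2);
    assert_eq!(records[1].value, b"bar");
}
```

The benefit over a decode-everything call is that callers can stop early or process records without an intermediate Vec.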

aovestdipaperino avatar Feb 04 '25 16:02 aovestdipaperino

Hi, sorry I never got around to looking at this. I think iterators are a better way to expose records and I support this change. 👍

If you are still interested in landing it we will need to:

  • Fix merge conflicts.
  • Fix the comment.
  • Add some basic tests.

And then I can take another look.

rukai avatar Aug 07 '25 02:08 rukai

If you give me a couple of days, we could squeeze this into 0.16.0.

aovestdipaperino avatar Sep 12 '25 19:09 aovestdipaperino

cargo clippy runs fine on my machine; this should be as ready to merge as it can be.

aovestdipaperino avatar Sep 12 '25 20:09 aovestdipaperino

Thanks! 0.16.0 is already released, but I will find time to review this soon.

rukai avatar Sep 12 '25 22:09 rukai

@rukai would you mind merging and generating a new version? I would like to move our code to the official crate instead of my fork. Plus, this code is incredibly churn-y; this is my third attempt at resolving conflicts. Thank you

aovestdipaperino avatar Sep 15 '25 13:09 aovestdipaperino

Ping

aovestdipaperino avatar Sep 17 '25 17:09 aovestdipaperino

Ping. We will very likely release our broker soon; it would be awesome if I could get this in. ;)

aovestdipaperino avatar Sep 27 '25 10:09 aovestdipaperino

Your change to the version logic is correct. The important thing here is that the original implementation was broken: it reads the version from the batch:

https://github.com/tychedelia/kafka-protocol-rs/blob/3ca47de64d6ec65aba19d689b3547e7efa2439ea/src/records.rs#L465

Then reads the version from the batch again and compares them:

https://github.com/tychedelia/kafka-protocol-rs/blob/3ca47de64d6ec65aba19d689b3547e7efa2439ea/src/records.rs#L512-L515

This is very silly, since the bail condition is unreachable.
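
A toy reconstruction of the shape of the bug (this is not the actual decoder code, just an illustration of why the bail is dead code): the version byte is read, then read again from the same buffer and compared against itself.

```rust
// Toy illustration of the broken pattern described above.
fn decode_batch(buf: &[u8]) -> Result<u8, String> {
    // First read of the version/magic byte.
    let version = *buf.first().ok_or_else(|| "empty buffer".to_string())?;
    // ... decoding work elided ...
    // Second read, from the exact same place.
    let version_again = *buf.first().ok_or_else(|| "empty buffer".to_string())?;
    if version != version_again {
        // Unreachable: both reads come from the same byte of the same buffer.
        return Err("version mismatch".to_string());
    }
    Ok(version)
}

fn main() {
    assert_eq!(decode_batch(&[2, 0, 0]), Ok(2));
}
```

A meaningful check would instead compare the byte against an externally expected version, or branch on it to select a decode path.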

So I appreciate the fix.

Things remaining:

  • CI is failing
  • I believe Kafka will return multiple batches, as per the decode_all implementation. The decision to add decode_all was originally made here. I need to investigate deeper where this behavior comes from: either find it in the spec, or reproduce the behavior in an integration test.
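
For concreteness, the decode_all behavior under discussion amounts to looping while bytes remain in the buffer. The framing below is a toy invented for illustration (a 4-byte length prefix per batch), not Kafka's actual RecordBatch layout:

```rust
// Toy framing: each batch is a 4-byte big-endian body length, then the body.
fn batch_len(buf: &[u8]) -> Option<usize> {
    let len = u32::from_be_bytes(buf.get(..4)?.try_into().ok()?) as usize;
    Some(4 + len)
}

// decode_all-style loop: keep decoding batches while bytes remain.
fn decode_all(mut buf: &[u8]) -> Vec<&[u8]> {
    let mut batches = Vec::new();
    while !buf.is_empty() {
        match batch_len(buf) {
            Some(total) if total <= buf.len() => {
                batches.push(&buf[4..total]);
                buf = &buf[total..];
            }
            // Truncated trailing batch: stop rather than error, since a
            // Fetch response may legitimately end with a partial batch.
            _ => break,
        }
    }
    batches
}

fn main() {
    // Two toy batches: "hi" then "x".
    let wire = [0, 0, 0, 2, b'h', b'i', 0, 0, 0, 1, b'x'];
    assert_eq!(decode_all(&wire), vec![b"hi".as_slice(), b"x".as_slice()]);
}
```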

rukai avatar Sep 29 '25 23:09 rukai

The decision to add decode_all was originally made here.

To be clear, that was simply maintaining the then-current behavior, which ran decode_batch in a loop while there were bytes remaining.

michaelbeaumont avatar Sep 30 '25 06:09 michaelbeaumont

Yeah, if I can add a data point here: the official Java version of the client completely ignores the remaining bytes when the response is expected to have a single RecordBatch. I can try to see what happens if the client expects more (multiple partitions), but I would expect that those are ignored as well. I am fine with all options, but I have a preference for being consistent with the official implementation. Not related: my broker based on this crate is here: https://GitHub.com/Cleafy/blink

aovestdipaperino avatar Oct 02 '25 20:10 aovestdipaperino

I will fix the broken test

aovestdipaperino avatar Oct 02 '25 21:10 aovestdipaperino

when the response is expected to have a single recordbatch.

Hmm what does it mean for it to be expected that a response contains a single record batch? I didn't think clients have any expectations about record batches per se. IIUC batches are returned as written to the disk, so if there are multiple batches on disk that would fit within max_bytes, the broker would return as many as it can in a Fetch response.
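
That packing behavior can be sketched as follows. The at-least-one-batch rule and the sizes are my assumptions for illustration (my understanding is that the broker returns the first batch even when it alone exceeds max_bytes, so consumers can make progress), not something verified against the spec here:

```rust
// Sketch (assumed semantics): pack whole batches into a Fetch response until
// adding another would exceed max_bytes, always returning at least one batch.
fn batches_to_return(batch_sizes: &[usize], max_bytes: usize) -> usize {
    let mut total = 0;
    let mut count = 0;
    for &size in batch_sizes {
        if count > 0 && total + size > max_bytes {
            break;
        }
        total += size;
        count += 1;
    }
    count
}

fn main() {
    // 100 + 200 fits in 350; adding 300 would exceed it.
    assert_eq!(batches_to_return(&[100, 200, 300], 350), 2);
    // A single oversized batch is still returned.
    assert_eq!(batches_to_return(&[500], 100), 1);
}
```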

michaelbeaumont avatar Oct 02 '25 21:10 michaelbeaumont

In one of my tests I remember packing, by accident, multiple batches instead of multiple records in one batch, and realizing that the trailing ones were discarded. As for expectations: if the client requests multiple partitions and the first RecordBatch has trailing bytes, it will cause misalignment. I am recalling from memory, so not 100% sure.


aovestdipaperino avatar Oct 02 '25 21:10 aovestdipaperino

@aovestdipaperino While produce requests can only contain a single RecordBatch per partition (see here), fetch responses can definitely contain multiple batches per partition, and those batches shouldn't be discarded (you can see, for example, how the Java client keeps a list of batches and iterates over records one batch at a time).
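
The shape described for the Java client could look like this in Rust (types are hypothetical stand-ins, not the crate's): hold the decoded batches and flatten them into a single record iterator, one batch at a time.

```rust
// Hypothetical stand-in for a decoded batch; not the crate's type.
struct RecordBatch {
    records: Vec<Vec<u8>>,
}

// Iterate over all records, draining each batch before moving to the next.
fn iter_records<'a>(batches: &'a [RecordBatch]) -> impl Iterator<Item = &'a [u8]> + 'a {
    batches.iter().flat_map(|batch| batch.records.iter().map(Vec::as_slice))
}

fn main() {
    let batches = vec![
        RecordBatch { records: vec![b"a".to_vec(), b"b".to_vec()] },
        RecordBatch { records: vec![b"c".to_vec()] },
    ];
    let all: Vec<&[u8]> = iter_records(&batches).collect();
    assert_eq!(all, vec![b"a".as_slice(), b"b".as_slice(), b"c".as_slice()]);
}
```

This is also the natural way to keep a RecordIterator lazy across multiple batches rather than discarding trailing ones.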

tanguylebarzic avatar Nov 21 '25 14:11 tanguylebarzic

@aovestdipaperino WDYT, can the existing behavior of reading bytes until EOF be kept?

michaelbeaumont avatar Dec 01 '25 19:12 michaelbeaumont