Add support for RecordIterator
I messed up the repo, so I decided to recreate this pull request from scratch. Basically, this adds Iterator support for browsing records using the RecordBatchDecoder.
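To illustrate the general shape of the change, here is a minimal sketch of a record iterator, under made-up assumptions: the RecordIter type and the toy 1-byte-length framing are hypothetical simplifications, not the crate's actual RecordBatchDecoder API.

```rust
// Hypothetical sketch (not the crate's real API): an iterator that lazily
// yields records from a buffer, instead of decoding everything eagerly.
struct RecordIter<'a> {
    buf: &'a [u8],
}

impl<'a> Iterator for RecordIter<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        // Toy framing: each "record" is a 1-byte length followed by that
        // many payload bytes.
        let (&len, rest) = self.buf.split_first()?;
        let (record, rest) = rest.split_at(len as usize);
        self.buf = rest;
        Some(record)
    }
}

fn main() {
    // Two records: [1, 2] and [3].
    let data = [2u8, 1, 2, 1, 3];
    let records: Vec<&[u8]> = RecordIter { buf: &data }.collect();
    assert_eq!(records, vec![&[1u8, 2][..], &[3u8][..]]);
}
```

Exposing records through Iterator lets callers stop early or stream large batches without materializing every record up front.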
Hi, sorry I never got around to looking at this. I think iterators are a better way to expose records and I support this change. 👍
If you are still interested in landing it we will need to:
- fix merge conflicts
- fix the comment
- Add some basic tests.
And then I can take another look.
If you give me a couple of days, we could squeeze this into 0.16.0.
cargo clippy runs fine on my machine; this should be as ready to merge as it can be.
Thanks. 0.16.0 is already released, but I will find time to review this soon.
@rukai would you mind merging and generating a new version? I would like to move our code to the official crate instead of my fork. Plus, this code is incredibly churn-y; this is my third attempt to resolve conflicts. Thank you!
Ping
Ping. We might very likely release our broker soon, it would be awesome if I can put this in. ;)
Your change to the version logic is correct; the important thing here is that the original implementation was broken. It reads the version from the batch:
https://github.com/tychedelia/kafka-protocol-rs/blob/3ca47de64d6ec65aba19d689b3547e7efa2439ea/src/records.rs#L465
Then reads the version from the batch again and compares them:
https://github.com/tychedelia/kafka-protocol-rs/blob/3ca47de64d6ec65aba19d689b3547e7efa2439ea/src/records.rs#L512-L515
This is very silly, since the bail condition is unreachable.
So I appreciate the fix.
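A stripped-down sketch of the broken pattern described above (a hypothetical simplification, not the crate's actual code): the version byte is read from the same offset twice, so the mismatch check can never fire.

```rust
// Hypothetical simplification of the original logic: because both reads
// come from the same offset, the bail condition is unreachable.
fn decode_version(batch: &[u8]) -> Result<u8, &'static str> {
    let version = batch[0]; // first read of the magic/version byte
    let check = batch[0];   // second read of the *same* byte
    if version != check {
        // Unreachable: both values come from the same offset.
        return Err("version mismatch");
    }
    Ok(version)
}

fn main() {
    assert_eq!(decode_version(&[2, 0, 0]), Ok(2));
}
```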
Things remaining:
- CI is failing
- I believe Kafka will return multiple batches, as per the decode_all implementation. The decision to add decode_all was originally made here. I need to investigate deeper where this behavior comes from: either find it in the spec, or reproduce the behavior in an integration test.
The decision to add decode_all was originally made here.
To be clear, that was simply maintaining the then-current behavior, which ran decode_batch in a loop while there were bytes remaining.
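The loop behavior being described can be sketched roughly like this. The decode_batch here is a stand-in with a toy 1-byte-length framing, not the crate's real function; the point is only the "decode while bytes remain" shape.

```rust
// Stand-in for the real decoder: each fake batch is a 1-byte length
// followed by that many payload bytes. Advances the input slice.
fn decode_batch<'a>(buf: &mut &'a [u8]) -> &'a [u8] {
    let (&len, rest) = buf.split_first().unwrap();
    let (batch, rest) = rest.split_at(len as usize);
    *buf = rest;
    batch
}

// Rough sketch of the decode_all behavior as described: keep decoding
// batches while bytes remain, so trailing batches are not discarded.
fn decode_all<'a>(mut buf: &'a [u8]) -> Vec<&'a [u8]> {
    let mut batches = Vec::new();
    while !buf.is_empty() {
        batches.push(decode_batch(&mut buf));
    }
    batches
}

fn main() {
    // Three batches packed back to back are all decoded, none discarded.
    let data = [1u8, 10, 2, 20, 21, 1, 30];
    assert_eq!(decode_all(&data).len(), 3);
}
```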
Yeah, if I can add a data point here: the official Java version of the client completely ignores the remaining bytes when the response is expected to have a single RecordBatch. I can try to see what happens if the client expects more (multiple partitions), but I would expect that those are ignored as well. I am fine with all options, but I have a preference for being consistent with the official implementation. Not related: my broker based on this crate is here: https://GitHub.com/Cleafy/blink
I will fix the broken test.
when the response is expected to have a single recordbatch.
Hmm what does it mean for it to be expected that a response contains a single record batch? I didn't think clients have any expectations about record batches per se. IIUC batches are returned as written to the disk, so if there are multiple batches on disk that would fit within max_bytes, the broker would return as many as it can in a Fetch response.
In one of my tests, I remember accidentally packing multiple batches instead of multiple records into one batch, and realizing that the trailing ones were discarded. As for expectations: if the client requests multiple partitions and the first RecordBatch has extra bytes, it will cause misalignment. I am recalling this from memory, so I'm not 100% sure.
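A toy illustration of the misalignment concern (made-up lengths, not the real Kafka wire format): if the decoder consumes the wrong number of bytes for the first partition's batch, every later partition's data shifts.

```rust
// Hypothetical sketch: split a buffer at a claimed batch length, returning
// (this partition's batch, the bytes for subsequent partitions).
fn read_partition(buf: &[u8], claimed_len: usize) -> (&[u8], &[u8]) {
    buf.split_at(claimed_len)
}

fn main() {
    // Partition 1's batch is really 3 bytes; partition 2's data is [9, 9].
    let wire = [1u8, 2, 3, 9, 9];

    // Correct length: partition 2 starts at the right offset.
    let (_, rest) = read_partition(&wire, 3);
    assert_eq!(rest, &[9u8, 9][..]);

    // Wrong length: partition 2's bytes are misread from a shifted offset.
    let (_, rest) = read_partition(&wire, 2);
    assert_ne!(rest, &[9u8, 9][..]);
}
```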
@aovestdipaperino While produce requests can only contain a single RecordBatch per partition (see here), fetch requests can definitely contain multiple batches per partition, and those shouldn't be discarded (you can see, for example, how the Java client keeps a list of batches and iterates over records one batch at a time).
@aovestdipaperino WDYT? Can the existing behavior of reading bytes until EOF be kept?