kafka-go
Way to get batch messages and commit if the batch is successful
There is no way to implement the flow below:
- Read a batch of messages from Kafka.
- Process all the messages.
- If all messages are processed successfully, commit the offset.
- If processing any message fails, do not commit the offset; instead, read the same batch again, retry processing, and commit the offset once it succeeds.
There’s already a way to do this via the FetchMessage/CommitMessages methods of the reader: the program can read messages one at a time and aggregate them into a batch, which can then be committed.
Would this fit your use case? If not, I’d be interested to hear more details about it.
@achille-roussel Thanks for the response. The way you suggested is correct, but there is no way to read the same batch again if I am specifying the GroupID in my reader. In my use case I have to specify the GroupID, so I cannot use reader.SetOffset(offSet) to re-read the same batch. Please suggest if there is any way.
If you need access to the batch received by a single fetch request we'd need a new API indeed.
This isn't a use case that came up at Segment so it's unlikely that we will find time to dedicate to building this feature, but if you are available to work on it we would gladly review and merge the change.
I have run into an issue with batch reading using the reader and the FetchMessage/CommitMessages methods. I made some simple tests (example here - https://play.golang.org/p/aeYfRBfWIlQ), and there are cases where, after I started a new consumer, a rebalance happened and the new consumer began consuming messages that were already fetched but not yet committed, so I got duplicated data.
We have the same request. For example, we consume messages to load into ElasticSearch, which can index many more documents if you provide it a batch at a time, instead of one-by-one. When using the Reader API, there is no way to access the batch of messages that Kafka returned, which would be the easiest and most efficient thing. If we use our own batch size, then we have to use an arbitrary timeout and introduce unnecessary delays whenever there is not a full batch of messages waiting on the topic.
I would be happy to contribute an implementation if anyone has guidance on what the API should be and the things that need to change.
Looking at the current Reader implementation, it uses Batches under the covers, sending the messages in each batch one at a time over a channel. I have two alternate proposals for enabling reading a batch at a time:
- Add a "batch mode" bit to ReaderConfig

```go
type ReaderConfig struct {
	// Batched causes the Reader to return a Batch at a time instead of a Message at a time.
	// If this is set, callers must use FetchBatch/ReadBatch instead of their Message equivalents.
	Batched bool
}

func (*Reader) FetchBatch(context.Context) (*Batch, error)
func (*Reader) ReadBatch(context.Context) (*Batch, error)
```

- Add a new Batched Reader

```go
// NewBatchedReader returns a reader which returns each batch read to callers.
func NewBatchedReader(cfg ReaderConfig) *BatchedReader

// BatchedReader has a similar interface to Reader, except with FetchBatch/ReadBatch
// instead of the *Message methods.
```
Thoughts?
@achille-roussel wondering if you have any thoughts about the API?
Tagging @alecthomas and @dominicbarnes as two recently-active maintainers to provide some feedback on the API design.
Hey @robfig! Thanks for following up on this!
Have you looked at potentially using the kafka.ConsumerGroup API that the package exposes? Paired with kafka.Conn or kafka.Client to fetch batches of messages, you should be able to gain greater control over the timing of commits.
Let us know if this is helpful. If not, let's explore adding new APIs to kafka.Reader to expose batches instead of individual messages 👍
Hi @achille-roussel, the ConsumerGroup API does not really do what I want. It does not expose the batch of messages, only one message at a time. You can get a batch of messages from a Conn, but that is specific to a partition and does not use consumer groups. We use this library quite a bit internally, and we would have many places to use this API. We use consumer groups everywhere (which I think is typical practice), and many message types benefit from batched processing, which is currently done unnecessarily inefficiently. In my benchmarks, I found that Conn.ReadBatch was significantly faster than reading from a ConsumerGroup, so I think there is a lot to gain. (This was a while ago.. I'll have to look for the results..) Thanks for considering, Rob
I have the benchmark results, but not the code. I'm not sure I really compared a ConsumerGroup vs a single-partition Reader, though. Anyway, I found that using Conn.ReadBatch, I could read 424MB of messages in 4.5s; using Reader.FetchMessage, the same number of messages took 41s. At that time I was looking into the speed of reading a single partition, but I do see a result showing that I could read about 4GB of the topic with a ConsumerGroup in 30s, which would be comparable performance. However, that reader was only keeping simple stats rather than doing anything significant or batching the messages.
Following up on this, @robfig -
Thanks for all the details you've been sharing! kafka.Reader uses kafka.Conn.ReadBatch underneath, so I'm thinking the delta in performance comes either from a difference in configuration or from the kafka.Reader abstractions being a bottleneck.
I know it's been a while, but if you had CPU or block profiles to share, or a benchmark setup I could reproduce, this would be highly valuable to understand how to improve kafka-go.
Hello @robfig
I just wanted to cycle back here and check if you had any updates to share on this issue?
I don't have a benchmark setup or any code to share, sorry. The code was the trivial code you might expect -- a loop repeatedly calling FetchMessage or ReadBatch to read all the data in a partition. The Kafka connection code would not be helpful, and I'm not sure how I could even share the Kafka partition state. I'm no longer working on this project, so I won't be able to help further here, sorry.
Batch mode - it definitely needs to be done...
My thoughts: keep an active batch plus a prefetched batch.
When you are done processing the active batch:
- synchronously switch to the prefetched batch
- asynchronously commit the offset of the last message of the finished batch, and prefetch the next batch
This way you get:
- an "at least once" guarantee - if a crash happens, you reprocess only one batch...
- asynchronous fetching and offset commits - no added latency
- offsets are committed rarely
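The scheme above can be sketched with a buffered channel of capacity one acting as the prefetch slot, and a goroutine per commit. Everything here is an illustrative stand-in (the batch type, simulated fetching, and a max-offset counter in place of a real broker commit), not kafka-go API:

```go
package main

import (
	"fmt"
	"sync"
)

// batch stands in for a fetched group of Kafka messages; a real
// implementation would carry kafka.Message values.
type batch struct {
	values     []string
	lastOffset int64
}

// consume drains batches from a prefetching fetcher: the channel's buffer
// holds one prefetched batch while the active one is processed, and the
// offset of each finished batch is committed asynchronously. Returns the
// highest committed offset.
func consume(batches []batch) int64 {
	fetched := make(chan batch, 1) // capacity 1 = one batch prefetched ahead
	go func() {
		defer close(fetched)
		for _, b := range batches {
			fetched <- b
		}
	}()

	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		last int64
	)
	for b := range fetched { // switching to the prefetched batch is just a receive
		for range b.values {
			// process each message of the active batch synchronously
		}
		wg.Add(1)
		go func(off int64) { // asynchronous offset "commit"
			defer wg.Done()
			mu.Lock()
			if off > last {
				last = off
			}
			mu.Unlock()
		}(b.lastOffset)
	}
	wg.Wait()
	return last
}

func main() {
	out := consume([]batch{
		{values: []string{"a", "b"}, lastOffset: 1},
		{values: []string{"c"}, lastOffset: 2},
	})
	fmt.Println("last committed offset:", out)
}
```

Since only the finished batch's last offset is committed, a crash mid-batch replays at most that one batch, which is the "at least once" property described above.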
Thanks for the suggestions @santacruz123
I agree it would be a useful improvement for the kafka.Reader type to support reading and committing batches.
Would you be able to submit one or more PRs to implement the suggestions you made?
Sorry guys, I couldn't find it in the manual and it's not clear from the discussion: is this already implemented? i.e., can I fetch a batch of messages from Kafka and commit it? If so, I would be thankful if someone could point me to an example/tutorial of how to use it. Thank you in advance!
And another question: you mentioned Conn.ReadBatch - is it meant to be used by library consumers, or only for internal use? I have the feeling that right now it's only for internal use; am I right?