
Way to get batch messages and commit if the batch is successful

Open sanchitlohia-ovo opened this issue 5 years ago • 18 comments

There is no way to implement the following flow:

  1. Read batch messages from Kafka.
  2. Process all the messages.
  3. If all messages are processed successfully - commit the offset.
  4. If processing any message fails - do not commit the offset; read the same batch again, retry processing, and commit the offset once it succeeds.

sanchitlohia-ovo avatar Oct 12 '18 09:10 sanchitlohia-ovo

There’s already a way to do this via the FetchMessage/CommitMessages methods of the reader: the program can read messages one at a time and aggregate them into a batch, which can then be committed.
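
A minimal sketch of that pattern - broker address, topic, group ID, and batch size are placeholders, and a real implementation would also flush partial batches on a timeout:

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "my-group",
		Topic:   "my-topic",
	})
	defer r.Close()

	ctx := context.Background()
	const batchSize = 100

	for {
		// Aggregate messages into a batch without committing them yet.
		batch := make([]kafka.Message, 0, batchSize)
		for len(batch) < batchSize {
			m, err := r.FetchMessage(ctx)
			if err != nil {
				log.Fatal(err)
			}
			batch = append(batch, m)
		}
		if err := processBatch(batch); err != nil {
			// Do not commit; after a restart the group resumes from the last
			// committed offset, so these messages are read again.
			log.Fatal(err)
		}
		// All messages processed successfully: commit the whole batch at once.
		if err := r.CommitMessages(ctx, batch...); err != nil {
			log.Fatal(err)
		}
	}
}

func processBatch(msgs []kafka.Message) error {
	// application-specific processing goes here
	return nil
}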

Would this fit your use case? If not, I’d be interested to hear more details about it.

achille-roussel avatar Oct 12 '18 15:10 achille-roussel

@achille-roussel Thanks for the response. The way you suggested is correct, but there is no way to read the same batch again if I am specifying the GroupID in my reader. In my use case I have to specify the GroupID, and I cannot use reader.SetOffset(offSet) to reread the same batch. Please suggest if there is any way.

sanchitlohia-ovo avatar Oct 12 '18 15:10 sanchitlohia-ovo

If you need access to the batch received by a single fetch request we'd need a new API indeed.

This isn't a use case that came up at Segment so it's unlikely that we will find time to dedicate to building this feature, but if you are available to work on it we would gladly review and merge the change.

achille-roussel avatar Nov 25 '18 06:11 achille-roussel

I have faced an issue with batch reading using the reader and the FetchMessage/CommitMessages methods. I made some simple tests (example here - https://play.golang.org/p/aeYfRBfWIlQ), and there are cases where, after I started a new consumer, rebalancing happened and the new consumer began consuming the uncommitted but already fetched messages, so I got duplicated data.

AntonKuzValid avatar Jul 08 '19 14:07 AntonKuzValid

We have the same request. For example, we consume messages to load into ElasticSearch, which can index many more documents if you provide it a batch at a time, instead of one-by-one. When using the Reader API, there is no way to access the batch of messages that Kafka returned, which would be the easiest and most efficient thing. If we use our own batch size, then we have to use an arbitrary timeout and introduce unnecessary delays whenever there is not a full batch of messages waiting on the topic.

I would be happy to contribute an implementation if anyone has guidance on what the API should be and the things that need to change.

robfig avatar Oct 15 '20 13:10 robfig

Looking at the current Reader implementation, it uses Batches under the covers, sending the messages in each batch one at a time over a channel. I have two alternate proposals for enabling reading a batch at a time:

  1. Add a "batch mode" bit to ReaderConfig
type ReaderConfig struct {
  // Batched causes the Reader to return a Batch at a time instead of a Message at a time.
  // If this is set, callers must use FetchBatch/ReadBatch instead of their Message equivalents.
  Batched bool
}

func (*Reader) FetchBatch(context.Context) (*Batch, error)
func (*Reader) ReadBatch(context.Context) (*Batch, error)
  2. Add a new BatchedReader type
// NewBatchedReader returns a reader which returns each batch read to callers
func NewBatchedReader(cfg ReaderConfig) (*BatchedReader)

// BatchedReader has a similar interface to Reader, except with FetchBatch/ReadBatch instead of *Message
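
For illustration only, a caller under proposal 1 might look like the following; FetchBatch, the Batched field, and the way a *Batch exposes its messages are all part of the proposal (or assumed here), not of kafka-go today:

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"localhost:9092"},
	GroupID: "my-group",
	Topic:   "my-topic",
	Batched: true, // proposed field; under proposal 2 this would be kafka.NewBatchedReader(cfg)
})

for {
	batch, err := r.FetchBatch(ctx) // proposed method
	if err != nil {
		return err
	}
	if err := indexBulk(batch); err != nil { // e.g. one ElasticSearch bulk request
		return err // nothing committed, so the batch is redelivered later
	}
	// assumes the proposed *Batch exposes its messages for committing
	if err := r.CommitMessages(ctx, batch.Messages()...); err != nil {
		return err
	}
}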

Thoughts?

robfig avatar Oct 20 '20 16:10 robfig

@achille-roussel wondering if you have any thoughts about the API?

robfig avatar Oct 26 '20 19:10 robfig

Tagging @alecthomas and @dominicbarnes as two recently-active maintainers to provide some feedback on the API design.

robfig avatar May 09 '21 20:05 robfig

Hey @robfig! Thanks for following up on this!

Have you looked at potentially using the kafka.ConsumerGroup API that the package exposes? Paired with kafka.Conn or kafka.Client to fetch batches of messages, you should be able to gain greater control over the timing of commits.
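
A rough sketch of that combination, with placeholder broker address, topic, and group ID; rebalance and error handling are abbreviated, and the assignment offset is assumed to be a valid absolute offset (i.e. the group has committed before):

package kafkabatch

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func consumeBatches(ctx context.Context, process func(topic string, partition int, msgs []kafka.Message) error) error {
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "my-group",
		Brokers: []string{"localhost:9092"},
		Topics:  []string{"my-topic"},
	})
	if err != nil {
		return err
	}
	defer group.Close()

	for {
		gen, err := group.Next(ctx) // blocks until this member joins a generation
		if err != nil {
			return err
		}
		for topic, assignments := range gen.Assignments {
			topic := topic // capture range variable for the goroutine started below
			for _, assignment := range assignments {
				partition, offset := assignment.ID, assignment.Offset
				gen.Start(func(ctx context.Context) {
					conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", topic, partition)
					if err != nil {
						log.Print(err)
						return
					}
					defer conn.Close()
					if _, err := conn.Seek(offset, kafka.SeekAbsolute); err != nil {
						log.Print(err)
						return
					}
					for ctx.Err() == nil { // ctx is canceled when the generation ends
						batch := conn.ReadBatch(10e3, 10e6) // min/max bytes per fetch
						var msgs []kafka.Message
						for {
							m, err := batch.ReadMessage()
							if err != nil {
								break
							}
							msgs = append(msgs, m)
						}
						batch.Close()
						if len(msgs) == 0 {
							continue
						}
						if err := process(topic, partition, msgs); err != nil {
							log.Print(err)
							return // nothing committed; the batch is redelivered later
						}
						last := msgs[len(msgs)-1].Offset
						if err := gen.CommitOffsets(map[string]map[int]int64{topic: {partition: last + 1}}); err != nil {
							log.Print(err)
						}
					}
				})
			}
		}
	}
}

Compared to kafka.Reader, this hands back whatever batch each fetch returned and leaves the commit timing entirely to the caller.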

Let us know if this is helpful. If not, let's explore adding new APIs to kafka.Reader to expose batches instead of individual messages 👍

achille-roussel avatar May 28 '21 17:05 achille-roussel

Hi @achille-roussel, the ConsumerGroup API does not really do what I wanted. It does not expose the batch of messages, only one message at a time. You can get a batch of messages from a Conn, but that is specific to a partition and does not use consumer groups. We use this library quite a bit internally, and there are many places where we would use this API. We use consumer groups everywhere (which I think is typical practice) and many message types benefit from batched processing, but right now that is done unnecessarily inefficiently. In my benchmarks, I found that Conn.ReadBatch was significantly faster than reading from a ConsumerGroup, so I think there is a lot to gain. (This was a while ago.. I'll have to look for the results..) Thanks for considering, Rob

robfig avatar May 29 '21 20:05 robfig

I have the benchmark results, but not the code. I'm not sure I really compared a ConsumerGroup vs a single-partition Reader, though. Anyway, I found that using Conn.ReadBatch I could read 424MB of messages in 4.5s, while reading the same amount with Reader.FetchMessage took 41s. At the time I was looking into the speed of reading a single partition, but I do see a result showing that I could read about 4GB of the topic with a ConsumerGroup in 30s, which would be comparable performance. However, that reader was only keeping simple stats rather than doing anything significant or batching the messages.

robfig avatar May 29 '21 20:05 robfig

Following back on this @robfig

Thanks for all the details you've been sharing! kafka.Reader uses kafka.Conn.ReadBatch underneath, so I'm thinking the delta in performance comes either from a difference in configuration or from the kafka.Reader abstractions being a bottleneck.

I know it's been a while, but if you had CPU or block profiles to share, or a benchmark setup I could reproduce, this would be highly valuable to understand how to improve kafka-go.
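
For whoever picks this up later, a minimal sketch of capturing CPU and block profiles around such a read loop with runtime/pprof; the output file names and the consume function are placeholders:

package kafkabatch

import (
	"context"
	"os"
	"runtime"
	"runtime/pprof"
)

// profileRun wraps a consume loop with CPU and block profiling and writes
// cpu.out and block.out, which can then be inspected with `go tool pprof`.
func profileRun(ctx context.Context, consume func(context.Context) error) error {
	cpu, err := os.Create("cpu.out")
	if err != nil {
		return err
	}
	defer cpu.Close()
	if err := pprof.StartCPUProfile(cpu); err != nil {
		return err
	}
	defer pprof.StopCPUProfile()

	runtime.SetBlockProfileRate(1) // record every blocking event

	runErr := consume(ctx) // the FetchMessage/ReadBatch loop being measured

	blk, err := os.Create("block.out")
	if err != nil {
		return err
	}
	defer blk.Close()
	if err := pprof.Lookup("block").WriteTo(blk, 0); err != nil {
		return err
	}
	return runErr
}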

achille-roussel avatar Nov 12 '21 18:11 achille-roussel

Hello @robfig

I just wanted to cycle back here and check if you had any updates to share on this issue?

achille-roussel avatar Feb 15 '22 18:02 achille-roussel

I don't have a benchmark setup or any code to share, sorry. The code was the trivial code you might expect -- a loop repeatedly calling FetchMessage or ReadBatch to read all the data in a partition. The Kafka connection code would not be helpful and I'm not sure how I could even share the Kafka partition state. I'm no longer working on this project so I won't be able to help further here, sorry.

robfig avatar Feb 15 '22 18:02 robfig

Batch mode - it definitely must be done...

My thoughts: an active batch plus a prefetched batch (see the sketch after the lists below).

When you are done processing the active batch:

  1. synchronously switch to the prefetched batch
  2. asynchronously commit the offset of the last message of the finished batch and prefetch the next batch

This way you have:

  1. an "at least once" guarantee - if a crash happens, you reprocess only one batch
  2. fetches and offset commits become asynchronous - no added latency
  3. offsets are committed less frequently
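
A rough sketch of that double-buffering scheme on top of the existing kafka.Reader - the batch size, logging, and helper names are illustrative, not a proposed API:

package kafkabatch

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

// fetchBatch blocks until batchSize messages have been fetched; a real
// implementation would also flush partial batches on a timeout.
func fetchBatch(ctx context.Context, r *kafka.Reader, batchSize int) ([]kafka.Message, error) {
	msgs := make([]kafka.Message, 0, batchSize)
	for len(msgs) < batchSize {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			return msgs, err
		}
		msgs = append(msgs, m)
	}
	return msgs, nil
}

func consume(ctx context.Context, r *kafka.Reader, process func([]kafka.Message) error) error {
	type fetched struct {
		msgs []kafka.Message
		err  error
	}
	prefetch := func() <-chan fetched {
		ch := make(chan fetched, 1)
		go func() {
			msgs, err := fetchBatch(ctx, r, 100)
			ch <- fetched{msgs, err}
		}()
		return ch
	}

	next := prefetch()
	for {
		active := <-next  // 1. synchronously switch to the prefetched batch
		next = prefetch() // 2. asynchronously prefetch the following batch
		if active.err != nil {
			return active.err
		}
		if err := process(active.msgs); err != nil {
			// Do not commit: after a crash/restart the group resumes from the
			// last committed offset, so only this batch is reprocessed.
			return err
		}
		// 2. asynchronously commit the finished batch's offsets
		go func(done []kafka.Message) {
			if err := r.CommitMessages(ctx, done...); err != nil {
				log.Printf("commit failed: %v", err)
			}
		}(active.msgs)
	}
}

Committing asynchronously means a commit can still be in flight when the process crashes, which is exactly the at-least-once tradeoff described above.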

vorandrew avatar Mar 06 '22 22:03 vorandrew

Thanks for the suggestions @santacruz123

I agree it would be a useful improvement for the kafka.Reader type to support reading and committing batches.

Would you be able to submit one or more PRs to implement the suggestions you made?

achille-roussel avatar Mar 18 '22 17:03 achille-roussel

Sorry guys, I couldn't find it in the manual and it's not clear from the discussion. Is this already implemented? i.e. can I get batch messages from Kafka and commit? If yes, I would be thankful if someone could point me to an example/tutorial of how to use it. Thank you in advance!

anatoly314 avatar Dec 29 '23 11:12 anatoly314

And another question: you mentioned Conn.ReadBatch - is it meant to be used by library consumers or only for internal use? I have a feeling that right now it's only for internal use, am I right?

anatoly314 avatar Dec 29 '23 12:12 anatoly314