
Create a sane Writer + Batch System

Open arianitu opened this issue 2 years ago • 11 comments

Describe the solution you'd like

The Writer abstraction is a bad design when it comes to writing messages and batching.

Current System

Users call WriteMessages, which takes N messages. The user must collect N messages themselves and then call WriteMessages, which basically creates a batching system on top of a batching system. Worse, if a user calls WriteMessages in a for loop (the most common way of writing messages from streams, files, etc.), the default settings throttle them to roughly one message per second.

There are lots of issues where users set BatchTimeout to 10 * time.Millisecond, which is just a hack to get the desired / intuitive behaviour.
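For reference, the workaround usually looks like the following (the broker address and topic name are placeholders):

```go
// Illustrative workaround: shrink BatchTimeout so single-message
// WriteMessages calls are not held for the default 1s batch window.
w := &kafka.Writer{
	Addr:         kafka.TCP("localhost:9092"), // placeholder broker
	Topic:        "events",                    // placeholder topic
	BatchTimeout: 10 * time.Millisecond,
}
```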

New System

Users call WriteMessage, which batches internally and sends the messages when either BatchSize is reached or BatchTimeout elapses. This API is intuitive and anyone can use it. Users who send messages in a for loop (the most common way of writing messages from streams, files, etc.) will get the expected behaviour.

The user no longer has to build a batching system on top of the existing one; they just call WriteMessage with all their messages and the writer handles the rest.

arianitu avatar Feb 17 '22 17:02 arianitu

Hi @arianitu , have you had a chance to try the Writer in "asynchronous mode"?

Docs: https://pkg.go.dev/github.com/segmentio/kafka-go#Writer

Code: https://github.com/segmentio/kafka-go/blob/b952e635d4f1b688854c1097cd7ce6eeaf50b273/writer.go#L54-L74
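For anyone landing here, asynchronous mode is enabled via the Writer's Async flag, and delivery results are then reported through the optional Completion callback rather than the WriteMessages return value. A sketch (broker address and topic are placeholders):

```go
// Illustrative: in async mode WriteMessages returns as soon as the
// messages are enqueued; batching and flushing happen in the background.
w := &kafka.Writer{
	Addr:  kafka.TCP("localhost:9092"), // placeholder broker
	Topic: "events",                    // placeholder topic
	Async: true,
	// Delivery results arrive here instead of being returned
	// by WriteMessages.
	Completion: func(messages []kafka.Message, err error) {
		if err != nil {
			log.Printf("delivery failed for %d messages: %v", len(messages), err)
		}
	},
}
```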

nlsun avatar Feb 25 '22 19:02 nlsun

I think the WriteMessages() implementation is problematic in several ways:

It can either be asynchronous or not, depending on the config. I think we should have dedicated async methods. Something like:

func (w *BufferedWriter) WriteMessagesAsync(ctx context.Context, messages ...kafka.Message) <-chan error {
  // ...
}

The current WriteMessages method can buffer, time out, or be async. This is too complicated. In some cases we want to write messages immediately. We should have a method for that:

// WriteMessages writes the messages. This method blocks until all the messages are sent.
func (w *BufferedWriter) WriteMessages(ctx context.Context, messages ...kafka.Message) error {
  // ...
}

In cases where performance matters, we could add a buffered API:

// WriteBuffered saves the messages in a buffer, flushing once BufferSize messages accumulate.
// If there's room in the buffer, no messages are written. The returned error is the first
// error returned by the Flush method.
func (w *BufferedWriter) WriteBuffered(ctx context.Context, messages ...kafka.Message) error {
	// [...]
}

// Flush sends and then empties the content of the buffer. 
func (w *BufferedWriter) Flush(ctx context.Context) error {
  // [...]
}

Building on the API above, we could add other methods to handle timeouts:

// WriteBufferedAndFlushAfter works like WriteBuffered but also calls Flush with the same context after delay.
func (w *FlushingWriter) WriteBufferedAndFlushAfter(ctx context.Context, delay time.Duration, messages ...kafka.Message) chan error {
  // [...]
}

I have a working implementation with tests and could contribute if you agree @achille-roussel / @nlsun. @arianitu it would be nice to have your feedback as well.

hadrienk avatar Mar 02 '22 12:03 hadrienk

It may make sense to use an existing library such as https://pkg.go.dev/github.com/eapache/go-resiliency/batcher to handle batching and have it call WriteMessages as a side-effect.

derekadams avatar Apr 27 '22 20:04 derekadams

I see another problem with the current implementation - batching is forced on the developer. There is no way for the developer to disable it and handle batching externally. If the developer tries to manually collect batches and decide when to flush them to Kafka, the writer will either break batches down into even smaller sizes or wait until the batch timeout is reached.

I would prefer the approach proposed by @derekadams: WriteMessages should just write whatever messages it gets directly to Kafka without any batching logic; batching can then be implemented as a wrapper around that function.

lovromazgon avatar Aug 23 '22 15:08 lovromazgon

Hi, thanks for all the ideas. We probably won't change the API of WriteMessages, but going by the comments it could be worth looking into creating a new function that:

  • Writes synchronously
  • Does not do any re-batching under the hood.

This can act as a building block for those that want fine grained control over batching.

How does that sound? If someone is interested we'd be happy to review a pull request.

nlsun avatar Sep 02 '22 17:09 nlsun

Same here, @nlsun. We are already doing our own batching, so I would like to send the messages directly to Kafka.

A weird thing about this library is that it forces batching on you while keeping all the other functions private, so nobody can build something useful on top of the base functions.

Also, anyone using this library's batching presumably doesn't care much about message safety: what happens if the application crashes mid-batch? All buffered messages are gone. So maybe nobody uses this library for important message delivery, just for cheap messages like metrics, logs, etc.?

I think I will test Sarama. I might also send a PR for my requirements.

Edit: Later on, I saw the Produce functions. I think people can skip the batching behaviour via this method, right? But I will probably strip out most of the library and just keep the produce methods. Our goal is low latency and low resource usage when sending messages to Kafka.
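For reference, a single unbatched produce through the low-level client looks roughly like the following. The broker address, topic, and payload are placeholders, and error handling is omitted; check the package docs for the exact request fields:

```go
// Illustrative: one synchronous, unbatched produce via kafka.Client.
client := &kafka.Client{Addr: kafka.TCP("localhost:9092")} // placeholder broker

res, err := client.Produce(context.Background(), &kafka.ProduceRequest{
	Topic:        "events", // placeholder topic
	Partition:    0,
	RequiredAcks: kafka.RequireAll, // wait for all replicas
	Records: kafka.NewRecordReader(
		kafka.Record{Value: kafka.NewBytes([]byte("one message"))},
	),
})
```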

Edit 2: Apparently Sarama has had some really bad updates recently; there are a lot of data races, panics, etc. in the issues opened in the last 30 days. So I will focus on this library but refactor the base code for our needs.

yusufozturk avatar Sep 11 '22 23:09 yusufozturk

Has anybody refactored the Writer + Batch System since February? Maybe I can fork from where you left off. @arianitu Thanks.

yusufozturk avatar Sep 12 '22 11:09 yusufozturk

@nlsun Sent the PR.

yusufozturk avatar Sep 12 '22 21:09 yusufozturk

@yusufozturk thanks for the PR. Just to clarify since I ended up forgetting the original purpose of this issue:

The original ask was something along the lines of

Add a non-blocking WriteMessages that also guarantees messages are sent.

We ended up discussing a slightly different goal of

Add a way to bypass the batching in Writer.WriteMessages

I created a new issue to discuss bypassing the batching in Writer.WriteMessages here: https://github.com/segmentio/kafka-go/issues/994

nlsun avatar Sep 20 '22 21:09 nlsun

Any update on this issue?

Our service originally used Sarama to implement the producer, but it suffers from several problems inside that library, so we decided to try this library instead.

Our application sends messages one by one and does not do any batching before calling writer.WriteMessages. This appears to cause a performance problem: our throughput dropped from 20k/s to 17k/s compared to the Sarama producer.

From the comments above, it sounds like we should batch messages before sending them to the writer. Will this improve throughput?

3AceShowHand avatar Feb 24 '23 13:02 3AceShowHand

@3AceShowHand we haven't had anyone volunteer to implement this feature yet.

For your immediate performance issues, yes I believe batching will improve your throughput.

nlsun avatar Feb 24 '23 18:02 nlsun