kafka-go

WriteErrors Behavior Question

Open jonwhitty opened this issue 3 years ago • 8 comments

👋 I have a question regarding some behavior I'm seeing with this library.

Let's say I have a local kafka broker configured with message.max.bytes=512 and I attempt the following:

conn := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092"),
    BatchTimeout: 75 * time.Millisecond,
    RequiredAcks: kafka.RequireAll,
}

messages := []kafka.Message{
    {
        Topic: "my-topic",
        Value: []byte("couple chars"),
    },
    {
        Topic: "my-topic",
        Value: []byte("text containing more than 512 characters...."), // pretend this has more than 512 chars
    },
}

switch err := conn.WriteMessages(context.Background(), messages...).(type) {
case nil:
    return nil
case kafka.WriteErrors:
    // Each entry in err corresponds to the message at the same index.
    for i := range messages {
        if err[i] != nil {
            log.Fatalf("Failed to write message - %v", err[i])
        }
    }
default:
    log.Fatalf("Failed to write messages to Kafka broker - %v", err)
}

In the code sample above I get two WriteErrors, one for the first message and one for the second message.

WriteError values contain a list of errors where each entry matches the position of a message in the WriteMessages call. The program can determine the status of each message by looping over the error...

According to the documentation, I would expect to receive a partial failure. Shouldn't the first message succeed and the second message fail?

jonwhitty avatar Nov 02 '21 19:11 jonwhitty

Hi @jonwhitty. From the docs (https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes) it sounds like message.max.bytes applies to the batch of messages, not to the individual messages within a batch. So if the batch size is greater than message.max.bytes, I would expect all messages to fail.
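To illustrate the batch-level behavior described above, here is a minimal sketch that uses only the standard library. The checkBatch function and errTooLarge are hypothetical stand-ins, not part of kafka-go or the broker, and the model ignores record and header overhead. It only shows why one oversized message can cause every message in the same batch to report an error.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooLarge is a hypothetical stand-in for the broker's size rejection.
var errTooLarge = errors.New("message size too large")

// checkBatch is a hypothetical model of a size check applied to a whole
// batch. It returns one error slot per message; when the combined size
// exceeds maxBytes, every slot carries the same error.
func checkBatch(values [][]byte, maxBytes int) []error {
	total := 0
	for _, v := range values {
		total += len(v)
	}
	errs := make([]error, len(values))
	if total > maxBytes {
		for i := range errs {
			errs[i] = errTooLarge
		}
	}
	return errs
}

func main() {
	batch := [][]byte{
		[]byte("couple chars"),
		make([]byte, 600), // assumed oversized payload
	}
	// Both messages report an error because the batch is judged as a unit.
	for i, err := range checkBatch(batch, 512) {
		fmt.Printf("message %d: %v\n", i, err)
	}
}
```

Under this assumption, the small message fails only because it shares a batch with the large one.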

rhansen2 avatar Nov 02 '21 20:11 rhansen2

OK, so in order to get the behavior I want, I'd have to reduce BatchSize (in the client config) to 1.

conn := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092"),
    BatchTimeout: 75 * time.Millisecond,
    BatchSize:    1,
    RequiredAcks: kafka.RequireAll,
}

Is that correct? Only then could I guarantee an individual message will either succeed or fail.

jonwhitty avatar Nov 02 '21 21:11 jonwhitty

Setting the Writer field BatchBytes to the value of message.max.bytes might be what you're looking for. With that set, you should still benefit from batching for smaller messages, but if you try to send a message larger than message.max.bytes you'll get an error identifying the message that is too large.

rhansen2 avatar Nov 02 '21 21:11 rhansen2

@rhansen2 that's not how it behaves. If I do this:

conn := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092"),
    BatchTimeout: 75 * time.Millisecond,
    BatchBytes:   512,
    RequiredAcks: kafka.RequireAll,
}

And then I publish two messages, one of size 512 (not including headers or other message metadata) and the other a couple of characters long, I just get a single overall failure: Message Size Too Large. So it doesn't appear I can get the behavior I want without setting BatchSize to 1, which is really unfortunate.

According to the Godoc, BatchBytes

Limits the maximum size of a request in bytes before being sent to a partition.

So this is a client side enforcement, which will return a single "Message Too Large" error for the whole batch if it exceeds 512.

jonwhitty avatar Nov 02 '21 21:11 jonwhitty

I may not be understanding what you're looking for. Could you elaborate a bit on the behavior you want?

rhansen2 avatar Nov 02 '21 22:11 rhansen2

@rhansen2 If I set BatchBytes to 512, and attempt to publish two messages - one of size 520 and one of size 2 (using the code sample above), I'd expect to get a WriteErrors with one error result for the message of size 520 but the other of size 2 should succeed.

The behavior I get right now, though, is a single top-level error (no WriteErrors) for the message payload being too large. It doesn't seem to work the way it's described.
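For reference, the per-message outcome described above can be sketched as a client-side pre-check, using only the standard library. The splitBySize helper and errTooLarge are hypothetical and not part of kafka-go. The sketch compares only payload length against the limit, so a real check would also need to account for keys, headers, and record overhead.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooLarge is a hypothetical error for payloads over the limit.
var errTooLarge = errors.New("message too large")

// splitBySize is a hypothetical helper. It returns the indexes of payloads
// that fit within limit, plus a slice of errors aligned with values, where
// errs[i] is non-nil only for payloads that exceed the limit.
func splitBySize(values [][]byte, limit int) (sendable []int, errs []error) {
	errs = make([]error, len(values))
	for i, v := range values {
		if len(v) > limit {
			errs[i] = errTooLarge
			continue
		}
		sendable = append(sendable, i)
	}
	return sendable, errs
}

func main() {
	values := [][]byte{
		make([]byte, 520), // over the assumed 512-byte limit
		make([]byte, 2),   // well under the limit
	}
	sendable, errs := splitBySize(values, 512)
	fmt.Println("indexes safe to send:", sendable)
	for i, err := range errs {
		if err != nil {
			fmt.Printf("message %d rejected: %v\n", i, err)
		}
	}
}
```

Only the messages at the returned indexes would then be handed to the writer, so an oversized message no longer takes the smaller ones down with it.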

jonwhitty avatar Nov 03 '21 15:11 jonwhitty

I think in that case you're correct: if you'd only like to receive WriteErrors, you'd need to set a batch size of 1.

rhansen2 avatar Nov 03 '21 16:11 rhansen2

Following up on our offline discussion last week, I'm going to send a PR suggesting a change that could address this issue. It will give us material to continue the conversation and make sure we are solving the problem properly.

achille-roussel avatar Nov 12 '21 17:11 achille-roussel