kafka-go
WriteErrors Behavior Question
👋 I have a question regarding some behavior I'm seeing with this library.
Let's say I have a local kafka broker configured with message.max.bytes=512
and I attempt the following:
conn := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092"),
    BatchTimeout: 75 * time.Millisecond,
    RequiredAcks: kafka.RequireAll,
}
messages := []kafka.Message{
    {
        Topic: "my-topic",
        Value: []byte("couple chars"),
    },
    {
        Topic: "my-topic",
        Value: []byte("text containing more than 512 characters...."), // pretend this has more than 512 chars
    },
}
switch err := conn.WriteMessages(context.Background(), messages...).(type) {
case nil:
    return nil
case kafka.WriteErrors:
    for i := range messages {
        if err[i] != nil {
            log.Fatalf("Failed to write message - %v", err[i])
        }
    }
default:
    log.Fatalf("Failed to write messages to Kafka broker - %v", err)
}
In the code sample above I get a WriteErrors value containing two errors, one for the first message and one for the second message.
WriteError values contain a list of errors where each entry matches the position of a message in the WriteMessages call. The program can determine the status of each message by looping over the error...
According to the documentation, I would expect to receive a partial failure. Shouldn't the first message succeed and the second message fail?
Hi @jonwhitty, it sounds from the docs (https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes) like message.max.bytes operates against the batch of messages, not the individual messages within a batch, so if the batch size is greater than message.max.bytes I would expect all messages to fail.
Ok, so in order to get the behavior I want I'd have to reduce my BatchSize (in the client config) to 1.
conn := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092"),
    BatchTimeout: 75 * time.Millisecond,
    BatchSize:    1,
    RequiredAcks: kafka.RequireAll,
}
Is that correct? Only then could I guarantee an individual message will either succeed or fail.
Setting the writer field BatchBytes to the value of message.max.bytes might be what you're looking for. With that set you should still be able to benefit from batching for smaller messages, but if you try to send a message larger than message.max.bytes you'll get an error identifying the message which is too large.
@rhansen2 that's not how it behaves. If I do this:
conn := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092"),
    BatchTimeout: 75 * time.Millisecond,
    BatchBytes:   512,
    RequiredAcks: kafka.RequireAll,
}
And then when I publish two messages, one 512 bytes in size (not including headers or other message metadata) and the other only a couple of characters, I just get an overall Message Size Too Large failure. So it doesn't appear I can get the behavior I want without setting BatchSize to 1, which is really unfortunate.
According to the Godoc, BatchBytes "limits the maximum size of a request in bytes before being sent to a partition." So this is a client-side enforcement, which will return a single "Message Too Large" error for the whole batch if it exceeds 512.
I may not be understanding what you're looking for; could you elaborate a bit on the behavior you're wanting?
@rhansen2 If I set BatchBytes to 512 and attempt to publish two messages, one of size 520 and one of size 2 (using the code sample above), I'd expect to get a WriteErrors with one error result for the message of size 520, while the message of size 2 should succeed. The behavior I get right now, though, is a single top-level error (no WriteErrors) for the message payload being too large. The behavior doesn't seem to work the way it's described.
I think in that case you're correct; if you'd only like to receive WriteErrors you'd need to set a batch size of 1.
Following up on our offline discussion last week, I'm going to send a PR suggesting a change that could address this issue. It will give us material to continue the conversation and make sure we are solving the problem properly.