message size incorrectly calculated (smaller than actual size)
Versions
Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
| Sarama | Kafka | Go |
|---|---|---|
| 635bcf3 | 2.12-2.0.0 | 1.17 |
Configuration
What configuration values are you using for Sarama and Kafka?
```go
c := sarama.NewConfig()
c.Producer.Return.Successes = true
c.Producer.Return.Errors = true
// Wait only for the local commit to succeed before responding.
c.Producer.RequiredAcks = sarama.WaitForLocal
// Because sarama does not accept a Context for every message, set the Timeout here.
c.Metadata.Full = true
c.Metadata.Retry.Max = 3
c.Metadata.Retry.Backoff = time.Millisecond * 250
c.Producer.MaxMessageBytes = 1000000
```
Kafka: default config (local instance started with `bin/kafka-server-start.sh config/server.properties`)
Problem Description
It seems Sarama under-calculates the message size, at least in the version given above. For a simple message with a value size of 999964, Sarama calculates the total message size as 1000000 (the default maximum) and will try to send it, but from a packet capture the actual size is 1000082, so it is rejected.
test program:
```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	c := sarama.NewConfig()
	c.Producer.Return.Successes = true
	c.Producer.Return.Errors = true
	// Wait only for the local commit to succeed before responding.
	c.Producer.RequiredAcks = sarama.WaitForLocal
	// Because sarama does not accept a Context for every message, set the Timeout here.
	c.Metadata.Full = true
	c.Metadata.Retry.Max = 3
	c.Metadata.Retry.Backoff = time.Millisecond * 250
	c.Producer.MaxMessageBytes = 1000000

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, c)
	if err != nil {
		fmt.Println("error in NewSyncProducer: ", err)
		os.Exit(1)
	}

	sz, err := strconv.Atoi(os.Args[1])
	if err != nil {
		fmt.Println("invalid size:", err)
		os.Exit(1)
	}

	bytes := []byte(strings.Repeat("x", sz))
	message := sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.ByteEncoder(bytes),
	}
	err = producer.SendMessages([]*sarama.ProducerMessage{&message})
	if err != nil {
		value := err.(sarama.ProducerErrors)
		fmt.Println("error in SendMessages", value[0].Err.Error())
	}
}
```
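Run it with the value size as the only argument, e.g. `go run main.go 999964` versus `go run main.go 999965`, against a local broker that has a `test` topic.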
This is because in async_producer.go:L233 they add a size overhead. When you are using a Kafka version higher than 0.11, the overhead is 36 bytes. So you can send a message with a value of size 999964 successfully, but one of size 999965 will get an error.
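For reference, below is a paraphrased sketch of that check (the constant is reproduced from the Sarama source; the function is my simplification of the v2 branch of the per-message size estimate, so treat it as illustrative rather than exact):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"strings"

	"github.com/Shopify/sarama"
)

// Reproduced from the Sarama source: every varint field of a v2 record is
// budgeted at its maximum possible encoded width: 5*5 + 10 + 1 = 36 bytes.
const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

// byteSize paraphrases the v2 (Kafka >= 0.11) branch of the per-message
// size estimate in async_producer.go.
func byteSize(msg *sarama.ProducerMessage) int {
	size := maximumRecordOverhead
	if msg.Key != nil {
		size += msg.Key.Length()
	}
	if msg.Value != nil {
		size += msg.Value.Length()
	}
	return size
}

func main() {
	msg := &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.ByteEncoder(strings.Repeat("x", 999964)),
	}
	fmt.Println(byteSize(msg)) // 1000000 == the default Producer.MaxMessageBytes
}
```

With a 999964-byte value this yields 999964 + 36 = 1000000, exactly the default MaxMessageBytes, which is why 999964 passes the client-side check while 999965 does not.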
But even a value of size 999964 fails, because the actual size on the wire is 1000082 (as seen in the packet capture mentioned above).
We should make sure we stay in sync with the corresponding record size estimation code in the Java client, org/apache/kafka/common/record/AbstractRecords.java#L89-L157.
From a quick look at both the Java and Sarama implementations: for version >= 2, the Java side uses DefaultRecordBatch.RECORD_BATCH_OVERHEAD as the RecordBatch overhead, which is 61 bytes and matches this document: https://kafka.apache.org/documentation/#recordbatch (the size of all the fields before the records field, plus the 4-byte length field of the array itself). Sarama, however, uses maximumRecordOverhead, which is 36 bytes.
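To make the 61 bytes concrete, here is a quick sanity check that sums the fixed-size v2 RecordBatch header fields from the protocol document linked above (a standalone illustration, not code from either client):

```go
package main

import "fmt"

func main() {
	// Fixed-size fields of the v2 RecordBatch header, per
	// https://kafka.apache.org/documentation/#recordbatch
	fields := []struct {
		name string
		size int
	}{
		{"baseOffset", 8},
		{"batchLength", 4},
		{"partitionLeaderEpoch", 4},
		{"magic", 1},
		{"crc", 4},
		{"attributes", 2},
		{"lastOffsetDelta", 4},
		{"firstTimestamp", 8},
		{"maxTimestamp", 8},
		{"producerId", 8},
		{"producerEpoch", 2},
		{"baseSequence", 4},
		{"records array length", 4}, // 4-byte length prefix of the records array
	}
	total := 0
	for _, f := range fields {
		total += f.size
	}
	fmt.Println(total) // 61, matching DefaultRecordBatch.RECORD_BATCH_OVERHEAD
}
```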
Besides, the Java side uses sizeOfVarint and friends to get the actual encoded size of each varint, whereas Sarama uses the maximum size of the corresponding int type.
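The difference is easy to see with Go's encoding/binary, whose signed varints use the same zigzag encoding as Kafka's record-level fields; the sizeOfVarint helper below is a stand-in I wrote to mirror what the Java client's ByteUtils-style helpers do, not code from either library:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// sizeOfVarint returns the encoded width of a signed varint. Go's PutVarint
// uses the same zigzag encoding as Kafka's record-level varint fields.
func sizeOfVarint(v int64) int {
	buf := make([]byte, binary.MaxVarintLen64)
	return binary.PutVarint(buf, v)
}

func main() {
	fmt.Println(sizeOfVarint(0))       // 1 byte, e.g. a zero offsetDelta
	fmt.Println(sizeOfVarint(999964))  // 3 bytes, e.g. the valueLength of our message
	fmt.Println(binary.MaxVarintLen32) // 5 bytes, what Sarama budgets per int32 field
}
```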
If we are only providing an estimated size check, I think the current implementation would be acceptable; otherwise maybe we can make it truly match the Java code, which appears to compute the accurate size.
BTW, I looked at the wrong place in my first comment: I looked at the message size whereas I should have looked at the record batch size. But the number is still off by 24 bytes for a string value of size around 999964. (~~Looking at the encode function of produce_request, it doesn't seem to be using the RecordBatch format linked above, so maybe the overhead of 36 bytes is correct. But in the end some part is mis-calculated, causing the computed size to be smaller than the actual size.~~ Had another look: the record batch length appears to be the length from the RecordBatch's partitionLeaderEpoch field to the end, so the RecordBatch header overhead is 61 - 8 - 4 = 49 bytes.)
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.