
message size incorrectly calculated (smaller than actual size)

Open · Frefreak opened this issue on Dec 7, 2021 · 4 comments

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama Kafka Go
635bcf3 2.12-2.0.0 1.17
Configuration

What configuration values are you using for Sarama and Kafka?

	c := sarama.NewConfig()
	c.Producer.Return.Successes = true
	c.Producer.Return.Errors = true
	// Wait only the local commit to succeed before responding.
	c.Producer.RequiredAcks = sarama.WaitForLocal
	// Because sarama does not accept a Context for every message, set the Timeout here.
	c.Metadata.Full = true
	c.Metadata.Retry.Max = 3
	c.Metadata.Retry.Backoff = time.Millisecond * 250
	c.Producer.MaxMessageBytes = 1000000

kafka: default config (local instance started with bin/kafka-server-start.sh config/server.properties)

Problem Description

It seems sarama under-calculates the message size, at least in the version given above. For a simple message with a value of size 999964, sarama calculates the total message size as 1000000 (the default maximum) and tries to send it, but a packet capture shows the actual size on the wire is 1000082, so the message is rejected. (See the two attached screenshots.)

test program:

package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	c := sarama.NewConfig()
	c.Producer.Return.Successes = true
	c.Producer.Return.Errors = true
	// Wait only the local commit to succeed before responding.
	c.Producer.RequiredAcks = sarama.WaitForLocal
	// Because sarama does not accept a Context for every message, set the Timeout here.
	c.Metadata.Full = true
	c.Metadata.Retry.Max = 3
	c.Metadata.Retry.Backoff = time.Millisecond * 250
	c.Producer.MaxMessageBytes = 1000000

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, c)
	if err != nil {
		fmt.Println("error in NewSyncProducer: ", err)
		os.Exit(1)
	}
	sz, err := strconv.Atoi(os.Args[1])
	if err != nil {
		fmt.Println("invalid size:", err)
		os.Exit(1)
	}
	bytes := []byte(strings.Repeat("x", sz))
	message := sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.ByteEncoder(bytes),
	}
	err = producer.SendMessages([]*sarama.ProducerMessage{&message})
	if err != nil {
		value := err.(sarama.ProducerErrors)
		fmt.Println("error in SendMessages", value[0].Err.Error())
	}
}
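
(For anyone reproducing this: the program takes the value size as its only argument, so assuming it is saved as main.go and a broker is listening on localhost:9092, something like "go run main.go 999964" triggers the behaviour described above.)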

Frefreak avatar Dec 07 '21 07:12 Frefreak

This is because in async_producer.go:L233 they add a size overhead. When you are using a Kafka version higher than 0.11, the overhead is 36 bytes. So you can send a message of size 999964 successfully, but a message of size 999965 will get an error.
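
For context, the estimate being described has roughly this shape (a sketch reconstructed from the discussion above, not a verbatim copy of sarama's async_producer.go; names and details may differ): the worst-case v2 record framing is five varint-encoded int32 fields plus one varint-encoded int64 plus a one-byte attributes field, i.e. 5*5 + 10 + 1 = 36 bytes, and that fixed overhead is added to the value length before comparing against Producer.MaxMessageBytes.

package main

import (
	"encoding/binary"
	"fmt"
)

// Worst-case per-record framing for the v2 record format, as described above:
// five varint int32 fields (5 bytes max each), one varint int64 (10 bytes max)
// and the one-byte attributes field: 5*5 + 10 + 1 = 36.
const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

// estimatedSize is a simplified stand-in for the producer-side estimate:
// value length plus the fixed overhead (key and headers omitted here).
func estimatedSize(valueLen int) int {
	return valueLen + maximumRecordOverhead
}

func main() {
	fmt.Println(estimatedSize(999964)) // 1000000 -> passes the default MaxMessageBytes check
	fmt.Println(estimatedSize(999965)) // 1000001 -> rejected before sending
}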

liangxj8 avatar Dec 07 '21 09:12 liangxj8

But even with a value of size 999964 it fails, because the actual size is 1000082 (please see my second screenshot above).

Frefreak avatar Dec 07 '21 09:12 Frefreak

We should make sure we stay in sync with the equivalent record size estimation code from the Java client: org/apache/kafka/common/record/AbstractRecords.java#L89-L157

dnwe avatar Dec 07 '21 12:12 dnwe

From a quick look at both the Java and sarama implementations, for version >= 2 the Java side uses DefaultRecordBatch.RECORD_BATCH_OVERHEAD as the RecordBatch overhead, which is 61 bytes and matches this document: https://kafka.apache.org/documentation/#recordbatch (the sizes of all fields before the records field, plus the 4-byte length field of the array itself). Sarama, however, uses maximumRecordOverhead, which is 36 bytes.
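
For reference, the 61-byte figure can be reproduced from the field sizes in the RecordBatch header documented at the link above (a worked example only, not code from either client):

package main

import "fmt"

func main() {
	// Sizes of the v2 RecordBatch header fields, per
	// https://kafka.apache.org/documentation/#recordbatch
	fields := []struct {
		name string
		size int
	}{
		{"baseOffset", 8},           // int64
		{"batchLength", 4},          // int32
		{"partitionLeaderEpoch", 4}, // int32
		{"magic", 1},                // int8
		{"crc", 4},                  // int32
		{"attributes", 2},           // int16
		{"lastOffsetDelta", 4},      // int32
		{"baseTimestamp", 8},        // int64
		{"maxTimestamp", 8},         // int64
		{"producerId", 8},           // int64
		{"producerEpoch", 2},        // int16
		{"baseSequence", 4},         // int32
		{"records array length", 4}, // int32 length prefix of the records array
	}
	total := 0
	for _, f := range fields {
		total += f.size
	}
	fmt.Println(total) // 61, i.e. DefaultRecordBatch.RECORD_BATCH_OVERHEAD
}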

Besides, the Java side uses sizeOfVarint and friends to get the actual encoded size of each varint, whereas sarama uses the maximum size of the corresponding int type.
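
To make the varint point concrete, here is a small Go sketch of what a sizeOfVarint-style calculation returns compared with the worst-case constants sarama assumes (an illustration, not the Java client's code):

package main

import (
	"encoding/binary"
	"fmt"
)

// varintLen returns the number of bytes v occupies when zigzag+varint encoded,
// which is how the v2 record format stores lengths and deltas.
func varintLen(v int64) int {
	buf := make([]byte, binary.MaxVarintLen64)
	return binary.PutVarint(buf, v) // PutVarint applies zigzag encoding first
}

func main() {
	fmt.Println(varintLen(999964))             // 3 bytes actually needed for this value length
	fmt.Println(binary.MaxVarintLen32)         // 5, the per-int32 worst case assumed instead
	fmt.Println(varintLen(-1), varintLen(127)) // 1 and 2
}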

If we are only providing an estimated size check, I think the current implementation is acceptable; otherwise we could make it match the Java code, which appears to compute the accurate size.

BTW, I looked at the wrong place in my first comment: I looked at the message size, whereas I should have looked at the record batch size. But the number is still off by 24 bytes for a string value of size around 999964. (~~Looking at the encode function of produce_request, it doesn't seem to be using the RecordBatch format linked above, so maybe the overhead of 36 bytes is correct. But in the end some part is mis-calculated, causing the computed size to be smaller than the actual size~~ had another look: the batch record length appears to be the length from the RecordBatch's partitionLeaderEpoch to the end, so that's 61 - 8 - 4 = 49 bytes of overhead for the RecordBatch header.)

Frefreak avatar Dec 07 '21 14:12 Frefreak

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] avatar Aug 19 '23 08:08 github-actions[bot]

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] avatar Nov 17 '23 20:11 github-actions[bot]