confluent-kafka-go
Facing blocking API call and performance degradation for ReadMessage
Description
We are trying to use a Seek/ReadMessage combination in a loop. We see that most of the time ReadMessage blocks on the internal C library's C._rk_queue_poll or some other internal API. Each message takes around 500 milliseconds to be returned from ReadMessage. When I introduce a delay of 100 milliseconds between Seek and ReadMessage, it reduces to 400 milliseconds per message. When I introduce a delay of 1000 milliseconds between Seek and ReadMessage, it reduces to a few microseconds per message.
Is this behaviour expected? I know the Seek/ReadMessage combination is probably not the typical use case.
How to reproduce
Use the code below as a consumer against a local Kafka cluster with 1 broker and 3 partitions.
```go
for testRun := 0; testRun < testRuns; testRun++ {
    startIndex := rand.Intn(len(partitionRecords) - batchSize)
    endIndex := startIndex + batchSize
    batch := partitionRecords[startIndex:endIndex]

    if SortEnable {
        utils.SortByPartitionAndOffset(batch)
    }

    r := trace.StartRegion(context.Background(), fmt.Sprintf("Batch%d", testRun+1))
    readStart := time.Now()
    fmt.Println(len(batch))
    for i, record := range batch {
        // Parse the partition and offset values from the record
        partition := record[0]
        offset := record[1]

        // Convert the partition and offset values to integers
        partitionInt, err := strconv.Atoi(partition)
        if err != nil {
            panic(err)
        }
        offsetInt, err := strconv.Atoi(offset)
        if err != nil {
            panic(err)
        }

        // Create a new Kafka topic partition instance
        topicPartition := kafka.TopicPartition{
            Topic:     &topic,
            Partition: int32(partitionInt),
            Offset:    kafka.Offset(offsetInt),
        }

        // Use the Seek() method to position the consumer at the specified offset
        seekStart := time.Now()
        seekErr := p.KafkaWrapper.DoSeek(p.Consumer, topicPartition, 0)
        seekEnd := time.Now()
        seekTime := seekEnd.Sub(seekStart)
        fmt.Println("Record No:", i, "Seek Error:", seekErr, "partition:", topicPartition.Partition, "offset:", topicPartition.Offset)

        // Read the message from each of those partition-offset pairs
        time.Sleep(2000 * time.Millisecond)
        readMessageStart := time.Now()
        timeoutTime, _ := time.ParseDuration("10s")
        msg, err := p.KafkaWrapper.DoReadMessage(p.Consumer, timeoutTime)
        readMessageEnd := time.Now()
        if err != nil {
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
            continue
        }
        // fmt.Println("Message ", string(msg.Value))
        readMessageTime := readMessageEnd.Sub(readMessageStart)
        // fmt.Println("ReadMessageTime: ", readMessageTime.Microseconds())
        _, err2 := logFile.WriteString("" + "," + strconv.Itoa(testRun+1) + "," + "" + "," + strconv.Itoa(int(topicPartition.Partition)) + "," + strconv.Itoa(int(topicPartition.Offset)) + "," + strconv.Itoa(int(seekTime.Microseconds())) + "," + strconv.Itoa(int(readMessageTime.Microseconds())) + "," + "" + "," + "" + "\n")
        if err2 != nil {
            fmt.Println("Write Error in log file", err2)
        }
    }
    readEnd := time.Now()
    r.End()
    batchDuration := readEnd.Sub(readStart)
    currentTime := time.Now()
    _, err := logFile.WriteString(currentTime.Format("2006-01-02 15:04:05") + "," + "" + "," + strconv.Itoa(batchSize) + "," + "" + "," + "" + "," + "" + "," + "" + "," + strconv.Itoa(int(batchDuration.Microseconds())) + "\n")
    if err != nil {
        fmt.Println("Write Error in log file", err)
    }
}
```
We have a requirement to read 100-500 such messages from particular offsets and partitions that are stored offline.
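For what it's worth, here is that access pattern in isolation. A minimal sketch, assuming a plain `kafka.Consumer` from confluent-kafka-go v1 (no `KafkaWrapper` indirection) and a hypothetical hard-coded `records` slice standing in for the offline store. It uses `Assign()` at the wanted offset rather than `Seek()`, since my understanding is that `Seek()` only works on a partition that is already assigned:

```go
package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "offline-replay",
    })
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    topic := "test-topic"
    // Hypothetical stored (partition, offset) pairs to replay.
    records := []kafka.TopicPartition{
        {Topic: &topic, Partition: 0, Offset: kafka.Offset(42)},
        {Topic: &topic, Partition: 2, Offset: kafka.Offset(7)},
    }

    for _, tp := range records {
        // Assign directly at the wanted offset instead of Seek; this
        // avoids seeking on a partition the consumer may not currently hold.
        if err := consumer.Assign([]kafka.TopicPartition{tp}); err != nil {
            panic(err)
        }
        msg, err := consumer.ReadMessage(10 * time.Second)
        if err != nil {
            fmt.Printf("Consumer error: %v\n", err)
            continue
        }
        fmt.Printf("partition %d offset %v: %s\n",
            msg.TopicPartition.Partition, msg.TopicPartition.Offset, msg.Value)
    }
}
```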
Checklist
Please provide the following information:
- [x] confluent-kafka-go and librdkafka version (`LibraryVersion()`): confluent-kafka-go 1.9.2
- [x] Apache Kafka broker version: 2.13.x
- [x] Client configuration:

```go
kafkaConfigMap := kafka.ConfigMap{
    "bootstrap.servers":             p.Properties.Bootstrapper,
    "group.id":                      p.Properties.GroupId,
    "partition.assignment.strategy": "cooperative-sticky",
    "auto.offset.reset":             "earliest",
    "enable.auto.commit":            true,
}
```

- [x] Operating system: WSL2 Ubuntu on Windows 10
- [ ] Provide client logs (with `"debug": ".."` as necessary)
- [ ] Provide broker log excerpts
- [x] Critical issue
Update: I was hitting the blocking/waiting issue above while using the default values for these properties:
- fetch.max.wait.ms (500)
- fetch.min.bytes (1)
- fetch.max.bytes (default value)
- partition.fetch.max.bytes (default value)
As I understand the above configuration, a poll (through ReadMessage) should return when either the fetch.max.wait.ms (500) or the fetch.min.bytes threshold is reached. Since fetch.min.bytes is 1 byte by default, it should ideally return immediately? We are also sure the topic and partition contain the data, because we are seeking to older messages (well within the retention period) before polling. So why did it wait 500 ms (which I assume it did, since almost every message read took around 500 ms plus a few more)? Or, if it did not wait, was the blocking due to too many requests, with every request bringing back only 1 byte? I am not sure.
When I changed the property values from their defaults to the values below, it performed much faster:
- fetch.max.wait.ms (0)
- fetch.min.bytes (max value)
- fetch.max.bytes (max value)
- partition.fetch.max.bytes (max value)
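For reference, a sketch of how these overrides might look in the ConfigMap. Note this assumes librdkafka's spelling of the property names (`fetch.wait.max.ms` and `max.partition.fetch.bytes`, per librdkafka's CONFIGURATION.md), and the `100000000` values are just illustrative large settings, not the documented maxima:

```go
kafkaConfigMap := kafka.ConfigMap{
    "bootstrap.servers":             p.Properties.Bootstrapper,
    "group.id":                      p.Properties.GroupId,
    "partition.assignment.strategy": "cooperative-sticky",
    "auto.offset.reset":             "earliest",
    "enable.auto.commit":            true,

    // Answer fetch requests immediately instead of waiting for data to accumulate.
    "fetch.wait.max.ms": 0,
    // Large fetch sizes so a single fetch can satisfy many ReadMessage calls.
    "fetch.min.bytes":           100000000,
    "fetch.max.bytes":           100000000,
    "max.partition.fetch.bytes": 100000000,
}
```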
I had to set the last property above to its max value to make it perform well with multiple consumers (one per partition, each reading from a single partition, running in goroutines).
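A sketch of that multi-consumer layout, assuming the same v1 import plus `sync`, and a hypothetical `offsetsFor(partition)` helper that returns the stored `[]kafka.Offset` for one partition:

```go
func replayAllPartitions(topic string, numPartitions int32, cfg *kafka.ConfigMap) {
    var wg sync.WaitGroup
    for part := int32(0); part < numPartitions; part++ {
        wg.Add(1)
        go func(partition int32) {
            defer wg.Done()
            consumer, err := kafka.NewConsumer(cfg)
            if err != nil {
                panic(err)
            }
            defer consumer.Close()

            // Each goroutine replays only its own partition's stored offsets.
            for _, off := range offsetsFor(partition) { // hypothetical offline-store lookup
                tp := kafka.TopicPartition{Topic: &topic, Partition: partition, Offset: off}
                if err := consumer.Assign([]kafka.TopicPartition{tp}); err != nil {
                    panic(err)
                }
                if msg, err := consumer.ReadMessage(10 * time.Second); err == nil {
                    fmt.Printf("partition %d offset %v: %d bytes\n",
                        partition, msg.TopicPartition.Offset, len(msg.Value))
                }
            }
        }(part)
    }
    wg.Wait()
}
```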
Can you shed some light on this behaviour? Are there any pitfalls in the above values for my use case and the code above?