`rd_kafka_offsets_store` does not store and send offset commit metadata
Description
Offset commit metadata can be used to send additional data to the broker when making an offset commit. See this doc from the Java consumer.
librdkafka currently sends the offset commit metadata when the commit is requested explicitly via `rd_kafka_commit_queue`, but not when `rd_kafka_offsets_store` is used to store offsets for the auto-commit.
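A minimal sketch of the two paths using confluent-kafka-go (the binding used in the reproduction below); `consumer` is assumed to be an already-configured `*kafka.Consumer`, the topic/offset values are placeholders, and imports/setup are as in the full program further down:

```go
// storeThenCommit contrasts the two paths. Both attach the same metadata to
// the TopicPartition; today only the explicit commit delivers it to the broker.
func storeThenCommit(consumer *kafka.Consumer, topic string) error {
	metadata := "test metadata"
	tp := kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: 42, Metadata: &metadata}

	// Auto-commit path (rd_kafka_offsets_store): the stored offset is later
	// committed by the auto-committer, but the metadata is currently dropped.
	if _, err := consumer.StoreOffsets([]kafka.TopicPartition{tp}); err != nil {
		return err
	}

	// Explicit commit path (librdkafka's commit via rd_kafka_commit_queue, per
	// the description above): offset and metadata are both sent to the broker.
	if _, err := consumer.CommitOffsets([]kafka.TopicPartition{tp}); err != nil {
		return err
	}
	return nil
}
```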
How to reproduce
- Using confluent-kafka-go, which supports offset commit metadata, store an offset with metadata to be auto-committed:
```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092",
		"group.id":                 "test",
		"enable.auto.offset.store": false,
		"enable.auto.commit":       true,
	})
	if err != nil {
		panic(err)
	}

	_, version := kafka.LibraryVersion()
	fmt.Printf("Started consumer version:%s\n", version)

	// Log rebalance events.
	err = consumer.Subscribe("test", func(consumer *kafka.Consumer, event kafka.Event) error {
		fmt.Printf("%v\n", event.String())
		return nil
	})
	if err != nil {
		panic(err)
	}

	for {
		event := consumer.Poll(1000)
		if event == nil {
			fmt.Println("No event")
			continue
		}
		switch e := event.(type) {
		case kafka.Error:
			fmt.Printf("%s\n", e.Error())
		case *kafka.Message:
			fmt.Printf("%s: %s\n", e.TopicPartition, e.Value)
			// Store the next offset with commit metadata attached,
			// to be picked up by the auto-commit.
			metadata := "test metadata"
			_, err = consumer.StoreOffsets([]kafka.TopicPartition{{
				Topic:     e.TopicPartition.Topic,
				Partition: e.TopicPartition.Partition,
				Offset:    e.TopicPartition.Offset + 1,
				Metadata:  &metadata,
			}})
			if err != nil {
				panic(err)
			}
		}
		// Stop after the first returned event.
		break
	}

	err = consumer.Close()
	if err != nil {
		panic(err)
	}
}
```
Output:

```
Started consumer version:1.9.1
AssignedPartitions: [test[0]@unset]
No event
test[0]@1: hello
RevokedPartitions: [test[0]@unset]
```
- Using kafka-python, print the committed offsets for that consumer group:
```python
from kafka import KafkaAdminClient

config = {"bootstrap_servers": "localhost:9092"}
client = KafkaAdminClient(**config)
print(client.list_consumer_group_offsets("test"))
```
The offset was auto-committed, but the metadata is empty:

```
{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=1, metadata='')}
```
- Now replace `StoreOffsets` with `CommitOffsets` in the above Go script to commit synchronously (see the sketch after the output below) and run the same Python script:
```
Started consumer version:1.9.1
AssignedPartitions: [test[0]@unset]
No event
test[0]@2: hello
RevokedPartitions: [test[0]@unset]
```

This time the committed offset carries the metadata:

```
{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=2, metadata='test metadata')}
```
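For reference, a sketch of the change described in the last step: the `case *kafka.Message:` branch of the Go script above with `StoreOffsets` swapped for `CommitOffsets`, everything else unchanged:

```go
		case *kafka.Message:
			fmt.Printf("%s: %s\n", e.TopicPartition, e.Value)
			metadata := "test metadata"
			// Commit synchronously instead of storing for auto-commit; with
			// this variant the metadata shows up in the committed offsets.
			_, err = consumer.CommitOffsets([]kafka.TopicPartition{{
				Topic:     e.TopicPartition.Topic,
				Partition: e.TopicPartition.Partition,
				Offset:    e.TopicPartition.Offset + 1,
				Metadata:  &metadata,
			}})
			if err != nil {
				panic(err)
			}
```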
Investigation
`rd_kafka_offset_store` should pass the metadata down to `rd_kafka_offset_store0` so that it can store the most recently sent metadata on the topic partition (toppar). The metadata can then be recovered when re-creating the offsets list from the stored data here.
https://github.com/edenhill/librdkafka/compare/master...mathispesch:librdkafka:store-offsets-metadata is a proposed patch, which I can continue working on to factorise it and add tests.
Checklist
- [x] librdkafka version (release number or git tag): 1.9.1
- [x] Apache Kafka version: 2.7.2
- [x] librdkafka client configuration: `enable.auto.commit=true, enable.auto.offset.store=false`
- [x] Operating system: Ubuntu 20.04
- [x] Provide logs (with `debug=..` as necessary) from librdkafka: https://gist.github.com/mathispesch/1df4d26873fa483b202497a198936c76
- [x] Provide broker log excerpts: no relevant logs, see output of kafka admin calls instead
- [ ] Critical issue
Hey @edenhill, what do you think about this one? Happy to update the proposed fix and add tests.
Good find, @mathispesch, this was an oversight at the time store offsets was implemented. I think it makes total sense to have the richer rd_kafka_offsets_store() also store the metadata :+1:
Fixed by #4084.