librdkafka

`rd_kafka_offsets_store` does not store and send offset commit metadata

Open · mathispesch opened this issue 3 years ago · 2 comments

Description

Offset commit metadata is an optional string that a client sends to the broker together with an offset commit; the broker stores it alongside the committed offset. See the Java consumer documentation.

librdkafka currently sends the offset commit metadata when a commit is requested explicitly with `rd_kafka_commit_queue()`, but not when offsets are stored with `rd_kafka_offsets_store()` and then committed by auto-commit.
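To make the two code paths concrete, here is a minimal, self-contained Go model of the behaviour described above. It is only a sketch: `partitionState`, `commitExplicit`, `storeOffset` and `autoCommit` are hypothetical names that mirror the two paths as reported for 1.9.1; they are not librdkafka or confluent-kafka-go APIs, and the real implementation differs.

```go
package main

import "fmt"

// partitionState is a hypothetical stand-in for librdkafka's per-partition
// state before the fix: only the stored offset is kept.
type partitionState struct {
	storedOffset int64
}

// committed is what reaches the group coordinator for one partition.
type committed struct {
	Offset   int64
	Metadata string
}

// commitExplicit models the rd_kafka_commit_queue() path: the caller's
// offset and metadata are sent as-is.
func commitExplicit(offset int64, metadata string) committed {
	return committed{Offset: offset, Metadata: metadata}
}

// storeOffset models rd_kafka_offsets_store() as reported in 1.9.1:
// the metadata is accepted but there is nowhere to keep it.
func storeOffset(p *partitionState, offset int64, metadata string) {
	_ = metadata // dropped
	p.storedOffset = offset
}

// autoCommit models the auto-commit path, which rebuilds the commit list
// from stored partition state only, so Metadata is always empty.
func autoCommit(p *partitionState) committed {
	return committed{Offset: p.storedOffset}
}

func main() {
	fmt.Printf("explicit commit: %+v\n", commitExplicit(2, "test metadata"))

	var p partitionState
	storeOffset(&p, 2, "test metadata")
	fmt.Printf("store + auto-commit: %+v\n", autoCommit(&p))
}
```

Running it prints `explicit commit: {Offset:2 Metadata:test metadata}` followed by `store + auto-commit: {Offset:2 Metadata:}`, which matches the shape of the kafka-python output in the reproduction steps.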

How to reproduce

  1. Using confluent-kafka-go, which supports offset commit metadata, store an offset with metadata to be auto-committed:
```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Offsets are stored manually (enable.auto.offset.store=false) and
	// committed in the background by auto-commit (enable.auto.commit=true).
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092",
		"group.id":                 "test",
		"enable.auto.offset.store": false,
		"enable.auto.commit":       true,
	})
	if err != nil {
		panic(err)
	}
	_, version := kafka.LibraryVersion()
	fmt.Printf("Started consumer version:%s\n", version)

	// The rebalance callback prints partition assignment and revocation events.
	err = consumer.Subscribe("test", func(consumer *kafka.Consumer, event kafka.Event) error {
		fmt.Printf("%v\n", event.String())
		return nil
	})
	if err != nil {
		panic(err)
	}

	for {
		event := consumer.Poll(1000)
		if event == nil {
			fmt.Println("No event")
			continue
		}
		switch e := event.(type) {
		case kafka.Error:
			fmt.Printf("%s\n", e.Error())
		case *kafka.Message:
			fmt.Printf("%s: %s\n", e.TopicPartition, e.Value)
			// Store the next offset to consume together with commit metadata,
			// leaving the actual commit to auto-commit.
			metadata := "test metadata"
			_, err = consumer.StoreOffsets([]kafka.TopicPartition{{
				Topic:     e.TopicPartition.Topic,
				Partition: e.TopicPartition.Partition,
				Offset:    e.TopicPartition.Offset + 1,
				Metadata:  &metadata,
			}})
			if err != nil {
				panic(err)
			}
		}
		// Stop after the first non-nil event.
		break
	}
	// With auto-commit enabled, closing the consumer commits the stored offsets.
	err = consumer.Close()
	if err != nil {
		panic(err)
	}
}
```
Output:
```
Started consumer version:1.9.1
AssignedPartitions: [test[0]@unset]
No event
test[0]@1: hello
RevokedPartitions: [test[0]@unset]
```
  2. Using kafka-python, print the committed offsets for that consumer group:
```python
from kafka import KafkaAdminClient

config = {"bootstrap_servers": "localhost:9092"}
client = KafkaAdminClient(**config)
# Committed offset and metadata for each partition of group "test"
print(client.list_consumer_group_offsets("test"))
```
Output:
```
{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=1, metadata='')}
```
  3. Now replace `StoreOffsets` with `CommitOffsets` in the Go program above to commit synchronously, then run the same Python script.
Go program output:
```
Started consumer version:1.9.1
AssignedPartitions: [test[0]@unset]
No event
test[0]@2: hello
RevokedPartitions: [test[0]@unset]
```
Python script output:
```
{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=2, metadata='test metadata')}
```

Investigation

`rd_kafka_offsets_store()` should pass the metadata down to `rd_kafka_offset_store0()` so that the most recently stored metadata is kept on the partition's toppar (`rd_kafka_toppar_t`) next to the stored offset.

The metadata can then be recovered when the list of offsets to commit is rebuilt from the stored data.
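The proposed change can be sketched with a small Go model: metadata is stored on the per-partition object next to the offset, a later store overwrites both, and the commit list is rebuilt from that stored state. This is an illustration under assumptions only; `toppar`, `offsetsStore` and `buildCommitList` are hypothetical names and do not reproduce librdkafka's actual structures or the patch itself.

```go
package main

import "fmt"

// toppar is a hypothetical, simplified model of the per-partition state
// (rd_kafka_toppar_t) with the proposed change: metadata is kept next to
// the stored offset.
type toppar struct {
	topic          string
	partition      int32
	storedOffset   int64
	storedMetadata string
}

// commitEntry is one element of the offset list sent in a commit.
type commitEntry struct {
	Topic     string
	Partition int32
	Offset    int64
	Metadata  string
}

// offsetsStore models rd_kafka_offsets_store() forwarding the metadata to
// the internal store step: the latest offset and metadata replace the
// previously stored ones.
func offsetsStore(tp *toppar, offset int64, metadata string) {
	tp.storedOffset = offset
	tp.storedMetadata = metadata
}

// buildCommitList models rebuilding the auto-commit offset list from
// stored partition state; the metadata is now recovered with the offset.
func buildCommitList(tps []*toppar) []commitEntry {
	list := make([]commitEntry, 0, len(tps))
	for _, tp := range tps {
		list = append(list, commitEntry{
			Topic:     tp.topic,
			Partition: tp.partition,
			Offset:    tp.storedOffset,
			Metadata:  tp.storedMetadata,
		})
	}
	return list
}

func main() {
	tp := &toppar{topic: "test", partition: 0}
	offsetsStore(tp, 1, "first")
	offsetsStore(tp, 2, "test metadata") // the last store wins
	fmt.Printf("%+v\n", buildCommitList([]*toppar{tp}))
}
```

This prints `[{Topic:test Partition:0 Offset:2 Metadata:test metadata}]`: the metadata from the last store travels with the auto-committed offset, matching what the explicit `CommitOffsets` run produced.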

A proposed patch is available at https://github.com/edenhill/librdkafka/compare/master...mathispesch:librdkafka:store-offsets-metadata. I can continue working on it to refactor it and add tests.

Checklist

  • [x] librdkafka version (release number or git tag): 1.9.1
  • [x] Apache Kafka version: 2.7.2
  • [x] librdkafka client configuration: enable.auto.commit=true, enable.auto.offset.store=false
  • [x] Operating system: Ubuntu 20.04
  • [x] Provide logs (with debug=.. as necessary) from librdkafka: https://gist.github.com/mathispesch/1df4d26873fa483b202497a198936c76
  • [x] Provide broker log excerpts: no relevant broker logs; see the output of the Kafka admin calls above instead
  • [ ] Critical issue

mathispesch · Jul 27 '22 12:07

Hey @edenhill, what do you think about this one? I'm happy to update the proposed fix and add tests.

mathispesch · Nov 10 '22 13:11

Good find, @mathispesch; this was an oversight when store offsets was implemented. I think it makes total sense to have the richer `rd_kafka_offsets_store()` also store the metadata :+1:

edenhill · Nov 10 '22 16:11

Fixed by #4084.

mathispesch · May 23 '23 08:05