confluent-kafka-python
assign() does NOT work inside the on_assign callback if an assignor is configured / seek() does NOT work inside the on_assign callback
Hello! My use case is consuming a certain time frame from a single topic with multiple consumers, in a way that stays meaningful in case of a fault (hence the cooperative-sticky assignor). For now, though, I am using just one consumer with one partition. I use offsets_for_times() to get the offsets for the start timestamps and then try to seek to them.
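Since offsets_for_times() takes the start timestamp in milliseconds since the Unix epoch, and consuming a time frame also needs an end bound, a small sketch of the two helpers this use case implies may help. The names to_epoch_ms and in_timeframe are hypothetical; the (type, value) tuple they check matches what Message.timestamp() returns, and treating type 0 as "not available" is an assumption that it mirrors confluent_kafka.TIMESTAMP_NOT_AVAILABLE.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical helpers for consuming a fixed time frame [start, end).
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# Assumption: mirrors confluent_kafka.TIMESTAMP_NOT_AVAILABLE (librdkafka value 0).
_TIMESTAMP_NOT_AVAILABLE = 0


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer milliseconds since the epoch."""
    if dt.tzinfo is None:
        # A naive datetime would be interpreted as local time by .timestamp().
        raise ValueError("pass a timezone-aware datetime, e.g. tzinfo=timezone.utc")
    # Exact integer arithmetic, avoiding float rounding from .timestamp() * 1000.
    return (dt - _EPOCH) // timedelta(milliseconds=1)


def in_timeframe(timestamp: tuple, start_ms: int, end_ms: int) -> bool:
    """Check a (timestamp_type, value) tuple, as returned by Message.timestamp(),
    against the half-open window [start_ms, end_ms)."""
    ts_type, ts_ms = timestamp
    if ts_type == _TIMESTAMP_NOT_AVAILABLE:
        return False
    return start_ms <= ts_ms < end_ms
```

A consumer loop would then process a message only while in_timeframe(msg.timestamp(), start_ms, end_ms) holds. Rejecting naive datetimes is a deliberate choice: START_DT.timestamp() on a naive value silently uses the machine's local time zone.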
@edenhill commented in #373 that one shouldn't use poll() and seek() after subscribe(), since subscribe() is asynchronous. One should use the following instead:
def on_assign(consumer, partitions):
    for p in partitions:
        # some starting offset, or use OFFSET_BEGINNING, et al.
        # the default offset is STORED which means use committed offsets, and if
        # no committed offsets are available use auto.offset.reset config (default latest)
        p.offset = 1234
    # call assign() to start fetching the given partitions.
    consumer.assign(partitions)

consumer.subscribe(mytopics, on_assign=on_assign)
This does NOT work if you configure an assignor, though, e.g. conf['partition.assignment.strategy'] = 'cooperative-sticky'. It fails with:
cimpl.KafkaException: KafkaError{code=_STATE,val=-172,str="Failed to set assignment: Local : Erroneous state"}
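My understanding (an assumption, not confirmed in this thread) is that with the cooperative-sticky assignor the rebalance protocol is incremental: on_assign receives only the newly added partitions, and they have to be added with Consumer.incremental_assign(), whereas assign() would replace the whole assignment, which the client rejects with _STATE. A minimal sketch of the quoted callback adapted accordingly; START_OFFSET is a hypothetical placeholder for the 1234 used above.

```python
# Sketch: the on_assign pattern from #373, adapted to the cooperative-sticky assignor.
# Assumption: with cooperative rebalancing, on_assign receives only the newly
# added partitions, and they must be added via incremental_assign().

START_OFFSET = 1234  # hypothetical starting offset, as in the quoted snippet


def on_assign(consumer, partitions):
    for p in partitions:
        # Overwrite the default (stored/committed) offset for each new partition.
        p.offset = START_OFFSET
    # Add these partitions to the existing assignment instead of replacing it.
    consumer.incremental_assign(partitions)
```

It would be registered the same way, with consumer.subscribe(mytopics, on_assign=on_assign) and conf['partition.assignment.strategy'] = 'cooperative-sticky'.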
The error itself seems reasonable, since the assignor assigns the partitions. However, the following version using seek() does NOT work either (with or without an assignor):
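from confluent_kafka import TopicPartition

def on_assign(c, partitions):
    # offsets_for_times() expects the timestamp (ms since epoch) in the offset field
    start_ms = int(START_DT.timestamp() * 1000)
    partition_to_timestamp_mapping = {topic_partition.partition: start_ms for topic_partition in partitions}
    topic_partitions_with_new_offsets = c.offsets_for_times(
        [TopicPartition(TOPICS[0], partition, timestamp) for partition, timestamp in partition_to_timestamp_mapping.items()])
    # seek each partition to the offset resolved for START_DT
    for topic_partition in topic_partitions_with_new_offsets:
        c.seek(topic_partition)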
(I read in another issue that calling assign() is not necessary here. The same code works flawlessly when used the way that is not recommended, i.e. calling seek() after subscribe() and poll().)
This doesn't work: cimpl.KafkaException: KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="Failed to seek to offset 1524: Local: Unknown partition"}
Even a minimal example that seeks each of the provided partitions to OFFSET_BEGINNING fails:
for topic_partition in [TopicPartition(TOPICS[0], partition.partition, OFFSET_BEGINNING) for partition in partitions]:
    c.seek(topic_partition)
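A likely explanation (again an assumption) is that inside on_assign the partitions are not yet part of the active assignment, so seek() has nothing to act on and reports Unknown partition. An alternative that avoids seek() entirely is to resolve the start offsets with offsets_for_times() and pass them straight to the assign call, following the pattern from #373. A sketch under those assumptions; make_on_assign and its cooperative flag are hypothetical names.

```python
# Sketch: start every newly assigned partition at a timestamp without calling seek().
# Assumption: seek() fails inside on_assign because the partitions are not yet
# part of the active assignment; setting offsets before assigning avoids that.


def make_on_assign(start_ms, cooperative=True, timeout=10.0):
    """Build an on_assign callback that starts each partition at start_ms.

    start_ms: milliseconds since the epoch, e.g. int(START_DT.timestamp() * 1000).
    cooperative: hypothetical flag; True for 'cooperative-sticky', False for
    eager assignors such as the default range/roundrobin.
    """

    def on_assign(consumer, partitions):
        # offsets_for_times() reads the timestamp from each partition's offset field.
        for p in partitions:
            p.offset = start_ms
        # Each result carries the earliest offset whose timestamp is >= start_ms,
        # or -1 (end of partition) if no such message exists.
        # Checking each result's .error is omitted to keep the sketch short.
        resolved = consumer.offsets_for_times(partitions, timeout=timeout)
        if cooperative:
            # Cooperative protocol: add to the current assignment.
            consumer.incremental_assign(resolved)
        else:
            # Eager protocol: set the full assignment, as in the #373 snippet.
            consumer.assign(resolved)

    return on_assign
```

For the setup described above, this would be registered as consumer.subscribe(TOPICS, on_assign=make_on_assign(int(START_DT.timestamp() * 1000))).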
I am using up-to-date versions of the Python library and the Kafka images:
- confluent-kafka-python version: 2.3.0
- Kafka broker: Confluent Platform Docker image 7.5.1-1-ubi8, run via Docker Compose
Any help is very appreciated! Thank you!