confluent-kafka-python
Messages lost from consume() after calling seek()
Description
I use the consume() function to get messages in batches, and messages are lost after calling the seek() function.
How to reproduce
The following code reproduces the problem:

```python
from confluent_kafka import Consumer, TopicPartition

c = Consumer({
    'bootstrap.servers': 'my_kafka_host',
    'group.id': 'my_group',
    'auto.offset.reset': 'latest',
    'enable.auto.commit': False,
    'fetch.min.bytes': 1024 * 1024,
    'fetch.wait.max.ms': 250,
    'debug': 'consumer,fetch',
})
c.subscribe(['my_topic'])

seeks, offsets = {}, {}
try:
    while True:
        messages = c.consume(1000, timeout=0.25)
        for message in messages:
            offset = message.offset()
            partition = message.partition()
            if partition == 0:
                # Repeatedly seek partition 0 back to the first offset seen on it.
                if partition not in seeks:
                    seeks[partition] = offset
                offset = seeks[partition]
                c.seek(TopicPartition(topic=message.topic(), partition=partition, offset=offset))
                print(f"Seek [{partition}], offset {offset}")
            else:
                # All other partitions are expected to deliver strictly consecutive offsets.
                if partition in offsets and offset - offsets[partition] != 1:
                    print(f"Consume [{partition}] lost message!!!, obtain {offset}, expect {offsets[partition] + 1}")
                    raise RuntimeError()
                else:
                    offsets[partition] = offset
                    print(f"Consume [{partition}], offset {offset}")
finally:
    c.unsubscribe()
    c.close()
```
Program error log
%7|1642652874.613|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.8.2 (0x10802ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS STATIC_LIB_zlib ZLIB STATIC_LIB_libcrypto STATIC_LIB_libssl SSL STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2400)
%7|1642652874.613|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group": new assignment of 2 partition(s) in join-state init
%7|1642652874.613|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: No current assignment to clear
%7|1642652874.613|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Added 2 partition(s) to assignment which now consists of 2 partition(s) where of 2 are in pending state and 0 are being queried
%7|1642652874.621|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/2: Fetch committed offsets for 2/2 partition(s)
%7|1642652874.622|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my_topic [1] start fetching at offset 10900532
%7|1642652874.622|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my_topic [0] start fetching at offset 10675160
%7|1642652874.622|FETCH|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Topic my_topic [1] in state active at offset 10900532 (0/100000 msgs, 0/65536 kb queued, opv 3) is fetchable
%7|1642652874.622|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Topic my_topic [0] in state active at offset 10675160 (0/100000 msgs, 0/65536 kb queued, opv 3) is fetchable
%7|1642652874.622|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v3)
%7|1642652874.622|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
%7|1642652874.625|FETCH|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Fetch topic my_topic [1] at offset 10900532 (v3)
%7|1642652874.625|FETCH|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Fetch 1/1/80 toppar(s)
%7|1642652874.699|CONSUME|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Enqueue 1819 message(s) (2053420 bytes, 1819 ops) on my_topic [0] fetch queue (qlen 0, v3, last_offset 10676978, 0 ctrl msgs, gzip)
%7|1642652874.699|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10676979 (v3)
%7|1642652874.699|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
%7|1642652874.700|CONSUME|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Enqueue 1786 message(s) (2128368 bytes, 1786 ops) on my_topic [1] fetch queue (qlen 819, v3, last_offset 10902317, 0 ctrl msgs, gzip)
%7|1642652874.700|FETCH|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Fetch topic my_topic [1] at offset 10902318 (v3)
%7|1642652874.700|FETCH|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Fetch 1/1/80 toppar(s)
Seek [0], offset 10675160
...
Seek [0], offset 10675160
%7|1642652874.707|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v91)
%7|1642652874.707|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
Seek [0], offset 10675160
...
Seek [0], offset 10675160
%7|1642652874.713|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v217)
%7|1642652874.713|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
Seek [0], offset 10675160
...
Seek [0], offset 10675160
%7|1642652874.719|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v353)
%7|1642652874.719|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
Seek [0], offset 10675160
...
Seek [0], offset 10675160
%7|1642652874.725|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v486)
%7|1642652874.725|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
Seek [0], offset 10675160
...
Seek [0], offset 10675160
%7|1642652874.732|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v637)
%7|1642652874.732|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
Seek [0], offset 10675160
...
Seek [0], offset 10675160
%7|1642652874.738|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v755)
%7|1642652874.738|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
Seek [0], offset 10675160
...
Seek [0], offset 10675160
%7|1642652874.744|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v875)
%7|1642652874.744|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
Seek [0], offset 10675160
...
Seek [0], offset 10675160
%7|1642652874.750|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch topic my_topic [0] at offset 10675160 (v1003)
%7|1642652874.750|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Fetch 1/1/80 toppar(s)
Consume [1], offset 10900532
Consume [1], offset 10900533
Consume [1], offset 10900534
...
Consume [1], offset 10901529
Consume [1], offset 10901530
Consume [1], offset 10901531
%7|1642652874.782|CONSUME|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Enqueue 1793 message(s) (2048686 bytes, 1793 ops) on my_topic [1] fetch queue (qlen 0, v3, last_offset 10904110, 0 ctrl msgs, gzip)
%7|1642652874.782|FETCH|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Fetch topic my_topic [1] at offset 10904111 (v3)
%7|1642652874.782|FETCH|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Fetch 1/1/80 toppar(s)
Consume [1] occurred error!!!, obtain 10902318, expect 10901532
%7|1642652874.783|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "my_group": subscribe to new unset subscription of 0 topics (join-state init)
%7|1642652874.783|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1642652874.783|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events
%7|1642652874.783|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: Clearing current assignment of 2 partition(s)
%7|1642652874.783|REMOVE|rdkafka#consumer-1| [thrd:main]: Served 2 removed partition(s), with 2 offset(s) to commit
%7|1642652874.784|CLOSE|rdkafka#consumer-1| [thrd:app]: Consumer closed
%7|1642652874.784|DESTROY|rdkafka#consumer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1642652874.784|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1642652874.784|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events
%7|1642652874.784|CLOSE|rdkafka#consumer-1| [thrd:app]: Consumer closed
%7|1642652874.784|DESTROY|rdkafka#consumer-1| [thrd:main]: Destroy internal
%7|1642652874.784|DESTROY|rdkafka#consumer-1| [thrd:main]: Removing all topics
%7|1642652874.784|FETCH|rdkafka#consumer-1| [thrd:b-2.my_kafka_host.]: b-2.my_kafka_host.com:9092/2: Topic my_topic [1] in state stopped at offset 10900532 (0/100000 msgs, 0/65536 kb queued, opv 3) is not fetchable: not in active fetch state
%7|1642652874.823|CONSUME|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Enqueue 1819 message(s) (2053420 bytes, 1819 ops) on my_topic [0] fetch queue (qlen 0, v1003, last_offset 10676978, 0 ctrl msgs, gzip)
%7|1642652874.823|FETCH|rdkafka#consumer-1| [thrd:b-1.my_kafka_host.]: b-1.my_kafka_host.com:9092/1: Topic my_topic [0] in state stopped at offset 10675160 (0/100000 msgs, 0/65536 kb queued, opv 1003) is not fetchable: partition removed
Traceback (most recent call last):
File "bug.py", line 36, in <module>
raise RuntimeError("occurred error!!!")
RuntimeError: occurred error!!!
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): ('1.8.2', 17302016) and ('1.8.2', 17302271)
- [x] Apache Kafka broker version: 2.4.1.1
- [x] Client configuration: `{'bootstrap.servers': 'my_kafka_host', 'group.id': 'my_group', 'auto.offset.reset': 'latest', 'enable.auto.commit': False, 'fetch.min.bytes': 1048576, 'fetch.wait.max.ms': 250, 'debug': 'consumer,fetch'}`
- [x] Operating system: Debian GNU/Linux 10
- [x] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [x] Critical issue
Hi @luoyucumt, thanks for reporting this. According to the Python client docs (https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html):
seek() may only be used to update the consume offset of an actively consumed partition (i.e., after assign()); to set the starting offset of a partition that is not being consumed, pass the offset in an assign() call instead. Here is an example of how to use it: https://github.com/confluentinc/confluent-kafka-python/blob/master/tests/test_Consumer.py#L56
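For reference, the assign()-with-offsets pattern the docs describe can be sketched as follows. The helper is hypothetical and uses plain tuples in place of TopicPartition objects so the sketch is self-contained; the commented-out lines show where the real Consumer/TopicPartition calls would go against a live broker (broker address, topic, and offsets are placeholders taken from this issue).

```python
def assignment_with_offsets(topic, start_offsets):
    """Build (topic, partition, offset) triples for a Consumer.assign() call.

    start_offsets maps partition number -> first offset to consume.
    With confluent_kafka these triples would become TopicPartition objects:
        [TopicPartition(t, p, o) for t, p, o in assignment_with_offsets(...)]
    """
    return sorted((topic, p, o) for p, o in start_offsets.items())

# Usage against a real broker (requires confluent-kafka; not run here):
# from confluent_kafka import Consumer, TopicPartition
# c = Consumer({'bootstrap.servers': 'my_kafka_host', 'group.id': 'my_group'})
# c.assign([TopicPartition(t, p, o) for t, p, o in
#           assignment_with_offsets('my_topic', {0: 10675160})])
```

The point of assign() here is that the starting offset is set before fetching begins, so there is no window in which already-fetched messages race against a later seek().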
Hi @jliunyu, thanks for your reply. The seek() call in the code above already targets an actively consumed partition. As the log shows, after calling consume() multiple times the fetch offset of partition [0] is still 10675160, so seek() works fine there. But after consuming 1000 messages from partition [1] (offsets 10900532 through 10901531) and calling consume() again, the next message delivered on partition [1] has offset 10902318 instead of the expected 10901532.
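The consecutive-offset check used in the repro script can be isolated into a small broker-free helper; this is a sketch of the same detection logic from this issue, not part of the client API.

```python
def find_gap(last_offsets, partition, offset):
    """Record `offset` for `partition`; if it does not directly follow the
    previously recorded offset, return the offset that was expected.
    Returns None when the sequence is intact or on first sight of a partition."""
    prev = last_offsets.get(partition)
    last_offsets[partition] = offset
    if prev is not None and offset - prev != 1:
        return prev + 1
    return None
```

Feeding in the offsets from the log above, the gap on partition [1] is reported as expected offset 10901532 when 10902318 arrives.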