
Sometimes data cannot be consumed (disappears after restarting)

Open helloHKTK opened this issue 1 year ago • 5 comments

Description

The consumer group 'py_client_tt2' cannot consume data from 'mytopic', even though it can be confirmed that 'mytopic' contains data. The logs are below. I am using confluent-kafka-python 2.2.0 with the most basic consumption loop.

FETCH [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Fetch topic mytopic [0] at offset 255 (v4)
FETCH [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Fetch 1/1/1 toppar(s)
SEND [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Sent FetchRequest (v11, 104 bytes @ 0, CorrId 24)
RECV [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Received ListOffsetsResponse (v2, 50 bytes, CorrId 22, rtt 500.24ms)
RECV [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Received ListOffsetsResponse (v2, 50 bytes, CorrId 23, rtt 500.38ms)
HEARTBEAT [rdkafka#consumer-1] [thrd:main]: GroupCoordinator/0: Heartbeat for group "py_client_tt2" generation id 297
SEND [rdkafka#consumer-1] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent HeartbeatRequest (v3, 106 bytes @ 0, CorrId 14)
RECV [rdkafka#consumer-1] [thrd:GroupCoordinator]: GroupCoordinator/0: Received HeartbeatResponse (v3, 6 bytes, CorrId 14, rtt 0.34ms)
HEARTBEAT [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" heartbeat error response in state up (join-state steady, 1 partition(s) assigned): Broker: Group rebalance in progress
REBALANCE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" is rebalancing (EAGER) in state up (join-state steady) with 1 assigned partition(s): rebalance in progress
CGRPJOINSTATE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" changed join state steady -> wait-unassign-call (state up)
ASSIGN [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": delegating revoke of 1 partition(s) to application on queue rd_kafka_cgrp_new: rebalance in progress
PAUSE [rdkafka#consumer-1] [thrd:main]: Pausing fetchers for 1 assigned partition(s): rebalance
PAUSE [rdkafka#consumer-1] [thrd:main]: Library pausing 1 partition(s)
BARRIER [rdkafka#consumer-1] [thrd:main]: mytopic [0]: rd_kafka_toppar_op_pause_resume:2262: new version barrier v5
PAUSE [rdkafka#consumer-1] [thrd:main]: Pause mytopic [0] (v5)
ASSIGNMENT [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": clearing group assignment
OP [rdkafka#consumer-1] [thrd:main]: mytopic [0] received op PAUSE (v5) in fetch-state active (opv4)
PAUSE [rdkafka#consumer-1] [thrd:main]: Pause mytopic [0]: at offset 255 (state active, v5)
CGRPOP [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-unassign-call)
CGRPOP [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" received op ASSIGN in state up (join-state wait-unassign-call)
CLEARASSIGN [rdkafka#consumer-1] [thrd:main]: Clearing current assignment of 1 partition(s)
CGRPJOINSTATE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" changed join state wait-unassign-call -> wait-unassign-to-complete (state up)
DUMP [rdkafka#consumer-1] [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
DUMP_ALL [rdkafka#consumer-1] [thrd:main]: List with 0 partition(s):
DUMP_REM [rdkafka#consumer-1] [thrd:main]:  mytopic [0] offset STORED
BARRIER [rdkafka#consumer-1] [thrd:main]: mytopic [0]: rd_kafka_toppar_op_fetch_stop:2201: new version barrier v6
CONSUMER [rdkafka#consumer-1] [thrd:main]: Stop consuming mytopic [0] (v6)
BARRIER [rdkafka#consumer-1] [thrd:main]: mytopic [0]: rd_kafka_toppar_op_pause_resume:2262: new version barrier v7
RESUME [rdkafka#consumer-1] [thrd:main]: Resume mytopic [0] (v7)
DESP [rdkafka#consumer-1] [thrd:main]: Removing (un)desired topic mytopic [0]
REMOVE [rdkafka#consumer-1] [thrd:main]: Removing mytopic [0] from assignment (started=true, pending=false, queried=false, stored offset=255)
REMOVE [rdkafka#consumer-1] [thrd:main]: Served 1 removed partition(s), with 1 offset(s) to commit
ASSIGNMENT [rdkafka#consumer-1] [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 0 offset commits in progress
OP [rdkafka#consumer-1] [thrd:main]: mytopic [0] received op FETCH_STOP (v6) in fetch-state active (opv5)
FETCH [rdkafka#consumer-1] [thrd:main]: Stopping fetch for mytopic [0] in state active (v6)
PARTSTATE [rdkafka#consumer-1] [thrd:main]: Partition mytopic [0] changed fetch state active -> stopping
STORETERM [rdkafka#consumer-1] [thrd:main]: mytopic [0]: offset store terminating
PARTSTATE [rdkafka#consumer-1] [thrd:main]: Partition mytopic [0] changed fetch state stopping -> stopped
OP [rdkafka#consumer-1] [thrd:main]: mytopic [0] received op PAUSE (v7) in fetch-state stopped (opv6)
RESUME [rdkafka#consumer-1] [thrd:main]: Not resuming stopped mytopic [0]: at offset 255 (state stopped, v7)
CGRPOP [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" received op PARTITION_LEAVE in state up (join-state wait-unassign-to-complete) for mytopic [0]
PARTDEL [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": delete mytopic [0]
STOPSERVE [rdkafka#consumer-1] [thrd:main]: All partitions awaiting stop are now stopped: serving assignment
DUMP [rdkafka#consumer-1] [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
DUMP_ALL [rdkafka#consumer-1] [thrd:main]: List with 0 partition(s):
DUMP_PND [rdkafka#consumer-1] [thrd:main]: List with 0 partition(s):
DUMP_QRY [rdkafka#consumer-1] [thrd:main]: List with 0 partition(s):
DUMP_REM [rdkafka#consumer-1] [thrd:main]: List with 0 partition(s):
ASSIGNDONE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": assignment operations done in join-state wait-unassign-to-complete (rebalance rejoin=false)
UNASSIGN [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": unassign done in state up (join-state wait-unassign-to-complete)
REJOIN [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": Rejoining group without an assignment: Unassignment done
CGRPJOINSTATE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" changed join state wait-unassign-to-complete -> init (state up)
JOIN [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": join with 1 subscribed topic(s)
CGRPMETADATA [rdkafka#consumer-1] [thrd:main]: consumer join: metadata for subscription is up to date (591ms old)
JOIN [rdkafka#consumer-1] [thrd:main]: 10.31.51.112:9092/0: Joining group "py_client_tt2" with 1 subscribed topic(s) and member id "rdkafka-371db227-3de7-4034-b464-3ada2ca625b5"
CGRPJOINSTATE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" changed join state init -> wait-join (state up)
SEND [rdkafka#consumer-1] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 211 bytes @ 0, CorrId 15)
BROADCAST [rdkafka#consumer-1] [thrd:GroupCoordinator]: Broadcasting state change
RECV [rdkafka#consumer-1] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 277 bytes, CorrId 15, rtt 1.11ms)
JOINGROUP [rdkafka#consumer-1] [thrd:main]: JoinGroup response: GenerationId 298, Protocol range, LeaderId rdkafka-371db227-3de7-4034-b464-3ada2ca625b5 (me), my MemberId rdkafka-371db227-3de7-4034-b464-3ada2ca625b5, member metadata count 2: (no error)
JOINGROUP [rdkafka#consumer-1] [thrd:main]: I am elected leader for group "py_client_tt2" with 2 member(s)
GRPLEADER [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": resetting group leader info: JoinGroup response clean-up
CGRPJOINSTATE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" changed join state wait-join -> wait-metadata (state up)
METADATA [rdkafka#consumer-1] [thrd:main]: GroupCoordinator/0: Request metadata for 1 topic(s): partition assignor
SEND [rdkafka#consumer-1] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent MetadataRequest (v4, 42 bytes @ 0, CorrId 16)
RECV [rdkafka#consumer-1] [thrd:GroupCoordinator]: GroupCoordinator/0: Received MetadataResponse (v4, 113 bytes, CorrId 16, rtt 0.36ms)
METADATA [rdkafka#consumer-1] [thrd:main]: GroupCoordinator/0: ===== Received metadata (for 1 requested topics): partition assignor =====
METADATA [rdkafka#consumer-1] [thrd:main]: GroupCoordinator/0: ClusterId: wC4PkFXwTu6PryXXZpj0QQ, ControllerId: 0
METADATA [rdkafka#consumer-1] [thrd:main]: GroupCoordinator/0: 1 brokers, 1 topics
METADATA [rdkafka#consumer-1] [thrd:main]: GroupCoordinator/0:   Broker #0/1: 10.31.51.112:9092 NodeId 0
METADATA [rdkafka#consumer-1] [thrd:main]: GroupCoordinator/0:   Topic #0/1: mytopic with 1 partitions
METADATA [rdkafka#consumer-1] [thrd:main]:   Topic mytopic partition 0 Leader 0
METADATA [rdkafka#consumer-1] [thrd:main]: GroupCoordinator/0: 1/1 requested topic(s) seen in metadata
ASSIGN [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" running range assignor for 2 member(s) and 1 eligible subscribed topic(s):
ASSIGN [rdkafka#consumer-1] [thrd:main]:  Member "rdkafka-371db227-3de7-4034-b464-3ada2ca625b5" (me) with 0 owned partition(s) and 1 subscribed topic(s):
ASSIGN [rdkafka#consumer-1] [thrd:main]:   mytopic [-1]
ASSIGN [rdkafka#consumer-1] [thrd:main]:  Member "rdkafka-21c6f4d7-bf44-4ffa-a085-9b3ec699fa62" with 0 owned partition(s) and 1 subscribed topic(s):
ASSIGN [rdkafka#consumer-1] [thrd:main]:   mytopic [-1]
ASSIGN [rdkafka#consumer-1] [thrd:main]: range: Topic mytopic with 1 partition(s) and 2 subscribing member(s)
ASSIGN [rdkafka#consumer-1] [thrd:main]: range: Member "rdkafka-21c6f4d7-bf44-4ffa-a085-9b3ec699fa62": assigned topic mytopic partitions 0..0
ASSIGN [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" range assignment for 2 member(s) finished in 0.011ms:
ASSIGN [rdkafka#consumer-1] [thrd:main]:  Member "rdkafka-371db227-3de7-4034-b464-3ada2ca625b5" (me) assigned 0 partition(s):
ASSIGN [rdkafka#consumer-1] [thrd:main]:  Member "rdkafka-21c6f4d7-bf44-4ffa-a085-9b3ec699fa62" assigned 1 partition(s):
ASSIGN [rdkafka#consumer-1] [thrd:main]:   mytopic [0]
ASSIGNOR [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": "range" assignor run for 2 member(s)
CGRPJOINSTATE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" changed join state wait-metadata -> wait-sync (state up)
SEND [rdkafka#consumer-1] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent SyncGroupRequest (v3, 254 bytes @ 0, CorrId 17)
BROADCAST [rdkafka#consumer-1] [thrd:GroupCoordinator]: Broadcasting state change
RECV [rdkafka#consumer-1] [thrd:GroupCoordinator]: GroupCoordinator/0: Received SyncGroupResponse (v3, 20 bytes, CorrId 17, rtt 1.03ms)
SYNCGROUP [rdkafka#consumer-1] [thrd:main]: SyncGroup response: Success (10 bytes of MemberState data)
ASSIGNMENT [rdkafka#consumer-1] [thrd:main]: List with 0 partition(s):
CGRPJOINSTATE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" changed join state wait-sync -> wait-assign-call (state up)
ASSIGN [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": delegating assign of 0 partition(s) to application on queue rd_kafka_cgrp_new: new assignment
ASSIGNMENT [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": setting group assignment to 0 partition(s)
GRPASSIGNMENT [rdkafka#consumer-1] [thrd:main]: List with 0 partition(s):
CGRPOP [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-assign-call)
CGRPOP [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" received op ASSIGN in state up (join-state wait-assign-call)
ASSIGN [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": new assignment of 0 partition(s) in join-state wait-assign-call
CLEARASSIGN [rdkafka#consumer-1] [thrd:main]: No current assignment to clear
ASSIGNMENT [rdkafka#consumer-1] [thrd:main]: Added 0 partition(s) to assignment which now consists of 0 partition(s) where of 0 are in pending state and 0 are being queried
CGRPJOINSTATE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2" changed join state wait-assign-call -> steady (state up)
DUMP [rdkafka#consumer-1] [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
DUMP_ALL [rdkafka#consumer-1] [thrd:main]: List with 0 partition(s):
ASSIGNDONE [rdkafka#consumer-1] [thrd:main]: Group "py_client_tt2": assignment operations done in join-state steady (rebalance rejoin=false)
RECV [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Received FetchResponse (v11, 76 bytes, CorrId 24, rtt 500.36ms)
FETCH [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Topic mytopic [0] MessageSet size 0, error "Success", MaxOffset 255, LSO 255, Ver 4/4
FETCH [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Topic mytopic [0] in state stopped at offset 255 (1/100000 msgs, 0/65536 kb queued, opv 4) is not fetchable: not in active fetch state
FETCHADD [rdkafka#consumer-1] [thrd:10.31.51.112:9092/bootstrap]: 10.31.51.112:9092/0: Removed mytopic [0] from fetch list (0 entries, opv 4): not in active fetch state
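The key lines in the log above are the PARTSTATE transitions: during the revoke, mytopic [0] goes `active -> stopping -> stopped`, and after the rebalance the range assignor gives the partition to the other group member, so this consumer correctly ends up with an empty assignment; the final "in state stopped ... is not fetchable" line is the direct consequence. When sifting through long rdkafka debug logs for this pattern, a small stdlib-only helper like the following (hypothetical, not part of any library) can surface the last observed fetch state per partition:

```python
import re

# Matches rdkafka PARTSTATE lines such as:
#   "Partition mytopic [0] changed fetch state active -> stopping"
FETCH_STATE = re.compile(
    r"Partition (\S+ \[\d+\]) changed fetch state (\S+) -> (\S+)")

def final_fetch_states(log_lines):
    """Return the last observed fetch state for each partition."""
    states = {}
    for line in log_lines:
        m = FETCH_STATE.search(line)
        if m:
            # group(1) is "topic [partition]", group(3) the new state
            states[m.group(1)] = m.group(3)
    return states
```

Run over the log above, this reports `{"mytopic [0]": "stopped"}`, confirming the partition never returns to the active fetch state on this consumer.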

How to reproduce

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • [ ] Apache Kafka broker version:
  • [ ] Client configuration: {...}
  • [ ] Operating system:
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

helloHKTK, Oct 14 '23 16:10

It seems the issue might be due to the consumer offset, or because the consumer has fallen too far behind due to slow processing. Assign it to me and I'll see if I can help.

abdurafeyf, Oct 16 '23 06:10

> It seems the issue might be due to the consumer offset, or because the consumer has fallen too far behind due to slow processing. Assign it to me and I'll see if I can help.

Sorry, I don't seem to have permission to assign issues. I use the most basic consumption pattern to read data from Kafka. The broker version I am using is 3.0.0 (the 2.13 is the Scala build, as the log.dirs path below shows), and the following is my Kafka server configuration:

broker.id=0
listeners=PLAINTEXT://10.31.51.112:9092
advertised.listeners=PLAINTEXT://10.31.51.112:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka_2.13-3.0.0/log/kafka
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=-1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2081
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=false
delete.topic.enable=true

helloHKTK, Oct 16 '23 07:10

@helloHKTK Following up on this: it would be helpful if you attached your consumer configuration (not your broker's) to aid the investigation. As @abdurafeyf mentioned, it is likely a consumer offset issue you are facing.

nhaq-confluent, Feb 12 '24 19:02

I'm also running into this. We consume events from Kafka on Kubernetes (an MSaaS deployment), and when HPA kicks in, or a deployment happens due to a change in the Rollout object in Kubernetes, everything just HALTS.

>>> import confluent_kafka
>>> confluent_kafka.version()
('2.2.0', 33685504)
>>> confluent_kafka.libversion()
('2.2.0', 33685759)

Please find below my consumer config; the logs are very similar to those above:

        "auto.offset.reset": "earliest",
        "bootstrap.servers": CONFIG.bootstrap_servers,
        "enable.auto.commit": True,
        "group.id": KAFKA_CONSUMER_GROUP,
        "security.protocol": CONFIG.security_protocol,
        "on_commit": commit_completed,
        "max.poll.interval.ms": 36_00_000,
        "debug": "broker,consumer,cgrp,topic,fetch",
............................
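For completeness, the `"on_commit"` entry in the config above expects a callable; a minimal sketch of what `commit_completed` presumably looks like is shown below (the `(err, partitions)` signature is what confluent-kafka-python invokes commit callbacks with; the body here is an assumed placeholder, not the reporter's actual code):

```python
# Hypothetical commit callback for the "on_commit" config entry above.
# confluent-kafka-python calls it as on_commit(err, partitions), where
# partitions is a list of TopicPartition objects.
def commit_completed(err, partitions):
    if err is not None:
        print(f"Commit failed: {err}")
    else:
        for p in partitions:
            print(f"Committed {p.topic} [{p.partition}] @ offset {p.offset}")
```

Note also that `"max.poll.interval.ms": 36_00_000` is 3,600,000 ms, i.e. one hour between mandatory `poll()` calls before the consumer is evicted from the group.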

%7|1709227312.372|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic <topic-name> partition 0 Leader 108 Epoch 5
%7|1709227312.372|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic <topic-name> partition 1 Leader 208 Epoch 4
%7|1709227312.372|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic <topic-name> partition 2 Leader 308 Epoch 5
%7|1709227312.372|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic <topic-name> partition 3 Leader 109 Epoch 6
%7|1709227312.372|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic <topic-name> partition 4 Leader 209 Epoch 5
%7|1709227312.372|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic <topic-name> partition 5 Leader 309 Epoch 7
%7|1709227312.372|METADATA|rdkafka#consumer-1| [thrd:main]: ssl://<broker-link>: 1/1 requested topic(s) seen in metadata
%7|1709227313.719|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/308: Heartbeat for group "<consumer-group-name>" generation id 197
%7|1709227313.721|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "<consumer-group-name>" heartbeat error response in state up (join-state wait-unassign-call, 0 partition(s) assigned): Broker: Group rebalance in progress
%7|1709227313.748|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/308: Heartbeat for group "<consumer-group-name>" generation id 197
%7|1709227313.754|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "<consumer-group-name>" heartbeat error response in state up (join-state wait-unassign-call, 0 partition(s) assigned): Broker: Group rebalance in progress
%7|1709227316.719|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/308: Heartbeat for group "<consumer-group-name>" generation id 197
%7|1709227316.721|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "<consumer-group-name>" heartbeat error response in state up (join-state wait-unassign-call, 0 partition(s) assigned): Broker: Group rebalance in progress
%7|1709227316.748|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/308: Heartbeat for group "<consumer-group-name>" generation id 197
%7|1709227316.749|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "<consumer-group-name>" heartbeat error response in state up (join-state wait-unassign-call, 0 partition(s) assigned): Broker: Group rebalance in progress
%7|1709227319.719|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/308: Heartbeat for group "<consumer-group-name>" generation id 197
%7|1709227319.748|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/308: Heartbeat for group "<consumer-group-name>" generation id 197

............................

The above appears to repeat in an infinite loop.

And the rebalancing essentially never completes, even for a single consumer group with 5-6 partitions on the given topic. cc @nhaq-confluent

AdityaSoni19031997, Feb 29 '24 17:02

Hello: I am glad to receive your message and will read and reply as soon as possible! Wishing you smooth work and a happy life!

helloHKTK, Feb 29 '24 17:02