confluent-kafka-python

consumer heartbeat error response in state up

Open jedi201321 opened this issue 3 years ago • 5 comments

Description

I have a topic with 24 partitions that receives about 1000 messages/sec. Consumer offsets are committed manually.

1. After starting the consumer, the broker logs "Preparing to rebalance group····· Stabilized group ····· Assignment received from leader for group ·····" about every 2 minutes, continuously.

2. The consumer debug log is:

%7|1650878105.949|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic speech-xxx [21]: stored offset 3888754141, committed offset 3888751506: setting stored offset 3888754141 for commit
%7|1650878105.949|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic speech-xxx [22]: stored offset -1001, committed offset -1001: not including in commit
%7|1650878105.949|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic speech-xxx [23]: stored offset -1001, committed offset -1001: not including in commit
%7|1650878105.949|COMMIT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Committing offsets for 14 partition(s): manual
%7|1650878105.949|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Enqueue OffsetCommitRequest(v1, 14/24 partition(s))): manual
%5|1650878105.949|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out HeartbeatRequest in flight (after 10326ms, timeout #0): possibly held back by preceeding OffsetCommitRequest with timeout in 46873ms
%7|1650878105.949|REQERR|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: HeartbeatRequest failed: Local: Timed out: actions Retry
%7|1650878105.949|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "anti-xxx" heartbeat error response in state up (join state started, 24 partition(s) assigned): Local: Timed out
%7|1650878105.949|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Heartbeat failed: _TIMED_OUT: Local: Timed out
%7|1650878105.949|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "anti-xxx" is rebalancing in state up (join-state started) with assignment: Local: Timed out
%7|1650878105.949|PAUSE|rdkafka#consumer-1| [thrd:main]: Library pausing 24 partition(s)
%7|1650878105.949|BARRIER|rdkafka#consumer-1| [thrd:main]: speech-room-audiofile [0]: rd_kafka_toppar_op_pause_resume:2311: new version barrier v3
%7|1650878105.949|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause speech-xxx [0] (v3)
%7|1650878105.949|BARRIER|rdkafka#consumer-1| [thrd:main]: speech-xxx [1]: rd_kafka_toppar_op_pause_resume:2311: new version barrier v3
%7|1650878105.949|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause speech-xxx [1] (v3)
%7|1650878105.949|BARRIER|rdkafka#consumer-1| [thrd:main]: speech-xxx [2]: rd_kafka_toppar_op_pause_resume:2311: new version barrier v3
%7|1650878105.949|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause speech-xxx [2] (v3)
%7|1650878105.949|BARRIER|rdkafka#consumer-1| [thrd:main]: speech-xxx [3]: rd_kafka_toppar_op_pause_resume:2311: new version barrier v3
%7|1650878105.949|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause speech-xxx [3] (v3)
%7|1650878105.949|BARRIER|rdkafka#consumer-1| [thrd:main]: speech-room-audiofile [4]: rd_kafka_toppar_op_pause_resume:2311: new version barrier v3
%7|1650878105.949|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause speech-room-audiofile [4] (v3)
%7|1650878105.949|BARRIER|rdkafka#consumer-1| [thrd:main]: speech-room-audiofile [5]: rd_kafka_toppar_op_pause_resume:2311: new version barrier v3
%4|1650878105.949|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
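A dump like this is easier to scan once the entries are split on their `%<level>|<timestamp>|<facility>|<client>|` prefix. The following is a minimal sketch and not part of the original report: `parse_rdkafka_log` and `ENTRY_RE` are illustrative names, and the regular expression only assumes the prefix layout visible in the entries above.

```python
import re
from collections import Counter
from datetime import datetime, timezone

# Prefix layout as seen above:
#   %<level>|<epoch.millis>|<facility>|<client>| [thrd:<thread>]: <message>
ENTRY_RE = re.compile(
    r"%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<fac>[A-Z0-9_]+)\|(?P<client>[^|]+)\|"
    r"\s*\[thrd:(?P<thread>[^\]]+)\]:\s*"
)


def parse_rdkafka_log(text):
    """Split concatenated log entries into dicts (hypothetical helper)."""
    matches = list(ENTRY_RE.finditer(text))
    entries = []
    for i, m in enumerate(matches):
        # An entry's message runs until the next prefix (or end of text).
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        entry = m.groupdict()
        entry["level"] = int(entry["level"])
        entry["time"] = datetime.fromtimestamp(float(entry["ts"]), tz=timezone.utc)
        entry["message"] = " ".join(text[m.end():end].split())
        entries.append(entry)
    return entries


# Three entries copied from the log above, joined the way they were pasted.
sample = (
    "%7|1650878105.949|COMMIT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: "
    "Committing offsets for 14 partition(s): manual "
    "%5|1650878105.949|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: "
    "GroupCoordinator/1: Timed out HeartbeatRequest in flight (after 10326ms, timeout #0): "
    "possibly held back by preceeding OffsetCommitRequest with timeout in 46873ms "
    "%7|1650878105.949|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: "
    "Heartbeat failed: _TIMED_OUT: Local: Timed out"
)

entries = parse_rdkafka_log(sample)
facilities = Counter(e["fac"] for e in entries)
# Lower numbers are more severe; level 7 is debug output.
notable = [e for e in entries if e["level"] <= 5]
```

Filtering on `level <= 5` leaves the `REQTMOUT` entry, which is the one that reports the heartbeat timing out behind the offset commit.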

Also, this group id has a lag of 100,167,242. Restarting the consumer program does not help. Is there a known reason for this?

How to reproduce

The consumer code is below:

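import time
from confluent_kafka import KafkaError

# kafka_client: a confluent_kafka.Consumer already subscribed to the topic
# queue: a multiprocessing.Queue read by the worker processes
count = 0
while True:
    try:
        msg = kafka_client.poll(timeout=1.0)
        if msg is None:
            # poll() returns None when no message arrives within the timeout
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                time.sleep(0.1)
                continue
            print("Consumer error: {}".format(msg.error()))
        else:
            queue.put(msg.value())   # multiprocessing.Queue
            count += 1
            if count > 100:
                # commit() with no arguments is asynchronous by default
                kafka_client.commit()
                count = 0
    except Exception as exc:
        # The handler was not shown in the original post; log and keep polling.
        print("Consumer loop error: {}".format(exc))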
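At roughly 1000 messages/sec, committing after every 100 messages works out to about 10 commit requests per second. One way to lower that rate is to commit on a time interval instead of a message count. The sketch below is illustrative only and is not confirmed anywhere in this thread as a fix: `CommitThrottle` and `min_interval` are hypothetical names, and `consumer` is assumed to be any object with a `commit()` method, such as `kafka_client` above.

```python
import time


class CommitThrottle(object):
    """Call consumer.commit() at most once per min_interval seconds.

    Hypothetical helper for illustration; not part of confluent-kafka-python.
    """

    def __init__(self, consumer, min_interval=5.0, clock=time.monotonic):
        self._consumer = consumer
        self._min_interval = min_interval
        self._clock = clock          # injectable so the logic can be tested
        self._last_commit = clock()
        self._pending = 0            # messages processed since the last commit

    def message_processed(self):
        """Record one processed message; commit if the interval has elapsed.

        Returns True when a commit was issued.
        """
        self._pending += 1
        if self._clock() - self._last_commit >= self._min_interval:
            self.flush()
            return True
        return False

    def flush(self):
        """Commit now if anything is pending, and restart the interval."""
        if self._pending:
            self._consumer.commit()
            self._pending = 0
        self._last_commit = self._clock()
```

In the loop above, this would replace the `count` bookkeeping: construct `CommitThrottle(kafka_client)` once before the loop, call `message_processed()` after `queue.put(...)`, and call `flush()` before the consumer is closed.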

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version: ('1.0.0', 1048576) and ('1.0.0', 16777471)
  • [x] Apache Kafka broker version: 0.11.0.2
  • [x] Client configuration: { "auto.offset.reset": "latest", "fetch.message.max.bytes": "10485760", "enable.auto.commit": "false" }
  • [x] Operating system: CentOS 7.4 with 3.10.0 kernel, Python version: 2.7.12
  • [x] Provide client logs (with 'debug': 'all' as necessary)
  • [x] Provide broker log excerpts
  • [ ] Critical issue
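The second tuple in the first checklist item has the shape returned by `confluent_kafka.libversion()`: a version string plus an integer. librdkafka documents that integer as hex `MMmmrrxx` (major, minor, revision, pre-release id, where `0xff` marks a final release). A small sketch for decoding it follows; `decode_librdkafka_version` is an illustrative name, not a library function, and the Python client's own integer (1048576) is not decoded here.

```python
def decode_librdkafka_version(version_int):
    """Split librdkafka's MMmmrrxx version integer into its four bytes."""
    major = (version_int >> 24) & 0xFF
    minor = (version_int >> 16) & 0xFF
    revision = (version_int >> 8) & 0xFF
    prerelease = version_int & 0xFF   # 0xFF means a final release
    return major, minor, revision, prerelease


# Value reported in the checklist above.
major, minor, revision, prerelease = decode_librdkafka_version(16777471)
label = "{}.{}.{}".format(major, minor, revision)
is_final = prerelease == 0xFF
```

Running the same decode after an upgrade is a quick way to confirm which librdkafka build the Python client actually loaded.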

jedi201321, Apr 26 '22 04:04

Suggest upgrading the client. After a quick look, I believe this might be a head-of-line blocking issue that was resolved after v1.0.

mhowlett, Apr 26 '22 14:04

Thanks a million. I added "time.sleep(0.5)" after "kafka_client.commit()" in the code. After restarting the consumer, it seems to work well. So, is it actually caused by head-of-line blocking?

jedi201321, Apr 26 '22 14:04

time.sleep(0.5) should not change behavior. Did you upgrade to the latest client version?

mhowlett, May 06 '22 19:05

The Python version is 2.7.12, so I upgraded the client to 1.6.0. But the problem still exists.

jedi201321, May 07 '22 10:05

In addition, I find that the broker always has many established connections with a large "Recv-Q" value. [image]

jedi201321, May 12 '22 07:05

Closing. We can investigate further if the issue presents in the latest version.

mhowlett, Oct 25 '22 18:10