confluent-kafka-python
consumer hangs on consumer.close()
Description
Issue with termination of a Kafka consumer. After the consumer has been consuming messages for a number of hours, it fails to terminate fully and hangs. `consumer.close()` is issued and the shutdown starts, but it never completes and the process hangs forever. Even `signal.alarm` does not terminate the script (Python runs signal handlers only between bytecode instructions on the main thread, so an alarm cannot interrupt a call that is blocked inside librdkafka's C code). The Python script runs in an Alpine container.
librdkafka version 2.2.0, `python:3.9-alpine`
## Python script

```python
consumer_config = {'bootstrap.servers': brokers, 'group.id': group_id,
                   'auto.offset.reset': 'latest', 'sasl.username': user,
                   'sasl.password': password, 'security.protocol': secprotocol,
                   'sasl.mechanism': saslmech, 'ssl.ca.location': caCertificate,
                   'debug': 'generic,consumer,cgrp', 'internal.termination.signal': 50}
logging.info('Connect to kafka brokers {} with group-id {}'.format(brokers, group_id))
consumer = Consumer(consumer_config)
logging.info('Subscribe to topics: {}'.format(topics))
consumer.subscribe([topics])

# Start of main script
logging.info('Start polling for messages')
while run:
    try:
        # Read a single message at a time
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:  # Process message
            logging.info('MSG:: %% %s [%d] at offset %d with key %s' %
                         (msg.topic(), msg.partition(), msg.offset(),
                          str(msg.key())))
            logging.debug('Received kafka message: {}'.format(str(msg.value())[:300]))
            if msg.value() is None:  # Check for null message
                logging.info('Null message ignored {}'.format(str(msg)))
            else:  # Process message
                logging.debug('Received message: {}'.format(str(msg.value().decode('utf-8'))[:400]))
                telemetry_msg = msg.value()
                processMessage(telemetry_msg, configuration, rulesJSON, devices_databases)
        if not run:  # Break on SIGHUP
            logging.critical('Received SIGHUP, breaking from kafka messaging loop')
            break
    except Exception as e:
        logging.critical('Error in main loop: ' + str(e))
        run = False  # Break running loop
```
## Final part of python script with termination

```python
def timeout_handler(signum, frame):
    logging.critical("Received SIGALRM")
    raise Exception

signal.signal(signal.SIGALRM, timeout_handler)  # Install timeout handler
signal.alarm(20)  # 20 seconds to terminate the program
logging.critical('Stopping all writebatch objects')
for deviceName in devices_databases:
    devices_databases[deviceName].stop()
logging.critical('Close kafka consumer')
consumer.close()
logging.critical('Program terminated for restart')
```
### Final logs when the script starts to terminate but never completes
```
%7|1698828670.537|TERMINATE|rdkafka#consumer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1698828670.537|TERMINATE|rdkafka#consumer-1| [thrd:app]: Sending thread kill signal 50
%7|1698828670.537|TERMINATE|rdkafka#consumer-1| [thrd:app]: Joining internal main thread
%7|1698828670.537|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1698828670.537|TERMINATE|rdkafka#consumer-1| [thrd:main]: Internal main thread terminating
%7|1698828670.537|DESTROY|rdkafka#consumer-1| [thrd:main]: Destroy internal
%7|1698828670.537|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1698828670.537|DESTROY|rdkafka#consumer-1| [thrd:main]: Removing all topics
%7|1698828670.538|TERMINATE|rdkafka#consumer-1| [thrd:main]: Purging reply queue
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd:sasl_plaintext://10.2.80.139:9094/1]: Broadcasting state change
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd:sasl_plaintext://10.2.80.139:9094/1]: Broadcasting state change
%7|1698828670.538|TERMINATE|rdkafka#consumer-1| [thrd:main]: Decommissioning internal broker
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd:sasl_plaintext://10.2.80.138:9094/0]: Broadcasting state change
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd:sasl_plaintext://10.2.80.138:9094/0]: Broadcasting state change
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd:sasl_plaintext://10.2.80.138:9094/0]: Broadcasting state change
%7|1698828670.538|TERMINATE|rdkafka#consumer-1| [thrd:main]: Join 6 broker thread(s)
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd:sasl_plaintext://10.2.80.140:9094/2]: Broadcasting state change
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd::0/internal]: Broadcasting state change
%7|1698828670.538|BROADCAST|rdkafka#consumer-1| [thrd:sasl_plaintext://10.2.80.140:9094/2]: Broadcasting state change
%7|1698828670.540|BROADCAST|rdkafka#consumer-1| [thrd:sasl_plaintext://10.2.80.136:9094/bootstrap]: Broadcasting state change
```
How to reproduce
================
Checklist
=========
Please provide the following information:
- [ ] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`):
- [ ] Apache Kafka broker version:
- [ ] Client configuration: `{...}`
- [ ] Operating system:
- [ ] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
Experimenting with closing the consumer, the debug lines pasted below (from a successful shutdown) are exactly what is missing from the logs when `consumer.close()` hangs. When the problem happens, the internal main thread termination never completes.
```
%7|1698511119.679|TERMINATE|rdkafka#consumer-1| [thrd:main]: Internal main thread termination done
%7|1698511119.679|TERMINATE|rdkafka#consumer-1| [thrd:app]: Destroying op queues
%7|1698511119.679|TERMINATE|rdkafka#consumer-1| [thrd:app]: Destroying cgrp
%7|1698511119.679|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "device-group-Cisco-hardware-ingest1": updating member id "" -> "(not-set)"
%7|1698511119.679|TERMINATE|rdkafka#consumer-1| [thrd:app]: Termination done: freeing resources
2023-10-28 16:38:39,679 CRITICAL Program terminated for restart
```
Was there any progress resolving this issue?
My temporary solution is to call `consumer.close()` from a Python `Thread` with `daemon=True` and put a timeout on the `join` call.
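A minimal sketch of that workaround is below. The helper name `close_with_timeout` and the timeout value are illustrative, not part of confluent-kafka-python:

```python
import threading


def close_with_timeout(consumer, timeout=10.0):
    """Run consumer.close() on a daemon thread so a hang inside
    librdkafka cannot block process exit; wait at most `timeout`
    seconds for it to finish."""
    t = threading.Thread(target=consumer.close, daemon=True)
    t.start()
    t.join(timeout)
    return not t.is_alive()  # True if close() completed in time
```

If `close()` does not return within the timeout, the thread is simply abandoned; because it is a daemon thread, the interpreter can still exit, which is the whole point of the workaround.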
Quick answer, no. It was discovered that the Kafka broker cluster had stability issues and was crashing; this precipitated the hung termination above, which would never complete. The Kafka cluster issue was resolved, and this Python consumer issue did not reoccur. But that is not a real resolution, since I know the issue can return under those circumstances. We are therefore porting the code to Go at present.