streamz
streamz copied to clipboard
Gracefully exit python script using Streams
I have a use case associated with pulling data from a Kafka topic. I need the streamz operator exit gracefully and exit the python script once it hits an exception. It looks something like this :
source = Stream.from_kafka_batched(TOPIC, kafka_confs, poll_interval='20s', max_batch_size=10000)
def process_messages():
try:
#process_messages
except Exception as e:
print(e)
disconnect_gracefully()
def disconnect_gracefully():
logging.info("Exit gracefully")
source.stop()
source.destory()
source.map(process_messages)
While this seems to work for the streamz operator, I feel like it doesn't disconnect from the Kafka broker and I get logs like this
%6|1651194599.149|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: my-kafka-server:9093: Disconnected (after 80522ms in state UP)
So, the script doesn't exit. Any pointers to how this can be done effectively?
@chinmaychandak , @jsmaupin , @roveo - anyone still interested in smooth operation of streamz and kafka?