streamz icon indicating copy to clipboard operation
streamz copied to clipboard

Gracefully exit python script using Streams

Open arjun180 opened this issue 2 years ago • 1 comments

I have a use case associated with pulling data from a Kafka topic. I need the streamz operator exit gracefully and exit the python script once it hits an exception. It looks something like this :

source = Stream.from_kafka_batched(TOPIC, kafka_confs, poll_interval='20s', max_batch_size=10000)

def process_messages():
    try:
         #process_messages
   except Exception as e:
        print(e)
        disconnect_gracefully()

def disconnect_gracefully():
    logging.info("Exit gracefully")
    source.stop()
    source.destory()
    
source.map(process_messages)

While this seems to work for the streamz operator, I feel like it doesn't disconnect from the Kafka broker and I get logs like this

%6|1651194599.149|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: my-kafka-server:9093: Disconnected (after 80522ms in state UP)

So, the script doesn't exit. Any pointers to how this can be done effectively?

arjun180 avatar Apr 29 '22 19:04 arjun180

@chinmaychandak , @jsmaupin , @roveo - anyone still interested in smooth operation of streamz and kafka?

martindurant avatar Apr 29 '22 19:04 martindurant