parallel-consumer
Error on closing Parallel Consumer
When trying to close the Parallel Consumer with parallelConsumer.close(), I get the error below:
[INFO ] 2022-10-20 23:03:00.661 [Thread-3] communication.kafka.KafkaConnector - Closing ParallelConsumer for url: localhost:9092, topic: MY-TOPIC-1
[ERROR] 2022-10-20 23:03:30.662 [pc-control] io.confluent.parallelconsumer.internal.BrokerPollSystem - **Execution or timeout exception waiting for broker poller thread to finish**
java.util.concurrent.TimeoutException: null
at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[?:1.8.0_252]
at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:244) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:515) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:698) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:639) ~[service-1.0.0.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
[ERROR] 2022-10-20 23:03:30.676 [pc-control] io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor - **Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: null**
java.util.concurrent.TimeoutException: null
at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[?:1.8.0_252]
at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:244) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:515) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:698) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:639) ~[service-1.0.0.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
[ERROR] 2022-10-20 23:03:30.682 [Thread-3] io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor - **Execution or timeout exception while waiting for the control thread to close cleanly (state was closing). Try increasing your time-out to allow the system to drain, or close without draining.**
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Can't commit - not running (state: closed
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_252]
at java.util.concurrent.FutureTask.get(FutureTask.java:206) ~[?:1.8.0_252]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:464) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:446) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:60) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:424) ~[service-1.0.0.jar:?]
at communication.kafka.KafkaConnector.stop(KafkaConnector.java:935) ~[service-1.0.0.jar:?]
at communication.kafka.KafkaClient.unsubscribe(KafkaClient.java:74) ~[service-1.0.0.jar:?]
Caused by: java.lang.IllegalStateException: Can't commit - not running (state: closed
at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:315) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1089) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:511) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:642) ~[service-1.0.0.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
... 1 more
[ERROR] 2022-10-20 23:03:30.684 [Thread-3] communication.kafka.KafkaConnector - **Error closing parallelConsumer
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Can't commit - not running (state: closed**
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_252]
at java.util.concurrent.FutureTask.get(FutureTask.java:206) ~[?:1.8.0_252]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:464) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:446) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:60) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:424) ~[service-1.0.0.jar:?]
at communication.kafka.KafkaConnector.stop(KafkaConnector.java:935) ~[service-1.0.0.jar:?]
at communication.kafka.KafkaClient.unsubscribe(KafkaClient.java:74) ~[service-1.0.0.jar:?]
at BaseremoveKafkaListener(Basejava:133) ~[service-1.0.0.jar:?]
at group.handler.MicroServiceHandler.removeAllGroups(MicroServiceHandler.java:92) ~[service-1.0.0.jar:?]
at group.handler.MicroServiceHandler.doHandle(MicroServiceHandler.java:66) ~[service-1.0.0.jar:?]
at group.handler.MicroServiceHandler.doHandle(MicroServiceHandler.java:19) ~[service-1.0.0.jar:?]
at communication.message.handler.MessageHandler.handle(MessageHandler.java:28) ~[service-1.0.0.jar:?]
at communication.message.processor.MessageProcessor.process(MessageProcessor.java:17) ~[service-1.0.0.jar:?]
at communication.kafka.KafkaClient$KafkaProcessingInterfaceImpl.process(KafkaClient.java:89) ~[service-1.0.0.jar:?]
at communication.kafka.KafkaMessageProcessingTask.run(KafkaMessageProcessingTask.java:26) ~[service-1.0.0.jar:?]
at communication.kafka.KafkaConnector$3.run(KafkaConnector.java:463) ~[service-1.0.0.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
Caused by: java.lang.IllegalStateException: Can't commit - not running (state: closed
at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:315) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1089) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:511) ~[service-1.0.0.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:642) ~[service-1.0.0.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
... 1 more
What version?
0.5.2.1
@astubbs We also see several different types of errors that cause the parallel consumer to shut down while our Java process keeps running. Is there any way to shut down our process when that happens? Maybe we could get an event from some kind of shutdown listener on parallelConsumer?
Yes, you can periodically call the isClosedOrFailed method.
A shutdown listener is an interesting suggestion - feel free to create a feature request. There isn't anything like that atm.
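Until something like a listener exists, the periodic check could be sketched roughly as below. This is only an illustration, not part of the library: the class name `ClosedOrFailedWatchdog`, its constructor parameters, and the thread name are made up for the example, and the check is passed in as a plain `BooleanSupplier` so the snippet compiles without the parallel-consumer dependency.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper (not part of parallel-consumer): polls a check on a
 * fixed delay and runs a callback once, the first time the check is true.
 */
final class ClosedOrFailedWatchdog implements AutoCloseable {

    // Single daemon thread, so the watchdog itself never keeps the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "pc-watchdog");
                t.setDaemon(true);
                return t;
            });

    ClosedOrFailedWatchdog(BooleanSupplier closedOrFailed, Runnable onStopped, long periodMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            if (closedOrFailed.getAsBoolean()) {
                try {
                    onStopped.run();
                } finally {
                    // Shutting down cancels further periodic runs, so the callback fires once.
                    scheduler.shutdown();
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops polling without running the callback. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Assuming `pc` is a reference on which `isClosedOrFailed` is visible, usage might look like `new ClosedOrFailedWatchdog(pc::isClosedOrFailed, () -> System.exit(1), 5000)`. What the callback does (exit the process, raise an alert, restart the consumer) is up to the application.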
Please update to the latest version, then try again?
Sure, will try updating to 0.5.2.3. Will also try working on periodic isClosedOrFailed checks.
We're also about to release a metrics package which might be relevant - so stay tuned on:
- #27
sure
> Yes, you can periodically call the isClosedOrFailed method.
Just need to confirm one thing: is this method only available on ParallelEoSStreamProcessor (AbstractParallelEoSStreamProcessor) and not at the interface level (ParallelConsumer or ParallelStreamProcessor)? If so, we will have to cast to ParallelEoSStreamProcessor.
Ah yea - will have to fix that
Can you include earlier logs? The errors you showed are from trying to shut down when the client was already closed. I can add some checks to avoid those.