Error on closing Parallel Consumer

Open jatindersthind opened this issue 2 years ago • 8 comments

When trying to close the Parallel Consumer using parallelConsumer.close(), I get the error below:

[INFO ] 2022-10-20 23:03:00.661 [Thread-3] communication.kafka.KafkaConnector - Closing ParallelConsumer for url: localhost:9092, topic: MY-TOPIC-1
[ERROR] 2022-10-20 23:03:30.662 [pc-control] io.confluent.parallelconsumer.internal.BrokerPollSystem - **Execution or timeout exception waiting for broker poller thread to finish**
java.util.concurrent.TimeoutException: null
	at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[?:1.8.0_252]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:244) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:515) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:698) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:639) ~[service-1.0.0.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]


[ERROR] 2022-10-20 23:03:30.676 [pc-control] io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor - **Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: null**
java.util.concurrent.TimeoutException: null
	at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[?:1.8.0_252]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:244) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:515) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:698) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:639) ~[service-1.0.0.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]


[ERROR] 2022-10-20 23:03:30.682 [Thread-3] io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor - **Execution or timeout exception while waiting for the control thread to close cleanly (state was closing). Try increasing your time-out to allow the system to drain, or close without draining.**
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Can't commit - not running (state: closed
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_252]
	at java.util.concurrent.FutureTask.get(FutureTask.java:206) ~[?:1.8.0_252]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:464) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:446) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:60) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:424) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaConnector.stop(KafkaConnector.java:935) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaClient.unsubscribe(KafkaClient.java:74) ~[service-1.0.0.jar:?]
Caused by: java.lang.IllegalStateException: Can't commit - not running (state: closed
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:315) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1089) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:511) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:642) ~[service-1.0.0.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	... 1 more


[ERROR] 2022-10-20 23:03:30.684 [Thread-3] communication.kafka.KafkaConnector - **Error closing parallelConsumer
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Can't commit - not running (state: closed**
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_252]
	at java.util.concurrent.FutureTask.get(FutureTask.java:206) ~[?:1.8.0_252]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:464) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:446) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:60) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:424) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaConnector.stop(KafkaConnector.java:935) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaClient.unsubscribe(KafkaClient.java:74) ~[service-1.0.0.jar:?]
	at BaseremoveKafkaListener(Basejava:133) ~[service-1.0.0.jar:?]
	at group.handler.MicroServiceHandler.removeAllGroups(MicroServiceHandler.java:92) ~[service-1.0.0.jar:?]
	at group.handler.MicroServiceHandler.doHandle(MicroServiceHandler.java:66) ~[service-1.0.0.jar:?]
	at group.handler.MicroServiceHandler.doHandle(MicroServiceHandler.java:19) ~[service-1.0.0.jar:?]
	at communication.message.handler.MessageHandler.handle(MessageHandler.java:28) ~[service-1.0.0.jar:?]
	at communication.message.processor.MessageProcessor.process(MessageProcessor.java:17) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaClient$KafkaProcessingInterfaceImpl.process(KafkaClient.java:89) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaMessageProcessingTask.run(KafkaMessageProcessingTask.java:26) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaConnector$3.run(KafkaConnector.java:463) ~[service-1.0.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
Caused by: java.lang.IllegalStateException: Can't commit - not running (state: closed
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:315) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1089) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:511) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:642) ~[service-1.0.0.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	... 1 more

jatindersthind avatar Oct 27 '22 17:10 jatindersthind

What version?

astubbs avatar Nov 01 '22 09:11 astubbs

What version?

0.5.2.1

@astubbs We also see several different types of errors that cause the Parallel Consumer to shut down while our Java process keeps running. Is there any way for us to shut down our process when that happens? Maybe we could get an event from some kind of shutdown listener on parallelConsumer?

jatindersthind avatar Nov 02 '22 10:11 jatindersthind

Yes, you can periodically call the isClosedOrFailed method.

A shutdown listener is an interesting suggestion - feel free to create a feature request. There isn't anything like that atm.
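Until such a listener exists, the polling approach could look roughly like the sketch below. It is only an illustration, not part of parallel-consumer: `ClosedOrFailedWatchdog` is a made-up helper name, and the status check is passed in as a plain `BooleanSupplier` so the example compiles without the library on the classpath. It assumes `isClosedOrFailed` takes no arguments and returns a boolean.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper (not a parallel-consumer class): polls a status check
 * at a fixed delay and runs a callback once, the first time it returns true.
 */
final class ClosedOrFailedWatchdog implements AutoCloseable {

    // Daemon thread, so the watchdog itself never keeps the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "pc-watchdog");
                t.setDaemon(true);
                return t;
            });

    private final AtomicBoolean fired = new AtomicBoolean(false);

    ClosedOrFailedWatchdog(BooleanSupplier closedOrFailed,
                           Runnable onClosedOrFailed,
                           long periodMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            // compareAndSet guarantees the callback runs at most once.
            if (closedOrFailed.getAsBoolean() && fired.compareAndSet(false, true)) {
                try {
                    onClosedOrFailed.run();
                } finally {
                    scheduler.shutdown(); // stop polling after the first hit
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** True once the status check has returned true and the callback was triggered. */
    boolean hasFired() {
        return fired.get();
    }

    /** Stops polling without running the callback. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Wiring it up might then be `new ClosedOrFailedWatchdog(processor::isClosedOrFailed, () -> System.exit(1), 5_000)`, where `processor` is a variable whose declared type actually exposes `isClosedOrFailed`. The exit code and the five-second period are arbitrary choices for illustration.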

Please update to the latest version, then try again?

astubbs avatar Nov 02 '22 11:11 astubbs

Please update to the latest version, then try again?

Sure, will try updating to 0.5.2.3. Will also try adding periodic isClosedOrFailed checks.

jatindersthind avatar Nov 03 '22 09:11 jatindersthind

We're also about to release a metrics package which might be relevant - so stay tuned on:

  • #27

astubbs avatar Nov 03 '22 11:11 astubbs

We're also about to release a metrics package which might be relevant - so stay tuned on:

sure

Yes, you can periodically call the isClosedOrFailed method.

Just need to confirm one thing: is this method only available on ParallelEoSStreamProcessor (AbstractParallelEoSStreamProcessor) and not at the interface level, on ParallelConsumer or ParallelStreamProcessor? If so, we will have to cast to ParallelEoSStreamProcessor.

jatindersthind avatar Nov 03 '22 17:11 jatindersthind

Ah yea - will have to fix that

astubbs avatar Nov 04 '22 11:11 astubbs

Can you include earlier logs? The errors you showed are from trying to shut down when the client was already closed. I can add some checks to avoid those.

astubbs avatar Nov 09 '22 19:11 astubbs