parallel-consumer icon indicating copy to clipboard operation
parallel-consumer copied to clipboard

Concurrent modification exception when closing

Open krvajal opened this issue 5 months ago • 4 comments

There seems to be a race condition when closing

Exception in thread "Thread-7" java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
	at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:557)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:531)
	at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDrainFirst(DrainingCloseable.java:40)
	at com.sinch.engage.chatlayer.ConsumerProxy.shutdown(ConsumerProxy.java:93)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
	at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:274)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:624)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$10(AbstractParallelEoSStreamProcessor.java:741)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2551)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2532)
	at org.apache.kafka.clients.consumer.KafkaConsumer.groupMetadata(KafkaConsumer.java:2324)
	at io.confluent.parallelconsumer.internal.ConsumerManager.updateMetadataCache(ConsumerManager.java:69)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.j…

krvajal avatar Mar 01 '24 06:03 krvajal