Error while unsubscribing the Kafka consumer: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread
Search before asking
- [x] I had searched in the issues and found no similar issues.
Environment
Linux
EventMesh version
master
What happened
In v1.11.0, with eventMesh.storage.plugin.type set to kafka, a Kafka exception is reported when the consumer client is closed via EventMeshTCPClient<EventMeshMessage> eventMeshClient.close().
How to reproduce
@PostConstruct
public void startListener() {
    try {
        UserAgent userAgent = MessageUtils.generateSubClient(EventMeshUtils.buildUserAgent(eventMeshConfig,
                Integer.parseInt(RandomUtil.randomNumbers(5)), eventMeshConfig.getClient().getConsumerGroup()));
        EventMeshTCPClientConfig config = EventMeshTCPClientConfig.builder()
                .host(eventMeshConfig.getServer().getHost())
                .port(eventMeshConfig.getServer().getTcpPort())
                .userAgent(userAgent)
                .build();
        eventMeshClient = EventMeshTCPClientFactory.createEventMeshTCPClient(config, EventMeshMessage.class);
        eventMeshClient.init();
        // Subscribe to each topic separately
        List<String> topicList = Arrays.asList(eventMeshConfig.getClient().getTopics().split(","));
        for (String topic : topicList) {
            String trimmedTopic = topic.trim();
            if (!trimmedTopic.isEmpty()) {
                eventMeshClient.subscribe(trimmedTopic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
                log.info("Subscribed to topic: {}", trimmedTopic);
            }
        }
        eventMeshClient.registerSubBusiHandler(this::processMessage);
        eventMeshClient.listen();
    } catch (Exception e) {
        log.error("Failed to start EventMesh TCP listener", e);
    }
}
@PreDestroy
public void stopListener() {
    try {
        if (eventMeshClient != null) {
            try {
                // Then close the client
                eventMeshClient.close();
                log.info("EventMesh TCP Listener stopped successfully");
            } catch (Exception e) {
                log.warn("Error during close: {}", e.getMessage());
            }
        }
    } catch (Exception e) {
        log.error("Error stopping EventMesh TCP listener", e);
    }
}
The exception is reported when the client is closed.
Debug logs
2025-10-11 17:15:52,628 ERROR [eventMesh-tcp-worker-2] ConsumerImpl(ConsumerImpl.java:126) - Error while unsubscribing the Kafka consumer:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: eventMesh-tcp-worker-2, id: 50) otherThread(id: 55)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquire(ClassicKafkaConsumer.java:1232) ~[kafka-clients-3.9.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquireAndEnsureOpen(ClassicKafkaConsumer.java:1213) ~[kafka-clients-3.9.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.unsubscribe(ClassicKafkaConsumer.java:544) ~[kafka-clients-3.9.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:764) ~[kafka-clients-3.9.0.jar:?]
at org.apache.eventmesh.storage.kafka.consumer.ConsumerImpl.unsubscribe(ConsumerImpl.java:121) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.storage.kafka.consumer.KafkaConsumerImpl.unsubscribe(KafkaConsumerImpl.java:81) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper.unsubscribe(MQConsumerWrapper.java:50) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.unsubscribe(ClientGroupWrapper.java:623) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanSubscriptionInSession(ClientSessionGroupMapping.java:304) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanClientGroupWrapperByCloseSub(ClientSessionGroupMapping.java:281) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:175) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:144) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.boot.AbstractTCPServer$TcpConnectionHandler.channelInactive(AbstractTCPServer.java:436) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$300(AbstractChannelHandlerContext.java:61) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext$4.run(AbstractChannelHandlerContext.java:286) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
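The trace shows unsubscribe() being called on eventMesh-tcp-worker-2 while another thread (id 55) is still inside the same KafkaConsumer. KafkaConsumer is documented as not thread-safe, with wakeup() as the only method that may be called from another thread. The following is a minimal sketch of that failure mode and of one common workaround, confining every consumer call to a single thread. It does not use Kafka or EventMesh classes: GuardedConsumer, directCallFails and confinedCallSucceeds are hypothetical names, the guard is a simplified, non-reentrant model of the real check, and this is not necessarily how EventMesh should fix the issue.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerConfinementSketch {

    /** Hypothetical stand-in for KafkaConsumer; models only a single-thread guard. */
    static class GuardedConsumer {
        private static final long NO_THREAD = -1L;
        private final AtomicLong owner = new AtomicLong(NO_THREAD);

        // Simplified, non-reentrant guard: fails if any thread is already inside.
        private void acquire() {
            long id = Thread.currentThread().getId();
            if (!owner.compareAndSet(NO_THREAD, id)) {
                throw new ConcurrentModificationException(
                        "consumer is not safe for multi-threaded access");
            }
        }

        private void release() {
            owner.set(NO_THREAD);
        }

        /** Simulates a blocking poll(): holds the guard until mayReturn is released. */
        void poll(CountDownLatch entered, CountDownLatch mayReturn) throws InterruptedException {
            acquire();
            try {
                entered.countDown();
                mayReturn.await();
            } finally {
                release();
            }
        }

        void unsubscribe() {
            acquire();
            try {
                // A real consumer would drop its subscriptions here.
            } finally {
                release();
            }
        }
    }

    /** Calls unsubscribe() from a second thread while poll() is in flight. */
    static boolean directCallFails() throws Exception {
        GuardedConsumer consumer = new GuardedConsumer();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayReturn = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                consumer.poll(entered, mayReturn);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        entered.await(); // the poller now owns the guard
        try {
            consumer.unsubscribe();
            return false;
        } catch (ConcurrentModificationException e) {
            return true; // same failure mode as in the log
        } finally {
            mayReturn.countDown();
            poller.join();
        }
    }

    /** Routes every consumer call through one executor thread so calls cannot overlap. */
    static boolean confinedCallSucceeds() throws Exception {
        GuardedConsumer consumer = new GuardedConsumer();
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch entered = new CountDownLatch(1);
            CountDownLatch mayReturn = new CountDownLatch(1);
            consumerThread.submit(() -> {
                consumer.poll(entered, mayReturn);
                return null;
            });
            entered.await();
            Runnable unsubscribeTask = consumer::unsubscribe;
            Future<?> unsubscribed = consumerThread.submit(unsubscribeTask); // queued behind poll()
            mayReturn.countDown(); // let the simulated poll() return
            unsubscribed.get();    // throws ExecutionException if the guard had fired
            return true;
        } catch (ExecutionException e) {
            return false;
        } finally {
            consumerThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("direct call hits the guard: " + directCallFails());
        System.out.println("confined call succeeds: " + confinedCallSucceeds());
    }
}
```

With a real KafkaConsumer, a queued unsubscribe only runs once the in-flight poll() returns, so the poll timeout bounds the shutdown delay; wakeup() can be called from another thread to interrupt a blocking poll sooner.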
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct *

