eventmesh icon indicating copy to clipboard operation
eventmesh copied to clipboard

Error while unsubscribing the Kafka consumer: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread

Open GaoService opened this issue 3 months ago • 1 comments

Search before asking

  • [x] I had searched in the issues and found no similar issues.

Environment

Linux

EventMesh version

master

What happened

v1.11.0 eventMesh.storage.plugin.type插件为kafka,在EventMeshTCPClient<EventMeshMessage> eventMeshClient.close() 消费客户端关闭时,报kafka异常

How to reproduce

@PostConstruct public void startListener() { try { UserAgent userAgent = MessageUtils.generateSubClient(EventMeshUtils.buildUserAgent(eventMeshConfig, Integer.parseInt(RandomUtil.randomNumbers(5)), eventMeshConfig.getClient().getConsumerGroup())); EventMeshTCPClientConfig config = EventMeshTCPClientConfig.builder() .host(eventMeshConfig.getServer().getHost()) .port(eventMeshConfig.getServer().getTcpPort()) .userAgent(userAgent) .build(); eventMeshClient = EventMeshTCPClientFactory.createEventMeshTCPClient(config, EventMeshMessage.class); eventMeshClient.init(); // 为每个主题单独订阅 List<String> topicList = Arrays.asList(eventMeshConfig.getClient().getTopics().split(",")); for (String topic : topicList) { String trimmedTopic = topic.trim(); if (!trimmedTopic.isEmpty()) { eventMeshClient.subscribe(trimmedTopic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); log.info("Subscribed to topic: {}", trimmedTopic); } } eventMeshClient.registerSubBusiHandler(this::processMessage); eventMeshClient.listen(); } catch (Exception e) { log.error("Failed to start EventMesh TCP listener", e); } }

@PreDestroy public void stopListener() { try { if (eventMeshClient != null) { try { // 再关闭客户端 eventMeshClient.close(); log.info("EventMesh TCP Listener stopped successfully"); } catch (Exception e) { log.warn("Error during close: {}", e.getMessage()); } } } catch (Exception e) { log.error("Error stopping EventMesh TCP listener", e); } }

关闭客户端时报异常

Debug logs

2025-10-11 17:15:52,628 ERROR [eventMesh-tcp-worker-2] ConsumerImpl(ConsumerImpl.java:126) - Error while unsubscribing the Kafka consumer:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: eventMesh-tcp-worker-2, id: 50) otherThread(id: 55)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquire(ClassicKafkaConsumer.java:1232) ~[kafka-clients-3.9.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquireAndEnsureOpen(ClassicKafkaConsumer.java:1213) ~[kafka-clients-3.9.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.unsubscribe(ClassicKafkaConsumer.java:544) ~[kafka-clients-3.9.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:764) ~[kafka-clients-3.9.0.jar:?]
        at org.apache.eventmesh.storage.kafka.consumer.ConsumerImpl.unsubscribe(ConsumerImpl.java:121) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
        at org.apache.eventmesh.storage.kafka.consumer.KafkaConsumerImpl.unsubscribe(KafkaConsumerImpl.java:81) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
        at org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper.unsubscribe(MQConsumerWrapper.java:50) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
        at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.unsubscribe(ClientGroupWrapper.java:623) [eventmesh-runtime-1.11.0-release.jar:1.11.0-r
elease]
        at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanSubscriptionInSession(ClientSessionGroupMapping.java:304) [eventmesh-runtim
e-1.11.0-release.jar:1.11.0-release]
        at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanClientGroupWrapperByCloseSub(ClientSessionGroupMapping.java:281) [eventmesh
-runtime-1.11.0-release.jar:1.11.0-release]
        at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:175) [eventmesh-runtime-1.11.0-relea
se.jar:1.11.0-release]
        at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:144) [eventmesh-runtime-1.11.0-relea
se.jar:1.11.0-release]
        at org.apache.eventmesh.runtime.boot.AbstractTCPServer$TcpConnectionHandler.channelInactive(AbstractTCPServer.java:436) [eventmesh-runtime-1.11.0-release.jar:1.11.0-rele
ase]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.channel.AbstractChannelHandlerContext.access$300(AbstractChannelHandlerContext.java:61) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.channel.AbstractChannelHandlerContext$4.run(AbstractChannelHandlerContext.java:286) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) [netty-common-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) [netty-common-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [netty-common-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
        at java.base/java.lang.Thread.run(Thread.java:834) [?:?]

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

GaoService avatar Oct 11 '25 11:10 GaoService

Welcome to the Apache EventMesh community!! We are glad that you are contributing by opening this issue. :D

Please make sure to include all the relevant context. We will be here shortly.

If you are interested in contributing to our project, please let us know! You can check out our contributing guide on contributing to EventMesh.

Want to get closer to the community?

WeChat Assistant WeChat Public Account Slack
Join Slack Chat

Mailing Lists:

Name Description Subscribe Unsubscribe Archive
Users User support and questions mailing list Subscribe Unsubscribe Mail Archives
Development Development related discussions Subscribe Unsubscribe Mail Archives
Commits All commits to repositories Subscribe Unsubscribe Mail Archives
Issues Issues or PRs comments and reviews Subscribe Unsubscribe Mail Archives

github-actions[bot] avatar Oct 11 '25 11:10 github-actions[bot]