rocketmq-flink
The message queue is not in assigned list
I get this error when consuming RocketMQ messages in a Flink job:
17:51:50.537 [rmq-pull-thread-1] ERROR org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil - RuntimeException, retry 2/5
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:348)
at org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil.call(RetryUtil.java:56)
at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:276)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:658)
at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seek(DefaultLitePullConsumer.java:298)
at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:282)
... 5 common frames omitted
17:51:51.342 [rmq-pull-thread-2] ERROR org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil - RuntimeException, retry 3/5
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=2]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:348)
at org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil.call(RetryUtil.java:56)
at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:276)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=2]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:658)
at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seek(DefaultLitePullConsumer.java:298)
at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:282)
... 5 common frames omitted
My RocketMQ version: 5.1.1, Flink version: 1.15.0
I am also hitting this error; it seems it is still not solved.
See https://github.com/apache/rocketmq-flink/pull/96
I still get this error.
When I run the simpleConsumer it works normally. Does that mean the connection to RocketMQ is fine?
It still fails when I run the ConnectorExample. RocketMQ: 5.0.0
2024-03-12 23:05:42 WARN [ Source: Custom Source (1/2)#0] [e.flink.runtime.taskmanager.Task] Source: Custom Source (1/2)#0 (ea15ccd58a1ef50baebef0860ee2e52b) switched from INITIALIZING to FAILED with failure cause: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=test, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:660)
at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seekToEnd(DefaultLitePullConsumerImpl.java:693)
at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seekToEnd(DefaultLitePullConsumer.java:394)
at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.initOffsets(RocketMQSourceFunction.java:394)
at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.open(RocketMQSourceFunction.java:246)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)
See #96
Hi, good morning, this is my setting
I have also encountered this problem. Has it been resolved?
See #96
this is my test code
See #96 It seems to do nothing!
It seems to do nothing! The problem still exists on the 'main-latest' branch. Who can help fix it?
RocketMQ version: 4.9.2
Flink version: 1.15.2
Problem details:
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=tp_ld_driver_behavior_score_change, brokerName=broker-onlinemq1-d, queueId=0]
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
I guess this issue comes from the class 'DefaultLitePullConsumer'. Previously it was DefaultMQPullConsumer, which carries the note: "This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use".
Is this bug fixed? I have encountered this issue and found that the message queue should be assigned before seeking, but I can't find any code that assigns it.
The following code runs if the queues are assigned to the consumer manually:
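```java
import java.util.Collection;
import org.apache.rocketmq.common.message.MessageQueue;

// Fetch all queues of the topic and assign them to the consumer manually
Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
consumer.assign(totalQueues);
// it works
consumer.seekToEnd(totalQueues.iterator().next());
```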
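For anyone reading along, here is a rough, self-contained sketch of why the order matters. It is not the RocketMQ client: `ToyConsumer`, `seekFails` and the `"broker-a:1"` queue names are hypothetical stand-ins I made up, and the guard in `seek` is only my assumption of what the check behind the "not in assigned list" message looks like, based on the stack traces above.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AssignBeforeSeek {

    /** Hypothetical stand-in for a pull consumer; NOT the real RocketMQ class. */
    static class ToyConsumer {
        private final Set<String> assigned = new HashSet<>();
        private final Map<String, Long> offsets = new HashMap<>();

        /** Replaces the set of queues this consumer is allowed to seek on. */
        void assign(Collection<String> queues) {
            assigned.clear();
            assigned.addAll(queues);
        }

        /** Rejects any queue that was never assigned (assumed guard). */
        void seek(String queue, long offset) {
            if (!assigned.contains(queue)) {
                throw new IllegalStateException(
                        "The message queue is not in assigned list, message queue: " + queue);
            }
            offsets.put(queue, offset);
        }

        /** Returns the last offset set by seek, or -1 if none was set. */
        long position(String queue) {
            return offsets.getOrDefault(queue, -1L);
        }
    }

    /** Returns true if seeking on the queue throws, false if it succeeds. */
    static boolean seekFails(ToyConsumer consumer, String queue) {
        try {
            consumer.seek(queue, 0L);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();

        // Seeking before any assignment is rejected, like in the logs above.
        System.out.println("seek before assign fails: " + seekFails(consumer, "broker-a:1"));

        // After assigning the queues manually, the same seek goes through.
        consumer.assign(Arrays.asList("broker-a:0", "broker-a:1"));
        System.out.println("seek after assign fails: " + seekFails(consumer, "broker-a:1"));
    }
}
```

If the real client behaves like this model, then any code path that calls `seek` or `seekToEnd` before the queues have been assigned (or while the assignment is changing) would hit the same exception, which matches what the workaround above avoids.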