rocketmq-flink

The message queue is not in assigned list

Open andyyumiao opened this issue 1 year ago • 9 comments

I get the following error when consuming RocketMQ messages in a Flink job:

17:51:50.537 [rmq-pull-thread-1] ERROR org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil - RuntimeException, retry 2/5
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:348)
	at org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil.call(RetryUtil.java:56)
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:276)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
	at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:658)
	at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seek(DefaultLitePullConsumer.java:298)
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:282)
	... 5 common frames omitted
17:51:51.342 [rmq-pull-thread-2] ERROR org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil - RuntimeException, retry 3/5
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=2]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:348)
	at org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil.call(RetryUtil.java:56)
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:276)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=2]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
	at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:658)
	at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seek(DefaultLitePullConsumer.java:298)
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:282)
	... 5 common frames omitted

My RocketMQ version: 5.1.1; Flink version: 1.15.0

andyyumiao avatar Dec 28 '23 09:12 andyyumiao

I also encountered this error. It seems it is still not solved.

cj495840252 avatar Mar 09 '24 06:03 cj495840252

See https://github.com/apache/rocketmq-flink/pull/96

humkum avatar Mar 12 '24 13:03 humkum

I still get this error, although the simpleConsumer works normally when I run it. Doesn't that mean the connection to RocketMQ is fine? [image]

It still fails when I run the ConnectorExample. RocketMQ: 5.0.0

2024-03-12 23:05:42  WARN [   Source: Custom Source (1/2)#0] [e.flink.runtime.taskmanager.Task] Source: Custom Source (1/2)#0 (ea15ccd58a1ef50baebef0860ee2e52b) switched from INITIALIZING to FAILED with failure cause: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=test, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
	at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:660)
	at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seekToEnd(DefaultLitePullConsumerImpl.java:693)
	at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seekToEnd(DefaultLitePullConsumer.java:394)
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.initOffsets(RocketMQSourceFunction.java:394)
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.open(RocketMQSourceFunction.java:246)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:750)

cj495840252 avatar Mar 12 '24 15:03 cj495840252

See #96

Hi, good morning. These are my settings: [image]

cj495840252 avatar Mar 12 '24 15:03 cj495840252

I have also encountered this problem. Has it been resolved? [image]

madi1819 avatar Mar 14 '24 08:03 madi1819

See #96

This is my test code: [image]

madi1819 avatar Mar 14 '24 08:03 madi1819

See #96

It seems to do nothing!

loserwang1024 avatar Mar 28 '24 10:03 loserwang1024

It seems to do nothing! The problem still exists on the 'main-latest' branch. Can anyone help fix it?

RocketMQ version: 4.9.2
Flink version: 1.15.2

Problem details:

Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=tp_ld_driver_behavior_score_change, brokerName=broker-onlinemq1-d, queueId=0]
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/

I guess this issue comes from the class 'DefaultLitePullConsumer'. Previously it was DefaultMQPullConsumer: "This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use"

ping-cai avatar Jun 07 '24 07:06 ping-cai

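Is this bug fixed? I have encountered this issue and found that the message queue should be assigned to the consumer, but I can't find any code that does the assignment.

The following code runs if the queues are assigned to the consumer manually:

```
// Fetch every queue of the topic and assign them to the consumer manually.
Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
consumer.assign(totalQueues); // it works

// seekToEnd is only accepted for queues that are in the assigned list.
consumer.seekToEnd(totalQueues.iterator().next());
```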

liangjw avatar Aug 18 '24 08:08 liangjw
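
To illustrate why assigning first helps, here is a minimal, self-contained sketch of the assign-before-seek ordering. It deliberately does not use the RocketMQ client: `ToyConsumer`, its methods, and the `broker-a:1` queue names are hypothetical stand-ins made up for this example. The only thing it tries to mirror is the behaviour visible in the stack traces above, where `seek` is rejected for a queue that is not in the assigned list. How the real client maintains that list (for example during rebalancing) is not modelled here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssignBeforeSeekSketch {

    /** Hypothetical stand-in for a pull consumer; NOT the RocketMQ client. */
    static final class ToyConsumer {
        private final Set<String> assigned = new HashSet<>();
        private final Map<String, Long> offsets = new HashMap<>();

        /** Replaces the set of queues this consumer may seek on. */
        void assign(Collection<String> queues) {
            assigned.clear();
            assigned.addAll(queues);
        }

        /** Rejects queues that were never assigned, like the error in the thread. */
        void seek(String queue, long offset) {
            if (!assigned.contains(queue)) {
                throw new IllegalStateException(
                        "The message queue is not in assigned list, message queue: " + queue);
            }
            offsets.put(queue, offset);
        }

        /** Returns the offset last set by seek, or null if seek was never accepted. */
        Long offsetOf(String queue) {
            return offsets.get(queue);
        }
    }

    /** Returns true if seeking on a fresh consumer (nothing assigned) is rejected. */
    static boolean seekFailsWithoutAssign(String queue) {
        ToyConsumer consumer = new ToyConsumer();
        try {
            consumer.seek(queue, 0L);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("broker-a:0", "broker-a:1");

        System.out.println("seek without assign fails: "
                + seekFailsWithoutAssign("broker-a:1"));

        ToyConsumer consumer = new ToyConsumer();
        consumer.assign(queues);          // same order as the workaround: assign first
        consumer.seek("broker-a:1", 42L); // accepted, the queue is now assigned
        System.out.println("offset after assign + seek: "
                + consumer.offsetOf("broker-a:1"));
    }
}
```

In this toy model the first call is rejected and the second succeeds only because `assign` ran before `seek`, which is the same ordering as in the workaround above. Whether the connector should assign queues itself or wait for them to be assigned is a separate question that this sketch does not answer.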