rocketmq-python
rocketmq-python copied to clipboard
消费失败重试机制不起作用
trafficstars
def test_push_consumer_reconsume_later():
stop_event = threading.Event()
raised_exc = threading.Event()
def on_message(msg):
print(msg.body.decode('utf-8'))
if not raised_exc.is_set():
raised_exc.set()
raise Exception('Should reconsume later')
stop_event.set()
consumer = PushConsumer('xxxx')
consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('LTAI0Dl0VYPDiExz', 'KOFlHDMPlVboJPbBcy07wkGO6rRyeB',
'ALIYUN') # No need to call this function if you don't use Aliyun.
consumer.subscribe('xxxxx', on_message, 'hello')
consumer.start()
while not stop_event.is_set():
time.sleep(10)
test_push_consumer_reconsume_later()
我按照你的代码这样测了,但是抛了异常的那条消息并不会再来第二次,按理说应该会再来第二次然后停止阻塞终止程序的对吧?
是的,CI 上面用的本地的 RocketMQ 实例测试是通过的。
是的,CI 上面用的本地的 RocketMQ 实例测试是通过的。
那为什么我在跑这个的时候并没有重新消费?我贴的代码应该没有问题
你用的是阿里云的 MQ? 建议去看看后台配置啥的,也可以本地弄个 RocketMQ 测试下,如果是阿里云的问题建议找阿里云技术支持解决。
你用的是阿里云的 MQ? 建议去看看后台配置啥的,也可以本地弄个 RocketMQ 测试下,如果是阿里云的问题建议找阿里云技术支持解决。
没错是rocketmq呀,问下你觉得rocketmq后台配置的哪个地方会造成这个问题?