rocketmq
[ISSUE #3359] Add function--Custom delay time
Make sure to set the target branch to develop
What is the purpose of the change
Adds an implementation of custom-delay messages, driven by a property set in the message's properties.
Brief changelog
Add a new property named SPECIFY_DELAY_TIME.
The property value has the format ()d()h()m()s. Example: 1d1h1m1s means a delay of 1 day plus 1 hour plus 1 minute plus 1 second.
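To illustrate the value format, here is a minimal sketch of a parser. The class name DelaySpec and its parse method are hypothetical and are not taken from this PR. The sketch also assumes that every unit is optional and that units appear in the fixed order d, h, m, s, which the description does not state explicitly.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of this PR: turns "1d1h1m1s" into a Duration.
final class DelaySpec {
    // Assumption: each unit is optional, but the order d, h, m, s is fixed.
    private static final Pattern FORMAT =
        Pattern.compile("(?:(\\d+)d)?(?:(\\d+)h)?(?:(\\d+)m)?(?:(\\d+)s)?");

    /** Parses a value such as "1d1h1m1s" or "2m2s". */
    static Duration parse(String value) {
        // The pattern alone would accept "", so reject it explicitly.
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("empty delay value");
        }
        Matcher m = FORMAT.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("bad delay value: " + value);
        }
        return Duration.ofDays(group(m, 1))
            .plusHours(group(m, 2))
            .plusMinutes(group(m, 3))
            .plusSeconds(group(m, 4));
    }

    // A unit that is absent from the input contributes zero.
    private static long group(Matcher m, int index) {
        String g = m.group(index);
        return g == null ? 0L : Long.parseLong(g);
    }
}
```

With this sketch, DelaySpec.parse("2m2s") yields a Duration of 122 seconds, which matches the delay used in the demo below.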
Verifying this change
Added a new schedule class that checks the new property. This is my verification result document.
Verification method:
Demo code:
Producer:
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

DefaultMQProducer producer = new DefaultMQProducer("DELAY_GROUP_TAG");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
final byte[] bytes = "this is custom 2m2s message".getBytes(StandardCharsets.UTF_8);
Message message = new Message("DELAY_TOPIC_TAG_1", "*", bytes);
// This property is what my delay-queue implementation relies on
message.putUserProperty("SPECIFY_DELAY_TIME", "2m2s");
producer.sendOneway(message);
producer.shutdown();
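Consumer:
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DELAY_GROUP_TAG");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // Topic name
    consumer.subscribe("DELAY_TOPIC_TAG_1", "*");
    consumer.setSuspendCurrentQueueTimeMillis(30);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt ext : list) {
                // Time elapsed between the message being created by the producer and being received here
                final long time = System.currentTimeMillis() - ext.getBornTimestamp();
                System.out.println("message\t" + new String(ext.getBody(), StandardCharsets.UTF_8) + " received after (s)\t" + time / 1000.0);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}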
Great. It would be better to retarget this PR to the develop branch.
Could you please open an issue first? You can refer to the format of other issues. @dragonTalon
OK, I have updated my PR and added an issue. You can take a look.
Okay! Nice!
Original message. Sent: Thursday, September 16, 2021, 22:03. Subject: Re: [apache/rocketmq] [ISSUE #3359] Add function--Custom delay time (#3337)
I hope this feature launches soon.
It's better to compare this with the solution in #2290 to see in what situation this approach fits better.
Has this feature already been implemented in a newer version? Could you say specifically which version?
This PR has not been merged yet.
Is there any plan to merge this? Or any plan to provide this feature?
@Jason918 Is there any problem with merging this PR?
@Jason918 Please check my branch.
@dragonTalon Thanks for your contribution. I think this solution introduces too many writes.
Yes, I use many consumerQueues to store the tasks. I use a layered time-slicing method. If you do not slice the time but instead keep re-enqueuing within a single consumerQueue, errors may occur when there are many delayed tasks.
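One possible reading of "layered time slicing" is a set of queues at different granularities, with a message moving from a coarse layer to a finer one as its due time approaches. The sketch below only illustrates that reading. The layer names, the LayeredSlicer class, and its methods are all hypothetical and are not the code in this PR.

```java
import java.time.Duration;

// Hypothetical illustration of layered time slicing, not the PR's implementation.
final class LayeredSlicer {
    /** Assumed layers, coarsest first, each with its unit length in seconds. */
    enum Layer {
        DAY(86_400), HOUR(3_600), MINUTE(60), SECOND(1);

        final long seconds;

        Layer(long seconds) {
            this.seconds = seconds;
        }
    }

    /** Picks the coarsest layer whose unit still fits into the remaining delay. */
    static Layer layerFor(Duration remaining) {
        long secs = remaining.getSeconds();
        for (Layer layer : Layer.values()) {
            if (secs >= layer.seconds) {
                return layer;
            }
        }
        return Layer.SECOND; // less than one second left
    }

    /** Slot inside the chosen layer: the number of whole units remaining. */
    static long slotFor(Duration remaining, Layer layer) {
        return remaining.getSeconds() / layer.seconds;
    }
}
```

Under these assumptions, a 2m2s delay first lands in slot 2 of the MINUTE layer and, once 2 seconds remain, is rewritten into slot 2 of the SECOND layer. Each such hand-off is an extra write, which is the cost the reviewer raises above.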
This PR is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR.
This PR was closed because it has been inactive for 3 days since being marked as stale.