[ISSUE #3359] Add function--Custom delay time

Open dragonTalon opened this issue 3 years ago • 13 comments

Make sure to set the target branch to develop

What is the purpose of the change

#3359

Adds an implementation of custom delayed messages. The delay is specified through an entry in the message's properties.

Brief changelog

Add a property named SPECIFY_DELAY_TIME.

The property value uses the format ()d()h()m()s. Example: 1d1h1m1s means a delay of 1 day plus 1 hour plus 1 minute plus 1 second.
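To make the value format concrete, here is a minimal sketch of how a string such as 1d1h1m1s could be converted to milliseconds. The class name DelaySpec and the method parseMillis are hypothetical and are not taken from this PR; the sketch also assumes that any unit may be omitted (as in 2m2s) and that units must appear in d, h, m, s order.

```java
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not the parser used by this PR.
final class DelaySpec {
    // Optional day, hour, minute, and second parts, in that fixed order.
    private static final Pattern FORMAT =
        Pattern.compile("(?:(\\d+)d)?(?:(\\d+)h)?(?:(\\d+)m)?(?:(\\d+)s)?");

    private DelaySpec() {
    }

    // Converts a value such as "1d1h1m1s" or "2m2s" to a delay in milliseconds.
    static long parseMillis(String spec) {
        if (spec == null || spec.isEmpty()) {
            throw new IllegalArgumentException("empty delay spec");
        }
        Matcher m = FORMAT.matcher(spec);
        if (!m.matches()) {
            throw new IllegalArgumentException("bad delay spec: " + spec);
        }
        long seconds = TimeUnit.DAYS.toSeconds(part(m, 1))
            + TimeUnit.HOURS.toSeconds(part(m, 2))
            + TimeUnit.MINUTES.toSeconds(part(m, 3))
            + part(m, 4);
        return TimeUnit.SECONDS.toMillis(seconds);
    }

    // Returns the numeric value of a capture group, or 0 if the unit was omitted.
    private static long part(Matcher m, int group) {
        String value = m.group(group);
        return value == null ? 0L : Long.parseLong(value);
    }
}
```

Under these assumptions, DelaySpec.parseMillis("1d1h1m1s") yields 90061000 (86400 + 3600 + 60 + 1 seconds), and the 2m2s value used in the demo below yields 122000.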

Verifying this change

Added a new schedule class that checks the new property. This is my verification results document.

Verification method:

Demo code:

Producer:

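    import java.nio.charset.StandardCharsets;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;

    // Assumes a name server is reachable at 127.0.0.1:9876.
    DefaultMQProducer producer = new DefaultMQProducer("DELAY_GROUP_TAG");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    final byte[] bytes = "this is custom 2m2s message".getBytes(StandardCharsets.UTF_8);
    Message message = new Message("DELAY_TOPIC_TAG_1", "*", bytes);

    // This property is what the custom delay implementation reads.
    message.putUserProperty("SPECIFY_DELAY_TIME", "2m2s");

    producer.sendOneway(message);
    producer.shutdown();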

Consumer:

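    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DELAY_GROUP_TAG");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Topic name
        consumer.subscribe("DELAY_TOPIC_TAG_1", "*");
        consumer.setSuspendCurrentQueueTimeMillis(30);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt ext : list) {
                    // Elapsed time between message creation and consumption.
                    final long time = System.currentTimeMillis() - ext.getBornTimestamp();
                    System.out.println("Message\t" + new String(ext.getBody(), StandardCharsets.UTF_8) + "\tdelay before receipt (s)\t" + time / 1000.0);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }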

dragonTalon avatar Sep 09 '21 10:09 dragonTalon

Great. It would be better to retarget this PR to the develop branch.

ShannonDing avatar Sep 10 '21 07:09 ShannonDing

Could you please open an issue first? You can refer to the format of other issues. @dragonTalon

zongtanghu avatar Sep 16 '21 11:09 zongtanghu

Could you please open an issue first? You can refer to the format of other issues. @dragonTalon

OK, I have updated my PR and added an issue. Please take a look.

dragonTalon avatar Sep 16 '21 14:09 dragonTalon

Okay! Nice!

zongtanghu avatar Sep 16 '21 14:09 zongtanghu

I hope this feature launches soon.

yourlin avatar Sep 30 '21 02:09 yourlin

It's better to compare this with the solution in #2290 to see in what situation this approach fits better.

Jason918 avatar Oct 02 '21 03:10 Jason918

Has this feature already been implemented in a newer version? If so, could you say which version?

sugniii avatar Oct 15 '21 02:10 sugniii

Has this feature already been implemented in a newer version? If so, could you say which version?

This PR has not been merged yet.

Jason918 avatar Oct 15 '21 04:10 Jason918

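Has this feature already been implemented in a newer version? If so, could you say which version?

This PR has not been merged yet.

Is there a plan to merge it, or to provide this feature some other way?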

sugniii avatar Oct 15 '21 04:10 sugniii

@Jason918 Is there any problem with merging this PR?

dragonTalon avatar Nov 01 '21 13:11 dragonTalon

@Jason918 Please check my branch.

dragonTalon avatar Dec 02 '21 09:12 dragonTalon

@dragonTalon Thanks for your contribution. I think this solution introduces too many writes.

areyouok avatar Dec 03 '21 11:12 areyouok

Thanks for your contribution. I think this solution introduces too many writes.

Yes, I use many consumerQueue instances to store the tasks, following a layered time-slicing method. If the tasks were not sliced and were instead re-queued repeatedly in a single consumerQueue, errors could occur as the number of delayed tasks grows.

dragonTalon avatar Dec 06 '21 14:12 dragonTalon

This PR is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR.

github-actions[bot] avatar Jul 19 '23 00:07 github-actions[bot]

This PR was closed because it has been inactive for 3 days since being marked as stale.

github-actions[bot] avatar Jul 24 '23 00:07 github-actions[bot]