rocketmq-spring
rocketmq-spring copied to clipboard
Consumer batch messages API.
请问怎么批量的接收消息,比如按照文档接收消息是这样的 @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2") public class MyConsumer implements RocketMQListener<OrderPaidEvent> { @Override public void onMessage(OrderPaidEvent orderPaidEvent) { log.info("received orderPaidEvent: {}", orderPaidEvent); } } 我现在想把里面的OrderPaidEvent orderPaidEvent变成List<OrderPaidEvent> orderPaidEvent这样的,我测试了一下这样会报错,原生的是支持批量的,现在我看了一遍源码没有找到批量接收的方法
It is not supported yet.
为什么在 2.1.0 里面又删除了呢?
是这样的我们这里也有批量消费的需求,主要是想结合ORM的批量提交增加性能 ,由于能力有限,,能想到的方式很不优雅,support native listener #293 ,然后在native listener自己处理批量。希望rocketmq-spring能提供批量消费这个功能。
We also meet the same problem, this feature is important for us to batch save data to database.
yes,we need batch consumer .
Currently, I found a workaround to get batch messages. But we still perfer batch listener
@RocketMQMessageListener(topic = "demo_product", consumerGroup = "demo_batch_product_consumerGroup", consumeMode = ConsumeMode.ORDERLY)
public class ProductBatchConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onMessage(String message) {
// do noting, have to implement RocketMQListener or throw error
}
private AtomicInteger atomicInteger = new AtomicInteger();
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setPullInterval(1000);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1000);
consumer.setPullBatchSize(100);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println("batchSize: " + msgs.size());
int result = atomicInteger.addAndGet(msgs.size());
System.out.println("totalSize: " + result);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
}
}
@RongtongJin Any update about this issue ? I found an PR #170 about batch listener
we need batch consumer .
It is not supported yet.
why?
使用pull模式的List<String> messages = rocketMQTemplate.receive(String.class)是否就可以获取批量数据?
we need batch consumer
rocketMQTemplate.receive 后,是否需要手工调用下面的代码来提交ack呢?
rocketMQTemplate.getConsumer().commitSync();
DefaultRocketMQListenerContainer
这个地方能否直接支持把原始的List<MessageExt> msgs传递下去呢,很多场景都需要批量的数据处理提升性能