rocketmq
rocketmq copied to clipboard
consumeThreadMax is useless in ConsumeMessageConcurrentlyService
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
See the code above, the consumeExecutor use a default capacity(Int.MaxValue) LinkedBlockingQueue.If there is more than getConsumeThreadMin thread is working ,then new task will put in the queue, will never create new thread to process new task.
so, what is your opinion?
so, what is your opinion?
use one property like 'consumeThreadNum' instead of the consumeThreadMin and consumeThreadMax
@coder-zzzz Could you submit a PR to fix it? I think consumeThreadNum
can be added , consumeThreadMin
consumeThreadMax
can be left for compatibility and be declared as deprecated.