rocketmq-spring icon indicating copy to clipboard operation
rocketmq-spring copied to clipboard

checkExecutor in RocketMQTransaction run in wrong oder.

Open caolicaoli opened this issue 2 years ago • 1 comments

  1. Please describe the issue you observed:
  • using transaction message.

  • checkLocalTransaction use my threadpool

  • now it only use the pool it created itslef with 1 max poolsize

i think these two method run in wrong order。。。。。

public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    if (producer.getExecutorService() != null) {
        this.checkExecutor = producer.getExecutorService();
    } else {
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }
}

private void registerTransactionListener(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

    if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());
    }
    RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
    RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
    if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {
        throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");
    }
    ((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),
        annotation.keepAliveTime(), annotation.keepAliveTimeUnit(), new LinkedBlockingDeque<>(annotation.blockingQueueSize())));
    ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
    log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
}

caolicaoli avatar Jan 15 '23 02:01 caolicaoli

可以补充下嘛,是说initTransactionEnv()方法和registerTransactionListener()方法的执行顺序出错了嘛, 在哪儿执行顺序不对呢?

francisoliverlee avatar Feb 01 '23 12:02 francisoliverlee