spring-framework icon indicating copy to clipboard operation
spring-framework copied to clipboard

Custom Webflux TransactionInterceptor gets invoked twice

Open SledgeHammer01 opened this issue 6 months ago • 0 comments

I am using Spring Boot 3.4.5 + Webflux. I am building an integration with Hibernate Reactive. I want to support the timeout from @Transactional. The way to do that with Hibernate Reactive is to put a timeout on the flux/mono. I don't want users of my starter to have to do that manually, so I am attempting to use TransactionInterceptor.

public class CustomReactiveTransactionInterceptor extends TransactionInterceptor {

  public CustomReactiveTransactionInterceptor(
      TransactionManager txManager,
      TransactionAttributeSource attributeSource) {

    super(txManager, attributeSource);
  }

  @Override
  public Object invoke(MethodInvocation invocation) throws Throwable {
    // Determine the raw result of the method call
    Object result = invocation.proceed();

    System.out.println("CustomReactiveTransactionInterceptor invoked");

    Method method = invocation.getMethod();
    Class<?> targetClass = AopUtils.getTargetClass(invocation.getThis());

    TransactionAttribute txAttr =
        getTransactionAttributeSource().getTransactionAttribute(method, targetClass);

    // If it’s a Mono, wrap it in a reactive transaction
    if (result instanceof Mono<?> mono) {
      return TransactionalOperator
          .create((org.springframework.transaction.ReactiveTransactionManager) getTransactionManager())
          .transactional(mono);
    }

    // If it’s a Flux, similarly wrap
    if (result instanceof Flux<?> flux) {
      return TransactionalOperator
          .create((org.springframework.transaction.ReactiveTransactionManager) getTransactionManager())
          .transactional(flux/*.timeout(Duration.ofMillis(1))*/);
    }

    // Otherwise, fall back to the standard (imperative) behavior
    return super.invoke(invocation);
  }
}

And then I register it as:

  @Bean
  @Primary
  public CustomReactiveTransactionInterceptor txInterceptor(
      ReactiveTransactionManager reactiveTxManager,
      TransactionAttributeSource tas) {

    return new CustomReactiveTransactionInterceptor(reactiveTxManager, tas);
  }

  @Bean
  public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
      CustomReactiveTransactionInterceptor txInterceptor,
      TransactionAttributeSource tas) {

    var advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
    advisor.setTransactionAttributeSource(tas);
    advisor.setAdvice(txInterceptor);
    return advisor;
  }

And let's assume a controller method like this:

  @Transactional(timeout = 2)
  @GetMapping("1")
  public Flux<Film> test1() {
    return this.filmRepository
        .findAll()
        .thenMany(this.filmRepository.findAll(QFilm.film.title.startsWith("Ac")));
  }

The issue I'm running into is that the invoke method is triggered TWICE.

  1. Flux.thenMany ⇢ at com.xxx.search2.test2.TestController.test1(TestController.java:38)
  2. Flux.contextWrite ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.execute(TransactionalOperatorImpl.java:85)

Is that intended? My intention is to set the timeout on flux #1, but there doesn't seem to be a way to filter out the 2nd one.

For both invocations, the txAttribute is set and the method and targetClass are the same as well.

SledgeHammer01 avatar May 14 '25 18:05 SledgeHammer01