Custom Webflux TransactionInterceptor gets invoked twice #34903

Open
@SledgeHammer01

I am using Spring Boot 3.4.5 with WebFlux and am building an integration with Hibernate Reactive. I want to honor the timeout attribute of @Transactional. With Hibernate Reactive, the way to do that is to apply a timeout to the returned Flux/Mono. I don't want users of my starter to have to do that manually, so I am attempting to do it in a custom TransactionInterceptor:

public class CustomReactiveTransactionInterceptor extends TransactionInterceptor {

  public CustomReactiveTransactionInterceptor(
      TransactionManager txManager,
      TransactionAttributeSource attributeSource) {

    super(txManager, attributeSource);
  }

  @Override
  public Object invoke(MethodInvocation invocation) throws Throwable {
    // Determine the raw result of the method call
    Object result = invocation.proceed();

    System.out.println("CustomReactiveTransactionInterceptor invoked");

    Method method = invocation.getMethod();
    Class<?> targetClass = AopUtils.getTargetClass(invocation.getThis());

    TransactionAttribute txAttr =
        getTransactionAttributeSource().getTransactionAttribute(method, targetClass);

    // If it’s a Mono, wrap it in a reactive transaction
    if (result instanceof Mono<?> mono) {
      return TransactionalOperator
          .create((org.springframework.transaction.ReactiveTransactionManager) getTransactionManager())
          .transactional(mono);
    }

    // If it’s a Flux, similarly wrap
    if (result instanceof Flux<?> flux) {
      return TransactionalOperator
          .create((org.springframework.transaction.ReactiveTransactionManager) getTransactionManager())
          .transactional(flux/*.timeout(Duration.ofMillis(1))*/);
    }

    // Otherwise, fall back to the standard (imperative) behavior
    return super.invoke(invocation);
  }
}

And then I register it as:

  @Bean
  @Primary
  public CustomReactiveTransactionInterceptor txInterceptor(
      ReactiveTransactionManager reactiveTxManager,
      TransactionAttributeSource tas) {

    return new CustomReactiveTransactionInterceptor(reactiveTxManager, tas);
  }

  @Bean
  public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
      CustomReactiveTransactionInterceptor txInterceptor,
      TransactionAttributeSource tas) {

    var advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
    advisor.setTransactionAttributeSource(tas);
    advisor.setAdvice(txInterceptor);
    return advisor;
  }

And let's assume a controller method like this:

  @Transactional(timeout = 2)
  @GetMapping("1")
  public Flux<Film> test1() {
    return this.filmRepository
        .findAll()
        .thenMany(this.filmRepository.findAll(QFilm.film.title.startsWith("Ac")));
  }
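For reference, the timeout attribute of @Transactional is expressed in seconds, and Spring uses -1 (TransactionDefinition.TIMEOUT_DEFAULT) to mean "no timeout configured". A minimal JDK-only sketch of the conversion the interceptor would need before applying a reactive timeout is shown here. The class name TxTimeouts and the method toDuration are hypothetical, and treating 0 as "no timeout" is an assumption, not something Spring specifies.

```java
import java.time.Duration;
import java.util.Optional;

public class TxTimeouts {

  /** Mirrors the value of TransactionDefinition.TIMEOUT_DEFAULT (no timeout configured). */
  public static final int TIMEOUT_DEFAULT = -1;

  /**
   * Converts a transaction timeout in seconds into a Duration.
   * Returns an empty Optional when no usable timeout is configured.
   * Assumption: 0 and negative values are both treated as "no timeout".
   */
  public static Optional<Duration> toDuration(int timeoutSeconds) {
    if (timeoutSeconds == TIMEOUT_DEFAULT || timeoutSeconds <= 0) {
      return Optional.empty();
    }
    return Optional.of(Duration.ofSeconds(timeoutSeconds));
  }

  public static void main(String[] args) {
    System.out.println(toDuration(2).isPresent());
    System.out.println(toDuration(TIMEOUT_DEFAULT).isPresent());
  }
}
```

Inside the interceptor, the value passed in would presumably be txAttr.getTimeout(), and a present Duration would be applied with Flux.timeout(Duration) or Mono.timeout(Duration) before the publisher is handed to TransactionalOperator.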

The issue I'm running into is that the invoke method is triggered TWICE for a single call to the controller method. The result seen by each invocation is:

  1. Flux.thenMany ⇢ at com.xxx.search2.test2.TestController.test1(TestController.java:38)
  2. Flux.contextWrite ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.execute(TransactionalOperatorImpl.java:85)

Is that intended? My intention is to set the timeout on flux #1, but there doesn't seem to be a way to filter out the 2nd one.

For both invocations, txAttr is populated, and method and targetClass are identical, so none of them can be used to tell the two calls apart.
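To illustrate the kind of filter I am looking for, here is a JDK-only simulation. It assumes (this is not confirmed) that the same interceptor instance ends up in the advice chain twice, which would match the observed order: the inner call sees the raw Flux.thenMany result and the outer call sees the already wrapped result. All types below (Invocation, Interceptor, WrappingInterceptor) are hypothetical stand-ins, not Spring classes, and a String stands in for the Flux.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GuardedChainDemo {

  /** Hypothetical stand-in for a method invocation travelling through an advice chain. */
  static final class Invocation {
    final Map<String, Object> attributes = new HashMap<>();
    private final List<Interceptor> chain;
    private int index = 0;

    Invocation(List<Interceptor> chain) {
      this.chain = chain;
    }

    Object proceed() {
      if (index < chain.size()) {
        return chain.get(index++).invoke(this);
      }
      return "flux"; // stand-in for the target method's return value
    }
  }

  interface Interceptor {
    Object invoke(Invocation invocation);
  }

  /** Wraps the result once per invocation when guarded, once per chain entry otherwise. */
  static final class WrappingInterceptor implements Interceptor {
    private static final String MARKER = "tx.wrapped";
    private final boolean guarded;

    WrappingInterceptor(boolean guarded) {
      this.guarded = guarded;
    }

    @Override
    public Object invoke(Invocation invocation) {
      Object result = invocation.proceed();
      // The first interceptor to get here (the innermost one) sets the marker and wraps;
      // any later one finds the marker and passes the result through unchanged.
      if (guarded && invocation.attributes.putIfAbsent(MARKER, Boolean.TRUE) != null) {
        return result;
      }
      return "tx(" + result + ")";
    }
  }

  /** Runs a chain that contains the same interceptor twice and returns the final result. */
  public static String run(boolean guarded) {
    Interceptor interceptor = new WrappingInterceptor(guarded);
    Invocation invocation = new Invocation(List.of(interceptor, interceptor));
    return (String) invocation.proceed();
  }

  public static void main(String[] args) {
    System.out.println(run(false)); // prints "tx(tx(flux))"
    System.out.println(run(true));  // prints "tx(flux)"
  }
}
```

In a real interceptor the marker might be stored via the user attributes of Spring's ProxyMethodInvocation (setUserAttribute/getUserAttribute), but I have not verified that this works in this setup, and it would only help if both calls really share one invocation object.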

Labels: in: data (Issues in data modules: jdbc, orm, oxm, tx); status: waiting-for-triage (An issue we've not yet triaged or decided on)