I am using Spring Boot 3.4.5 with WebFlux and building an integration with Hibernate Reactive. I want to support the timeout attribute of @Transactional. The way to do that with Hibernate Reactive is to put a timeout on the Flux/Mono. I don't want users of my starter to have to do that manually, so I am attempting to use a custom TransactionInterceptor:
public class CustomReactiveTransactionInterceptor extends TransactionInterceptor {

    public CustomReactiveTransactionInterceptor(
            TransactionManager txManager,
            TransactionAttributeSource attributeSource) {
        super(txManager, attributeSource);
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Determine the raw result of the method call
        Object result = invocation.proceed();
        System.out.println("CustomReactiveTransactionInterceptor invoked");
        Method method = invocation.getMethod();
        Class<?> targetClass = AopUtils.getTargetClass(invocation.getThis());
        TransactionAttribute txAttr =
                getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        // If it’s a Mono, wrap it in a reactive transaction
        if (result instanceof Mono<?> mono) {
            return TransactionalOperator
                    .create((org.springframework.transaction.ReactiveTransactionManager) getTransactionManager())
                    .transactional(mono);
        }
        // If it’s a Flux, similarly wrap
        if (result instanceof Flux<?> flux) {
            return TransactionalOperator
                    .create((org.springframework.transaction.ReactiveTransactionManager) getTransactionManager())
                    .transactional(flux/*.timeout(Duration.ofMillis(1))*/);
        }
        // Otherwise, fall back to the standard (imperative) behavior
        return super.invoke(invocation);
    }
}
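For reference, the timeout the interceptor would apply has to be derived from the transaction attribute. Below is a minimal, JDK-only sketch of that conversion. `TxTimeouts` and `toDuration` are hypothetical names, the constant mirrors `TransactionDefinition.TIMEOUT_DEFAULT` (-1, meaning no timeout was declared), and treating zero as "no timeout" is an assumption of this sketch. In the interceptor above, the input would presumably be `txAttr.getTimeout()`, and the resulting `Duration` would be handed to `Mono.timeout` or `Flux.timeout`.

```java
import java.time.Duration;
import java.util.Optional;

public class TxTimeouts {
    // Mirrors TransactionDefinition.TIMEOUT_DEFAULT: no timeout was declared.
    public static final int TIMEOUT_DEFAULT = -1;

    /**
     * Converts a @Transactional timeout (in seconds) into a Duration.
     * Returns an empty Optional when no usable timeout was declared.
     * Assumption of this sketch: zero and negative values mean "no timeout".
     */
    public static Optional<Duration> toDuration(int timeoutSeconds) {
        if (timeoutSeconds == TIMEOUT_DEFAULT || timeoutSeconds <= 0) {
            return Optional.empty();
        }
        return Optional.of(Duration.ofSeconds(timeoutSeconds));
    }

    public static void main(String[] args) {
        // @Transactional(timeout = 2) would yield a two-second Duration.
        System.out.println(toDuration(2));
        // No timeout attribute yields an empty Optional.
        System.out.println(toDuration(TIMEOUT_DEFAULT));
    }
}
```

Returning an `Optional` keeps the "no timeout declared" case explicit, so the caller can skip the `timeout(...)` operator entirely instead of passing a sentinel value.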
And then I register it as:
@Bean
@Primary
public CustomReactiveTransactionInterceptor txInterceptor(
        ReactiveTransactionManager reactiveTxManager,
        TransactionAttributeSource tas) {
    return new CustomReactiveTransactionInterceptor(reactiveTxManager, tas);
}

@Bean
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
        CustomReactiveTransactionInterceptor txInterceptor,
        TransactionAttributeSource tas) {
    var advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
    advisor.setTransactionAttributeSource(tas);
    advisor.setAdvice(txInterceptor);
    return advisor;
}
And let's assume a controller method like this:
@Transactional(timeout = 2)
@GetMapping("1")
public Flux<Film> test1() {
    return this.filmRepository
            .findAll()
            .thenMany(this.filmRepository.findAll(QFilm.film.title.startsWith("Ac")));
}
The issue I'm running into is that the invoke method is triggered TWICE.
- Flux.thenMany ⇢ at com.xxx.search2.test2.TestController.test1(TestController.java:38)
- Flux.contextWrite ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.execute(TransactionalOperatorImpl.java:85)
Is that intended? I want to set the timeout only on Flux #1, but there doesn't seem to be a way to filter out the second invocation.
In both invocations, txAttr is set, and method and targetClass are identical.
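One possible explanation, which is an assumption and not something confirmed in this report: if the same interceptor ends up in the proxy's advice chain twice (for example, once through the `transactionAdvisor` bean above and once through an advisor registered by Spring's own transaction configuration), `invoke` runs twice per call, and the outer invocation receives the publisher that the inner invocation has already wrapped. That would be consistent with the two traces listed. The JDK-only sketch below simulates such a chain; `Invocation`, `WrappingInterceptor`, and the string-based "wrapping" are hypothetical stand-ins, not Spring or Reactor APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class DoubleAdviceDemo {

    /** Hypothetical stand-in for MethodInvocation: walks the chain, then calls the target. */
    static final class Invocation {
        private final List<WrappingInterceptor> chain;
        private final Supplier<String> target;
        private int index = 0;

        Invocation(List<WrappingInterceptor> chain, Supplier<String> target) {
            this.chain = chain;
            this.target = target;
        }

        String proceed() {
            if (index < chain.size()) {
                // Hand control to the next interceptor in the chain.
                return chain.get(index++).invoke(this);
            }
            // End of chain: call the actual (controller) method.
            return target.get();
        }
    }

    /** Hypothetical stand-in for the custom interceptor: records what proceed() returned. */
    static final class WrappingInterceptor {
        final List<String> seen = new ArrayList<>();

        String invoke(Invocation invocation) {
            String result = invocation.proceed();
            seen.add(result);
            // Stands in for TransactionalOperator.transactional(...).
            return "transactional(" + result + ")";
        }
    }

    /** Runs one call through a chain that contains the same interceptor twice. */
    public static List<String> run() {
        WrappingInterceptor interceptor = new WrappingInterceptor();
        Invocation invocation =
                new Invocation(List.of(interceptor, interceptor), () -> "controllerFlux");
        invocation.proceed();
        return interceptor.seen;
    }

    public static void main(String[] args) {
        // Prints what each of the two invocations received from proceed().
        run().forEach(System.out::println);
    }
}
```

In this simulation the inner invocation sees the raw result of the target method and the outer one sees the already-wrapped result. If the real chain behaves the same way, checking that only one advisor carries the interceptor would be one thing to look at.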