Kafka emitter and @Transactional - enlisted connection used without active transaction

Open ppal83 opened this issue 1 year ago • 5 comments

Describe the bug

I am very sorry if this is a duplicate. I have searched and found many similar issues, and one of them gave us the solution we use: produce the message and block until the send completes. What I can't understand is why the behaviour changed after upgrading to a newer version. We have lots of tests and everything was fine, and then this bug came back: javax.transaction.xa.XAException: Error trying to transactionCommit local transaction: Enlisted connection used without active transaction. Please advise how to deal with it.

Expected behavior

Since we block after emitting the message inside the @Transactional method, we don't expect any exception to be thrown.

Actual behavior

The following exception is thrown in the @Transactional method:

10:13:40 WARN  traceId=d5c04f866ee2b443785c79ed70ba08cc, parentId=e786128f31582bdc, spanId=3f490b896e3e8678, sampled=true [co.ar.at.arjuna] (executor-thread-50) ARJUNA012094: Commit of action id 0:ffff0a00cd08:a0ef:65042400:cc6 invoked while multiple threads active within it.
10:13:40 WARN  traceId=d5c04f866ee2b443785c79ed70ba08cc, parentId=e786128f31582bdc, spanId=3f490b896e3e8678, sampled=true [co.ar.at.arjuna] (executor-thread-50) ARJUNA012107: CheckedAction::check - atomic action 0:ffff0a00cd08:a0ef:65042400:cc6 commiting with 2 threads active!
10:13:40 WARN  traceId=d5c04f866ee2b443785c79ed70ba08cc, parentId=e786128f31582bdc, spanId=3f490b896e3e8678, sampled=true [co.ar.at.jta] (executor-thread-50) ARJUNA016039: onePhaseCommit on < formatId=131077, gtrid_length=35, bqual_length=36, tx_uid=0:ffff0a00cd08:a0ef:65042400:cc6, node_name=quarkus, branch_uid=0:ffff0a00cd08:a0ef:65042400:cc9, subordinatenodename=null, eis_name=0 > (io.agroal.narayana.LocalXAResource@50402e38) failed with exception XAException.XA_RBROLLBACK: javax.transaction.xa.XAException: Error trying to transactionCommit local transaction: Enlisted connection used without active transaction
	at io.agroal.narayana.LocalXAResource.xaException(LocalXAResource.java:140)
	at io.agroal.narayana.LocalXAResource.xaException(LocalXAResource.java:134)
	at io.agroal.narayana.LocalXAResource.commit(LocalXAResource.java:72)
	at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:702)
	at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2400)
	at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1502)
	at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:96)
	at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:162)
	at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1295)
	at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:128)
	at io.quarkus.narayana.jta.runtime.NotifyingTransactionManager.commit(NotifyingTransactionManager.java:70)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.endTransaction(TransactionalInterceptorBase.java:406)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:175)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:107)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.doIntercept(TransactionalInterceptorRequired.java:38)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.intercept(TransactionalInterceptorBase.java:61)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:32)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(Unknown Source)
	at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
	at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
	at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
	at com.company.merchant.app.service.MerchantService_Subclass.sendVerificationEmail(Unknown Source)
	at com.company.merchant.app.service.MerchantService_ClientProxy.sendVerificationEmail(Unknown Source)
	at com.company.merchant.boundary.api.MerchantController.sendVerificationEmail(MerchantController.java:128)
	at com.company.merchant.boundary.api.MerchantController_Subclass.sendVerificationEmail$$superforward(Unknown Source)
	at com.company.merchant.boundary.api.MerchantController_Subclass$$function$$8.apply(Unknown Source)
	at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:73)
	at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:62)
	at io.quarkus.resteasy.runtime.QuarkusRestPathTemplateInterceptor.restMethodInvoke(QuarkusRestPathTemplateInterceptor.java:39)
	at io.quarkus.resteasy.runtime.QuarkusRestPathTemplateInterceptor_Bean.intercept(Unknown Source)
	at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
	at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
	at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
	at com.company.merchant.boundary.api.MerchantController_Subclass.sendVerificationEmail(Unknown Source)
	at com.company.merchant.boundary.api.MerchantController_ClientProxy.sendVerificationEmail(Unknown Source)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:154)
	at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:118)
	at org.jboss.resteasy.core.ResourceMethodInvoker.internalInvokeOnTarget(ResourceMethodInvoker.java:560)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTargetAfterFilter(ResourceMethodInvoker.java:452)
	at org.jboss.resteasy.core.ResourceMethodInvoker.lambda$invokeOnTarget$2(ResourceMethodInvoker.java:413)
	at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:321)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTarget(ResourceMethodInvoker.java:415)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:378)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:356)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:70)
	at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:429)
	at org.jboss.resteasy.core.SynchronousDispatcher.lambda$invoke$4(SynchronousDispatcher.java:240)
	at org.jboss.resteasy.core.SynchronousDispatcher.lambda$preprocess$0(SynchronousDispatcher.java:154)
	at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:321)
	at org.jboss.resteasy.core.SynchronousDispatcher.preprocess(SynchronousDispatcher.java:157)
	at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:229)
	at io.quarkus.resteasy.runtime.standalone.RequestDispatcher.service(RequestDispatcher.java:82)
	at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler.dispatch(VertxRequestHandler.java:147)
	at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler$1.run(VertxRequestHandler.java:93)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.sql.SQLException: Enlisted connection used without active transaction
	at io.agroal.pool.ConnectionHandler.verifyEnlistment(ConnectionHandler.java:398)
	at io.agroal.pool.ConnectionHandler.transactionCommit(ConnectionHandler.java:355)
	at io.agroal.narayana.LocalXAResource.commit(LocalXAResource.java:69)
	... 62 more

10:13:40 WARN  traceId=d5c04f866ee2b443785c79ed70ba08cc, parentId=e786128f31582bdc, spanId=3f490b896e3e8678, sampled=true [co.fu.mi.ex.ha.InternalServerErrorHandler] (executor-thread-50) Internal server error: : io.quarkus.arc.ArcUndeclaredThrowableException: Error invoking subclass method
	at com.company.merchant.app.service.MerchantService_Subclass.sendVerificationEmail(Unknown Source)
	at com.company.merchant.app.service.MerchantService_ClientProxy.sendVerificationEmail(Unknown Source)
	at com.company.merchant.boundary.api.MerchantController.sendVerificationEmail(MerchantController.java:128)
	at com.company.merchant.boundary.api.MerchantController_Subclass.sendVerificationEmail$$superforward(Unknown Source)
	at com.company.merchant.boundary.api.MerchantController_Subclass$$function$$8.apply(Unknown Source)
	at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:73)
	at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:62)
	at io.quarkus.resteasy.runtime.QuarkusRestPathTemplateInterceptor.restMethodInvoke(QuarkusRestPathTemplateInterceptor.java:39)
	at io.quarkus.resteasy.runtime.QuarkusRestPathTemplateInterceptor_Bean.intercept(Unknown Source)
	at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
	at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
	at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
	at com.company.merchant.boundary.api.MerchantController_Subclass.sendVerificationEmail(Unknown Source)
	at com.company.merchant.boundary.api.MerchantController_ClientProxy.sendVerificationEmail(Unknown Source)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:154)
	at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:118)
	at org.jboss.resteasy.core.ResourceMethodInvoker.internalInvokeOnTarget(ResourceMethodInvoker.java:560)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTargetAfterFilter(ResourceMethodInvoker.java:452)
	at org.jboss.resteasy.core.ResourceMethodInvoker.lambda$invokeOnTarget$2(ResourceMethodInvoker.java:413)
	at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:321)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTarget(ResourceMethodInvoker.java:415)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:378)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:356)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:70)
	at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:429)
	at org.jboss.resteasy.core.SynchronousDispatcher.lambda$invoke$4(SynchronousDispatcher.java:240)
	at org.jboss.resteasy.core.SynchronousDispatcher.lambda$preprocess$0(SynchronousDispatcher.java:154)
	at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:321)
	at org.jboss.resteasy.core.SynchronousDispatcher.preprocess(SynchronousDispatcher.java:157)
	at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:229)
	at io.quarkus.resteasy.runtime.standalone.RequestDispatcher.service(RequestDispatcher.java:82)
	at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler.dispatch(VertxRequestHandler.java:147)
	at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler$1.run(VertxRequestHandler.java:93)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: jakarta.transaction.RollbackException: ARJUNA016053: Could not commit transaction.
	at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1307)
	at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:128)
	at io.quarkus.narayana.jta.runtime.NotifyingTransactionManager.commit(NotifyingTransactionManager.java:70)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.endTransaction(TransactionalInterceptorBase.java:406)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:175)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:107)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.doIntercept(TransactionalInterceptorRequired.java:38)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.intercept(TransactionalInterceptorBase.java:61)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:32)
	at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(Unknown Source)
	at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
	at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
	at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
	... 44 more
	Suppressed: javax.transaction.xa.XAException: Error trying to transactionCommit local transaction: Enlisted connection used without active transaction
		at io.agroal.narayana.LocalXAResource.xaException(LocalXAResource.java:140)
		at io.agroal.narayana.LocalXAResource.xaException(LocalXAResource.java:134)
		at io.agroal.narayana.LocalXAResource.commit(LocalXAResource.java:72)
		at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:702)
		at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2400)
		at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1502)
		at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:96)
		at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:162)
		at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1295)
		... 56 more
	Caused by: java.sql.SQLException: Enlisted connection used without active transaction
		at io.agroal.pool.ConnectionHandler.verifyEnlistment(ConnectionHandler.java:398)
		at io.agroal.pool.ConnectionHandler.transactionCommit(ConnectionHandler.java:355)
		at io.agroal.narayana.LocalXAResource.commit(LocalXAResource.java:69)
		... 62 more

How to Reproduce?

`@Slf4j
@Getter
@RequestScoped
@Path("/api/v1/merchants")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class MerchantController {

@Inject
MerchantService service;

// removed code

@PUT
@Path("{id}/sendVerificationEmail")
public void sendVerificationEmail(@PathParam("id") UUID id) {
    service.sendVerificationEmail(id);
}

}

@Slf4j
@Getter
@ApplicationScoped
public class MerchantService {

@Inject
MerchantRepository repository;

@Inject
KafkaConfigurationChangeService configurationChangeService;

@Transactional
public void sendVerificationEmail(UUID id) {
    repository.update("verifyEmailSent = true where id = ?1", id);
    configurationChangeService.produceConfigurationChange(id, ConfigurationChangeType.VERIFICATION_EMAIL_SENT);
}

}

@ApplicationScoped
public class KafkaConfigurationChangeService {

@Inject
@Channel("configuration-change")
Emitter<KafkaConfigurationChangeDto> emitter;

public void produceConfigurationChange(UUID id, ConfigurationChangeType configurationChangeType) {
    final var kafkaConfigurationChangeDto = KafkaConfigurationChangeDto.builder()
            .merchantId(id)
            .configurationChangeType(configurationChangeType)
            .build();

    produce(kafkaConfigurationChangeDto);
}

private void produce(KafkaConfigurationChangeDto dto) {
    emitter.send(dto).toCompletableFuture().join();
}

}`

Output of uname -a or ver

Linux merchant-5445774485-46n69 5.10.186-179.751.amzn2.x86_64 #1 SMP Tue Aug 1 20:51:38 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

Output of java -version

17.0.6

GraalVM version (if different from Java)

No response

Quarkus version or git rev

3.2.2.Final

Build tool (ie. output of mvnw --version or gradlew --version)

Gradle 8.3

Additional information

No response

ppal83 avatar Sep 18 '23 18:09 ppal83

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

quarkus-bot[bot] avatar Sep 18 '23 18:09 quarkus-bot[bot]

Can you create a reproducer project? The last time I checked, the example in the Kafka guide worked: https://quarkus.io/guides/kafka#writing-entities-managed-by-hibernate-to-kafka

ozangunalp avatar Sep 21 '23 11:09 ozangunalp

Hi. I've created the reproducer. https://github.com/ppal83/quarkus-kafka-transactional-reproducer

ppal83 avatar Oct 24 '23 20:10 ppal83

Thank you for the reproducer! This is effectively the same as #21948.

I am not sure why this happens, but I think that when context propagation propagates the transaction context, a second thread touches the transaction, which JTA doesn't like. The workaround/fix would be to disable propagation of the transaction context in your produce method.

If you annotate KafkaTestProducer#produce with @CurrentThreadContext(cleared = { ThreadContext.TRANSACTION }), the problem goes away. Another interesting thing is that the problem only occurs with persistAndFlush and with an update query, not with persist.
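To make the guess above concrete, here is a toy model in plain Java. It is only a sketch and not Narayana or SmallRye code: `ToyTransaction`, `PropagationDemo` and the `propagateTransaction` flag are hypothetical names invented for illustration. It assumes the transaction keeps track of which threads are attached to it, and that propagating the transaction context attaches the worker thread that completes the send.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model only: hypothetical classes, not the real transaction manager.
final class ToyTransaction {
    // Threads currently attached to this transaction.
    private final Set<Thread> attached = ConcurrentHashMap.newKeySet();

    void associate() { attached.add(Thread.currentThread()); }

    void dissociate() { attached.remove(Thread.currentThread()); }

    // Number of threads still attached when the caller is about to commit.
    int threadsActiveAtCommit() { return attached.size(); }
}

final class PropagationDemo {
    // Simulates an async send inside a transaction and returns how many
    // threads are attached to the transaction when the caller reaches commit.
    static int run(boolean propagateTransaction) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ToyTransaction tx = new ToyTransaction();
        tx.associate(); // the request thread starts the transaction
        CompletableFuture<Void> ack = new CompletableFuture<>();
        CountDownLatch commitChecked = new CountDownLatch(1);
        try {
            worker.submit(() -> {
                if (propagateTransaction) {
                    tx.associate(); // propagated context attaches the worker thread
                }
                ack.complete(null); // unblocks the caller's join()
                try {
                    commitChecked.await(); // worker is still busy while the caller commits
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (propagateTransaction) {
                    tx.dissociate();
                }
            });
            ack.join(); // same shape as emitter.send(dto).toCompletableFuture().join()
            return tx.threadsActiveAtCommit();
        } finally {
            commitChecked.countDown(); // let the worker finish
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("propagated: " + run(true) + " thread(s) attached at commit");
        System.out.println("cleared: " + run(false) + " thread(s) attached at commit");
    }
}
```

In this model, blocking on `join()` does not help by itself: the caller resumes as soon as the future completes, while the worker thread is still attached, so `run(true)` reports 2 threads, which has the same shape as the "commiting with 2 threads active" warning in the log. With the transaction context cleared, `run(false)` reports 1. Whether the real transaction manager behaves this way is exactly the open question in this comment.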

I reckon this is not restricted to the Kafka producer but applies to any async action. I am not sure how to counteract this. @manovotn @mkouba @cescoffier, any ideas?

ozangunalp avatar Oct 26 '23 12:10 ozangunalp

I wanted to share that I also ran into this problem, but I wasn't using a persistAndFlush or update query. I was using delete from PanacheRepositoryBase.

cmouwer avatar Feb 13 '24 16:02 cmouwer