spring-kafka
spring-kafka copied to clipboard
Spring Boot Kafka Issue With Transactionality When Using ReplyingKafkaTemplate
Version(s) of Spring for Apache Kafka
3.4
Description
I want to use ReplyingKafkaTemplate to implement the request-reply messaging pattern with Kafka while keeping end-to-end transactionality. I have tried many approaches and ended up with the code below, which fails with the following exception:
Caused by: java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
Using KafkaTemplate, I am able to send events transactionally to the target topic and to many others. The issue arises only with ReplyingKafkaTemplate.
Reproduce
Create and send an event to the topic from which I expect a synchronous reply.
Expected behavior
I want the listener to process the event and send back the response. I assume that, since the proper @Transactional annotations are in place, there should not be an issue with transactionality.
Code
Configuration
/**
 * Creates the producer factory used for {@code String}-keyed {@link Event} records.
 *
 * <p>All values come from fields of the enclosing configuration class
 * ({@code bootstrapAddress}, {@code acks}, {@code transactionId}, ...). The
 * configuration includes {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG}.
 *
 * @return a {@link DefaultKafkaProducerFactory} built from the assembled configuration map
 */
@Bean
public ProducerFactory<String, Event> producerFactory() {
    final Map<String, Object> settings = new HashMap<>();

    // Connection and (de)serialization.
    settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    // Delivery guarantees.
    settings.put(ProducerConfig.ACKS_CONFIG, acks);
    settings.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
    settings.put(ProducerConfig.RETRIES_CONFIG, retries);

    // Batching, buffering and compression.
    settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    settings.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    settings.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    settings.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
    settings.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);

    // Timeouts.
    settings.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
    settings.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);

    return new DefaultKafkaProducerFactory<>(settings);
}
/**
 * Exposes a {@link KafkaTemplate} backed by the given producer factory.
 *
 * @param producerFactory the factory the template obtains its producers from
 * @return a new template for {@code String}-keyed {@link Event} records
 */
@Bean
public KafkaTemplate<String, Event> kafkaTemplate(ProducerFactory<String, Event> producerFactory) {
    final KafkaTemplate<String, Event> template = new KafkaTemplate<>(producerFactory);
    return template;
}
/**
 * Creates the request/reply template.
 *
 * <p>The template sends through {@code producerFactory} and is given
 * {@code repliesContainer} as its reply listener container. It is flagged as
 * using a shared reply topic via {@code setSharedReplyTopic(true)}.
 *
 * @param producerFactory  producer factory used for outgoing requests
 * @param repliesContainer listener container passed to the template for replies
 * @return the configured {@link ReplyingKafkaTemplate}
 */
@Bean
public ReplyingKafkaTemplate<String, Event, Event> replyingKafkaTemplate(
        ProducerFactory<String, Event> producerFactory,
        ConcurrentMessageListenerContainer<String, Event> repliesContainer) {
    final ReplyingKafkaTemplate<String, Event, Event> template =
            new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
    template.setSharedReplyTopic(true);
    return template;
}
/**
 * Builds the listener container factory.
 *
 * <p>The consumer side is configured with auto-commit disabled, {@code earliest}
 * offset reset, the shared {@code groupId}, a {@link StringDeserializer} for keys
 * and a {@link JsonDeserializer} targeting {@link Event} for values. The supplied
 * {@code kafkaTemplate} is registered as the reply template.
 *
 * <p>NOTE(review): no transaction manager is set on this factory's container
 * properties. Presumably this means the container does not start a Kafka
 * transaction around listener invocation and reply sending - confirm against the
 * Spring for Apache Kafka transaction documentation.
 *
 * @param kafkaTemplate template registered as the factory's reply template
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory(
        KafkaTemplate<String, Event> kafkaTemplate) {
    final Map<String, Object> consumerSettings = new HashMap<>();
    consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerSettings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerSettings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final ConsumerFactory<String, Event> consumers = new DefaultKafkaConsumerFactory<>(
            consumerSettings, new StringDeserializer(), new JsonDeserializer<>(Event.class));

    final ConcurrentKafkaListenerContainerFactory<String, Event> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumers);
    factory.setReplyTemplate(kafkaTemplate);
    return factory;
}
/**
 * Creates the listener container for the replies topic.
 *
 * <p>The container is obtained from {@code kafkaListenerContainerFactory} for
 * {@code topicName}. Its container properties are then given the shared
 * {@code groupId} and a consumer-property override that sets
 * {@code auto.offset.reset} to {@code latest}.
 *
 * @param kafkaListenerContainerFactory factory used to create the container
 * @param topicName replies topic, resolved from {@code application.kafka.replies.topicName}
 * @return the container listening on the replies topic
 */
@Bean
public ConcurrentMessageListenerContainer<String, Event> repliesContainer(
        ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory,
        @Value("${application.kafka.replies.topicName}") String topicName) {
    final ConcurrentMessageListenerContainer<String, Event> container =
            kafkaListenerContainerFactory.createContainer(topicName);

    final Properties consumerOverrides = new Properties();
    consumerOverrides.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    container.getContainerProperties().setGroupId(groupId);
    container.getContainerProperties().setKafkaConsumerProperties(consumerOverrides);
    return container;
}
KafkaListener
/**
 * Listener for the synchronous-test topic: logs the incoming event, delegates to
 * {@code synchronousTestEventHandler} and returns the handler's result so that it
 * can be sent back as the reply ({@code @SendTo} without an explicit destination).
 *
 * <p>NOTE(review): in the accompanying log, the transaction created for this
 * method is committed before the adapter reports "Listener method returned
 * result ... generating response message", and the subsequent reply send fails
 * with "No transaction is in process". Presumably the reply is sent outside the
 * scope of this method's {@code @Transactional} - confirm.
 *
 * @param incomingSynchronousTestEvent the event consumed from the topic
 * @return the event produced by the handler
 */
@Transactional("kafkaTransactionManager")
@KafkaListener(
        topics = "${application.kafka.synchronousTest.topicName}",
        groupId = "${application.kafka.group-id}",
        concurrency = "${application.kafka.synchronousTest.concurrency}")
@SendTo
public SynchronousTestEvent receiveAndForwardMessageFromTopic(SynchronousTestEvent incomingSynchronousTestEvent) {
    log.debug("Received message: {}", incomingSynchronousTestEvent);
    return synchronousTestEventHandler.handleRequestReplyEvent(incomingSynchronousTestEvent);
}
SynchronousTestEventService processing event from KafkaListener
/**
 * Produces the reply event for an incoming request event.
 *
 * <p>On the normal path a new event is created by {@code synchronousTestEventFactory}
 * from the request's {@code IncomingFileDto}, with the arguments {@code true} and
 * {@code "Success"}. If anything throws, the failure is logged and the
 * <em>incoming</em> event is returned instead, after {@code setProcessed(false)}
 * and {@code setMessage(...)} with the exception message.
 *
 * @param synchronousTestEvent the request event
 * @return the newly created event, or the mutated request event on failure
 */
@Transactional("kafkaTransactionManager")
@Override
public SynchronousTestEvent handleRequestReplyEvent(SynchronousTestEvent synchronousTestEvent) {
    try {
        log.debug("Received message: {}", synchronousTestEvent);
        return synchronousTestEventFactory.createEvent(
                synchronousTestEvent.getIncomingFileDto(), true, "Success");
    } catch (Exception failure) {
        log.error("Error", failure);
        // Report the failure on the original event rather than propagating it.
        synchronousTestEvent.setProcessed(false);
        synchronousTestEvent.setMessage(failure.getMessage());
        return synchronousTestEvent;
    }
}
Method generating an event to be sent using ReplyingKafkaTemplate
/**
 * Creates a request event from the submission and sends it through
 * {@code synchronousTestRequestReplyService}.
 *
 * <p>The event is created with the arguments {@code false} and {@code null}.
 * The value returned by {@code sendRequestReplyEvent} is not used.
 *
 * @param submissionDto submission whose {@code IncomingFileDto} is carried by the event
 * @throws ExecutionException   propagated from the request/reply call
 * @throws InterruptedException propagated from the request/reply call
 * @throws TimeoutException     propagated from the request/reply call
 */
@Transactional("kafkaTransactionManager")
public void sendSynchronousTestEvent(SubmissionDto submissionDto)
        throws ExecutionException, InterruptedException, TimeoutException {
    final SynchronousTestEvent request =
            synchronousTestEventFactory.createEvent(submissionDto.getIncomingFileDto(), false, null);
    synchronousTestRequestReplyService.sendRequestReplyEvent(request);
}
Method sending the event using ReplyingKafkaTemplate
private final ReplyingKafkaTemplate<String, Event, Event> replyingKafkaTemplate;

/**
 * Sends {@code event} to the synchronous-test topic and blocks until the reply arrives.
 *
 * <p>The record is keyed by {@code event.key()} and carries a
 * {@link KafkaHeaders#REPLY_TOPIC} header naming the replies topic. The method
 * waits up to 10 seconds for the send to complete and then up to 20 seconds for
 * the reply record.
 *
 * @param event the request payload
 * @return the value of the reply record
 * @throws ExecutionException   if the send or the reply future completed exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws TimeoutException     if the send or the reply does not complete within its time limit
 */
@Transactional(value = "kafkaTransactionManager")
@Override
public Event sendRequestReplyEvent(Event event) throws ExecutionException, InterruptedException, TimeoutException {
    // NOTE(review): the 'Τ' in the two topic-name fields below looks like a Greek capital
    // tau rather than a Latin 'T'; kept as-is to match the declarations - verify.
    ProducerRecord<String, Event> record = new ProducerRecord<>(synchronousTestΤopicName, event.key(), event);
    // Encode the header with an explicit charset instead of the platform default.
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
            repliesΤopicName.getBytes(java.nio.charset.StandardCharsets.UTF_8)));

    RequestReplyFuture<String, Event, Event> replyFuture = replyingKafkaTemplate.sendAndReceive(record);

    // Wait for the send to complete so send failures surface here; the result itself is not needed.
    replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);

    ConsumerRecord<String, Event> consumerRecord = replyFuture.get(20, TimeUnit.SECONDS);
    return consumerRecord.value();
}
Logs
2024-10-16T11:00:58.402+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-10-16T11:00:58.430+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 1 records
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=SynchronousTestEvent(incomingFileDto=IncomingFileDto(id=null, uuid=181fe61d-ad5f-464b-a048-8919ce0eb116, externalEntityId=45, externalEntityInternalCode=011, subsystemCode=6, irisUserId=null, fileExchangeNumber=0, fileName=MFMC_T0_custom.xlsx, filePath=C:\Temp\elfund\incoming\, fileType=XLSX, messageId=null, messageTimeStamp=0, workbookDataAsCompressedData=null, jsonAsCompressedData=null, createdDate=2024-10-16T08:00:58.284047800Z, modifiedDate=2024-10-16T08:00:58.284047800Z), processed=false, message=null), headers={kafka_offset=4, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@50dbcb28, kafka_correlationId=[B@7f70fe7d, kafka_timestampType=CREATE_TIME, kafka_replyTopic=[B@4c1f883c, kafka_receivedPartitionId=0, kafka_receivedMessageKey=181fe61d-ad5f-464b-a048-8919ce0eb116, kafka_receivedTopic=elfund-synchronous-test, kafka_receivedTimestamp=1729065658399, kafka_groupId=elfund-group}]]
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.t.KafkaTransactionManager : Creating new transaction with name [gr.test.integration.kafka.events.synchronous_test.SynchronousTestEventListener.receiveAndForwardMessageFromTopic]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'kafkaTransactionManager'
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@635db03a] beginTransaction()
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@635db03a]]
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] s.e.i.k.e.s.SynchronousTestEventListener : Received message: SynchronousTestEvent(incomingFileDto=IncomingFileDto(id=null, uuid=181fe61d-ad5f-464b-a048-8919ce0eb116, externalEntityId=45, externalEntityInternalCode=011, subsystemCode=6, irisUserId=null, fileExchangeNumber=0, fileName=MFMC_T0_custom.xlsx, filePath=C:\Temp\elfund\incoming\, fileType=XLSX, messageId=null, messageTimeStamp=0, workbookDataAsCompressedData=null, jsonAsCompressedData=null, createdDate=2024-10-16T08:00:58.284047800Z, modifiedDate=2024-10-16T08:00:58.284047800Z), processed=false, message=null)
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.t.KafkaTransactionManager : Participating in existing transaction
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] .b.i.s.e.p.r.SynchronousTestEventService : Received message: SynchronousTestEvent(incomingFileDto=IncomingFileDto(id=null, uuid=181fe61d-ad5f-464b-a048-8919ce0eb116, externalEntityId=45, externalEntityInternalCode=011, subsystemCode=6, irisUserId=null, fileExchangeNumber=0, fileName=MFMC_T0_custom.xlsx, filePath=C:\Temp\elfund\incoming\, fileType=XLSX, messageId=null, messageTimeStamp=0, workbookDataAsCompressedData=null, jsonAsCompressedData=null, createdDate=2024-10-16T08:00:58.284047800Z, modifiedDate=2024-10-16T08:00:58.284047800Z), processed=false, message=null)
2024-10-16T11:01:04.686+03:00 DEBUG 22368 --- [elfund-processing] [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-10-16T11:01:04.687+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-10-16T11:01:04.687+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-10-16T11:01:04.687+03:00 DEBUG 22368 --- [elfund-processing] [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-10-16T11:01:04.689+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.t.KafkaTransactionManager : Initiating transaction commit
2024-10-16T11:01:04.689+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@635db03a] commitTransaction()
2024-10-16T11:01:04.689+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [SynchronousTestEvent(incomingFileDto=IncomingFileDto(id=null, uuid=181fe61d-ad5f-464b-a048-8919ce0eb116, externalEntityId=45, externalEntityInternalCode=011, subsystemCode=6, irisUserId=null, fileExchangeNumber=0, fileName=MFMC_T0_custom.xlsx, filePath=C:\Temp\elfund\incoming\, fileType=XLSX, messageId=null, messageTimeStamp=0, workbookDataAsCompressedData=null, jsonAsCompressedData=null, createdDate=2024-10-16T08:00:58.284047800Z, modifiedDate=2024-10-16T08:00:58.284047800Z), processed=true, message=Success)] - generating response message for it
2024-10-16T11:01:04.689+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] erMapper$SimplePatternBasedHeaderMatcher : headerName=[kafka_correlationId] WILL be mapped, matched pattern=*
2024-10-16T11:01:04.690+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] erMapper$SimplePatternBasedHeaderMatcher : headerName=[id] WILL NOT be mapped, matched pattern=!id
2024-10-16T11:01:04.690+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] erMapper$SimplePatternBasedHeaderMatcher : headerName=[timestamp] WILL NOT be mapped, matched pattern=!timestamp
2024-10-16T11:01:04.690+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-10-16T11:01:04.707+03:00 ERROR 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:227) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-3.2.4.jar:3.2.4]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.4.jar:1.13.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.4.jar:3.2.4]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:842) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2873) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.4.jar:3.2.4]
... 11 common frames omitted
Caused by: java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-6.1.13.jar:6.1.13]
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:938) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:816) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:793) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:609) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendReplyForMessageSource(MessagingMessageListenerAdapter.java:641) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendSingleResult(MessagingMessageListenerAdapter.java:608) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendResponse(MessagingMessageListenerAdapter.java:572) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.handleResult(MessagingMessageListenerAdapter.java:503) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:386) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.4.jar:3.2.4]
... 13 common frames omitted