spring-cloud-stream icon indicating copy to clipboard operation
spring-cloud-stream copied to clipboard

No thread-bound request found when trying to save message using CrudRepository

Open vishalmamidi opened this issue 2 years ago • 0 comments

Describe the issue I have a cloud stream consumer which will consume message from topic and save that topic information to mysql db. but when there is repository.save operation is performed facing below issue

"No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request."

To Reproduce my consumer

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Autowired
    TestRepository testRepo;

    @Bean
    public Consumer<Message<TestModel>> testConsumer(TestService walletService) {
        return message -> {
            MessageHeaders messageHeader = message.getHeaders();
            log.info("Received Message from topic: {}",messageHeader.get("kafka_receivedTopic"));

            testRepo.save(populateTestEntity(message.getPayload(),false));
        };
    }

    private TestEntity populateTestEntity(
            TestModel reqModel, boolean isDelete){
        TestEntity entity = new TestEntity();
        entity.setActiveStatus(!isDelete);
        return entity;
    }
}

repository class

@Repository
public interface TestRepository extends CrudRepository<TestEntity, Long> {

}

applicaiton properties

spring.cloud.stream.kafka.binder.brokers=amazonaws.com:9092
spring.cloud.stream.kafka.binder.autoCreateTopics= false
#Kafka consumers
spring.cloud.function.definition=testConsumer
#testConsumer
spring.cloud.stream.bindings.testConsumer-in-0.destination=test
spring.cloud.stream.bindings.testConsumer-in-0.group=amazon.msk.canary.group.broker-1
spring.cloud.stream.bindings.testConsumer-in-0.consumer.maxAttempts= 3
spring.cloud.stream.kafka.bindings.testConsumer-in-0.consumer.enableDlq= true
spring.cloud.stream.kafka.bindings.testConsumer-in-0.consumer.dlqName= test-dlq
#supressLogs
logging.level.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator=warn
logging.level.org.apache.kafka.clients.consumer.internals.SubscriptionState=warn
logging.level.org.springframework.cloud.stream.binder.kafka=warn
logging.level.org.apache.kafka.clients.consumer.internals.AbstractCoordinator=warn

Version of the framework jdk 16.0.2 spring boot 2.5.5 spring cloud stream 3.1.6

Expected behavior should save record as expected without error

**Stack Trace

{
    "@timestamp": "2022-06-13T15:30:38.615+0530",
    "message": "org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@2006b973]; nested exception is org.springframework.dao.InvalidDataAccessApiUsageException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request., failedMessage=GenericMessage [payload=byte[128], headers={skip-input-type-conversion=false, kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19a0f1f3, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=5, kafka_receivedTopic=test, kafka_receivedTimestamp=1655114429822, contentType=application/json, kafka_groupId=amazon.msk.canary.group.broker-1}]\r\n\tat org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)\r\n\tat org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)\r\n\tat org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)\r\n\tat org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)\r\n\tat org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)\r\n\tat org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)\r\n\tat org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)\r\n\tat org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)\r\n\tat org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)\r\n\tat org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)\r\n\tat org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)\r\n\tat org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)\r\n\tat org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)\r\n\tat org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:398)\r\n\tat org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:79)\r\n\tat org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457)\r\n\tat org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)\r\n\tat org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:123)\r\n\tat org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)\r\n\tat org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)\r\n\tat org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:117)\r\n\tat org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:41)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2334)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2315)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2229)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2154)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2036)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1709)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1276)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1268)\r\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)\r\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\r\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\r\n\tat java.base/java.lang.Thread.run(Thread.java:831)\r\nCaused by: org.springframework.dao.InvalidDataAccessApiUsageException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.\r\n\tat org.springframework.orm.jpa.EntityManagerFactoryUtils.convertJpaAccessExceptionIfPossible(EntityManagerFactoryUtils.java:371)\r\n\tat org.springframework.orm.jpa.vendor.HibernateJpaDialect.translateExceptionIfPossible(HibernateJpaDialect.java:235)\r\n\tat org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:566)\r\n\tat org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)\r\n\tat org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)\r\n\tat org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:654)\r\n\tat org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:407)\r\n\tat org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)\r\n\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)\r\n\tat org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)\r\n\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)\r\n\tat org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:174)\r\n\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)\r\n\tat org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)\r\n\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)\r\n\tat org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)\r\n\tat jdk.proxy3/jdk.proxy3.$Proxy156.save(Unknown Source)\r\n\tat com.jb.deposits.config.KafkaConsumerConfig.lambda$offBoardWalletConsumer$0(KafkaConsumerConfig.java:45)\r\n\tat org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:975)\r\n\tat org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:704)\r\n\tat org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:550)\r\n\tat org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)\r\n\tat org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:749)\r\n\tat org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:581)\r\n\tat org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)\r\n\t... 32 more\r\nCaused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.\r\n\tat org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131)\r\n\tat org.springframework.web.context.support.WebApplicationContextUtils.currentRequestAttributes(WebApplicationContextUtils.java:313)\r\n\tat org.springframework.web.context.support.WebApplicationContextUtils.access$400(WebApplicationContextUtils.java:66)\r\n\tat org.springframework.web.context.support.WebApplicationContextUtils$RequestObjectFactory.getObject(WebApplicationContextUtils.java:329)\r\n\tat org.springframework.web.context.support.WebApplicationContextUtils$RequestObjectFactory.getObject(WebApplicationContextUtils.java:324)\r\n\tat org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler.invoke(AutowireUtils.java:292)\r\n\tat jdk.proxy2/jdk.proxy2.$Proxy147.getHeader(Unknown Source)\r\n\tat com.jb.deposits.serviceimpl.AuditServiceImpl.streamMessage(AuditServiceImpl.java:44)\r\n\tat com.jb.deposits.loggers.listeners.EntityListener.perform(EntityListener.java:41)\r\n\tat com.jb.deposits.loggers.listeners.EntityListener.postPersist(EntityListener.java:27)\r\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\r\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)\r\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\r\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:567)\r\n\tat org.hibernate.jpa.event.internal.ListenerCallback.performCallback(ListenerCallback.java:35)\r\n\tat org.hibernate.jpa.event.internal.CallbackRegistryImpl.callback(CallbackRegistryImpl.java:95)\r\n\tat org.hibernate.jpa.event.internal.CallbackRegistryImpl.postCreate(CallbackRegistryImpl.java:64)\r\n\tat org.hibernate.event.internal.PostInsertEventListenerStandardImpl.onPostInsert(PostInsertEventListenerStandardImpl.java:31)\r\n\tat org.hibernate.event.service.internal.EventListenerGroupImpl.fireLazyEventOnEachListener(EventListenerGroupImpl.java:88)\r\n\tat org.hibernate.action.internal.EntityInsertAction.postInsert(EntityInsertAction.java:176)\r\n\tat org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:153)\r\n\tat org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604)\r\n\tat org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478)\r\n\tat java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:723)\r\n\tat org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475)\r\n\tat org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344)\r\n\tat org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40)\r\n\tat org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99)\r\n\tat org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362)\r\n\tat org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:453)\r\n\tat org.hibernate.internal.SessionImpl.flushBeforeTransactionCompletion(SessionImpl.java:3212)\r\n\tat org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2380)\r\n\tat org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:448)\r\n\tat org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183)\r\n\tat org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40)\r\n\tat org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281)\r\n\tat org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101)\r\n\tat org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:562)\r\n\t... 54 more\r\n",
    "source_host": "mach-60RI959",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='test', partitions=0, dlqName='null'}.container-0-C-1",
    "level": "ERROR",
    "logger_name": "org.springframework.integration.handler.LoggingHandler"
}
{
    "@timestamp": "2022-06-13T15:30:38.674+0530",
    "message": "ProducerConfig values: \r\n\tacks = 1\r\n\tbatch.size = 16384\r\n\tbootstrap.servers = [amazonaws.com:9092, b-1.-east-1.amazonaws.com:9092]\r\n\tbuffer.memory = 33554432\r\n\tclient.dns.lookup = use_all_dns_ips\r\n\tclient.id = producer-1\r\n\tcompression.type = none\r\n\tconnections.max.idle.ms = 540000\r\n\tdelivery.timeout.ms = 120000\r\n\tenable.idempotence = false\r\n\tinterceptor.classes = []\r\n\tinternal.auto.downgrade.txn.commit = true\r\n\tkey.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer\r\n\tlinger.ms = 0\r\n\tmax.block.ms = 60000\r\n\tmax.in.flight.requests.per.connection = 5\r\n\tmax.request.size = 1048576\r\n\tmetadata.max.age.ms = 300000\r\n\tmetadata.max.idle.ms = 300000\r\n\tmetric.reporters = []\r\n\tmetrics.num.samples = 2\r\n\tmetrics.recording.level = INFO\r\n\tmetrics.sample.window.ms = 30000\r\n\tpartitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner\r\n\treceive.buffer.bytes = 32768\r\n\treconnect.backoff.max.ms = 1000\r\n\treconnect.backoff.ms = 50\r\n\trequest.timeout.ms = 30000\r\n\tretries = 2147483647\r\n\tretry.backoff.ms = 100\r\n\tsasl.client.callback.handler.class = null\r\n\tsasl.jaas.config = null\r\n\tsasl.kerberos.kinit.cmd = /usr/bin/kinit\r\n\tsasl.kerberos.min.time.before.relogin = 60000\r\n\tsasl.kerberos.service.name = null\r\n\tsasl.kerberos.ticket.renew.jitter = 0.05\r\n\tsasl.kerberos.ticket.renew.window.factor = 0.8\r\n\tsasl.login.callback.handler.class = null\r\n\tsasl.login.class = null\r\n\tsasl.login.refresh.buffer.seconds = 300\r\n\tsasl.login.refresh.min.period.seconds = 60\r\n\tsasl.login.refresh.window.factor = 0.8\r\n\tsasl.login.refresh.window.jitter = 0.05\r\n\tsasl.mechanism = GSSAPI\r\n\tsecurity.protocol = PLAINTEXT\r\n\tsecurity.providers = null\r\n\tsend.buffer.bytes = 131072\r\n\tsocket.connection.setup.timeout.max.ms = 127000\r\n\tsocket.connection.setup.timeout.ms = 10000\r\n\tssl.cipher.suites = null\r\n\tssl.enabled.protocols = [TLSv1.2, TLSv1.3]\r\n\tssl.endpoint.identification.algorithm = https\r\n\tssl.engine.factory.class = null\r\n\tssl.key.password = null\r\n\tssl.keymanager.algorithm = SunX509\r\n\tssl.keystore.certificate.chain = null\r\n\tssl.keystore.key = null\r\n\tssl.keystore.location = null\r\n\tssl.keystore.password = null\r\n\tssl.keystore.type = JKS\r\n\tssl.protocol = TLSv1.3\r\n\tssl.provider = null\r\n\tssl.secure.random.implementation = null\r\n\tssl.trustmanager.algorithm = PKIX\r\n\tssl.truststore.certificates = null\r\n\tssl.truststore.location = null\r\n\tssl.truststore.password = null\r\n\tssl.truststore.type = JKS\r\n\ttransaction.timeout.ms = 60000\r\n\ttransactional.id = null\r\n\tvalue.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer\r\n",
    "source_host": "mach-60RI959",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='test', partitions=0, dlqName='null'}.container-0-C-1",
    "level": "INFO",
    "logger_name": "org.apache.kafka.clients.producer.ProducerConfig"
}

vishalmamidi avatar Jun 13 '22 10:06 vishalmamidi