ml-commons
[BUG] Conversational search random timeout exception
What is the bug? When testing conversational search on a multi-node cluster, requests fail frequently and at random with a timeout exception:
{
"error": {
"root_cause": [
{
"type": "timeout_exception",
"reason": "java.util.concurrent.TimeoutException: Timeout waiting for task."
}
],
"type": "timeout_exception",
"reason": "java.util.concurrent.TimeoutException: Timeout waiting for task.",
"caused_by": {
"type": "timeout_exception",
"reason": "Timeout waiting for task."
}
},
"status": 500
}
How can one reproduce the bug? Steps to reproduce the behavior:
- Follow this doc https://opensearch.org/docs/latest/search-plugins/conversational-search/
What is the expected behavior? According to the error log, the timeout is caused by the blocking call at https://github.com/opensearch-project/ml-commons/blob/2.11/search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/client/ConversationalMemoryClient.java#L98:
GetInteractionsResponse response = client
.execute(GetInteractionsAction.INSTANCE, new GetInteractionsRequest(conversationId, maxResults, from))
.actionGet(DEFAULT_TIMEOUT_IN_MILLIS);
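This code should not use actionGet, which blocks the calling thread until the response arrives or the timeout expires. In the error log below, the blocked thread is a Netty transport thread.
I suggest changing it to use an action listener instead.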
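A minimal, self-contained sketch of the listener style, using only the JDK. `Listener`, `ListenerSketch`, `getInteractionsAsync`, and `countInteractions` are hypothetical stand-ins, not ml-commons code; in the real client the equivalent change would presumably be to pass an `ActionListener` to `client.execute(action, request, listener)` rather than calling `actionGet` on the returned future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a callback type such as OpenSearch's ActionListener.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

class ListenerSketch {
    // Hypothetical async lookup: the work runs on a pool thread and the
    // listener is notified when it finishes, so the caller never waits.
    static void getInteractionsAsync(ExecutorService pool, String conversationId,
                                     Listener<List<String>> listener) {
        pool.execute(() -> {
            try {
                listener.onResponse(List.of(conversationId + ":q1", conversationId + ":q2"));
            } catch (Exception e) {
                listener.onFailure(e);
            }
        });
    }

    // Continues the work inside the callback instead of blocking for the result.
    // The returned future only exists so callers can observe the outcome.
    static CompletableFuture<Integer> countInteractions(ExecutorService pool, String conversationId) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        getInteractionsAsync(pool, conversationId, new Listener<List<String>>() {
            @Override
            public void onResponse(List<String> interactions) {
                result.complete(interactions.size());
            }

            @Override
            public void onFailure(Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Blocking here is only for the demo's main thread.
            int n = countInteractions(pool, "conv-1").get(5, TimeUnit.SECONDS);
            System.out.println("interactions: " + n);
        } finally {
            pool.shutdown();
        }
    }
}
```

The point of the pattern is that the thread that issues the request returns immediately; whatever needs the interactions runs in `onResponse`, and errors travel through `onFailure` instead of surfacing as a timeout on a blocked thread.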
What is your host/environment?
- OS: linux
- Version: 2.11
- Plugins
Do you have any screenshots? No
Do you have any additional context? Error log
[2024-04-17T22:08:54,044][WARN ][r.suppressed ] [1a96c8c1276b3e88ebb2d33448e12e1e] path: /test_population_data/_search, params: {pretty=true, search_pipeline=demo_rag_pipeline, index=test_population_data}
SearchPipelineProcessingException[OpenSearchTimeoutException[java.util.concurrent.TimeoutException: Timeout waiting for task.]; nested: TimeoutException[Timeout waiting for task.];]; nested: OpenSearchTimeoutException[java.util.concurrent.TimeoutException: Timeout waiting for task.]; nested: TimeoutException[Timeout waiting for task.];
at org.opensearch.search.pipeline.Pipeline.transformResponse(Pipeline.java:199)
at org.opensearch.search.pipeline.PipelinedRequest.transformResponse(PipelinedRequest.java:31)
at org.opensearch.action.search.TransportSearchAction.lambda$executeRequest$0(TransportSearchAction.java:453)
at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
at org.opensearch.action.search.AbstractSearchAsyncAction.sendSearchResponse(AbstractSearchAsyncAction.java:707)
at org.opensearch.action.search.ExpandSearchPhase.run(ExpandSearchPhase.java:132)
at org.opensearch.action.search.SearchPhase.recordAndRun(SearchPhase.java:59)
at org.opensearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:456)
at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:440)
at org.opensearch.action.search.FetchSearchPhase.moveToNextPhase(FetchSearchPhase.java:298)
at org.opensearch.action.search.FetchSearchPhase.lambda$innerRun$1(FetchSearchPhase.java:138)
at org.opensearch.action.search.CountedCollector.countDown(CountedCollector.java:66)
at org.opensearch.action.search.ArraySearchPhaseResults.consumeResult(ArraySearchPhaseResults.java:61)
at org.opensearch.action.search.CountedCollector.onResult(CountedCollector.java:74)
at org.opensearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:243)
at org.opensearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:238)
at org.opensearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:59)
at org.opensearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:44)
at org.opensearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:70)
at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleResponse(SearchTransportService.java:744)
at org.opensearch.transport.TransportService$6.handleResponse(TransportService.java:897)
at org.opensearch.security.transport.SecurityInterceptor$RestoringTransportResponseHandler.handleResponse(SecurityInterceptor.java:419)
at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1516)
at org.opensearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:411)
at org.opensearch.transport.InboundHandler.handleResponse(InboundHandler.java:403)
at org.opensearch.transport.InboundHandler.messageReceived(InboundHandler.java:168)
at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:123)
at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:770)
at org.opensearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:175)
at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:150)
at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:115)
at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1471)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1334)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1383)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: OpenSearchTimeoutException[java.util.concurrent.TimeoutException: Timeout waiting for task.]; nested: TimeoutException[Timeout waiting for task.];
at org.opensearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:96)
at org.opensearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:79)
at org.opensearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:68)
at org.opensearch.searchpipelines.questionanswering.generative.client.ConversationalMemoryClient.getInteractions(ConversationalMemoryClient.java:98)
at org.opensearch.searchpipelines.questionanswering.generative.GenerativeQAResponseProcessor.processResponse(GenerativeQAResponseProcessor.java:138)
at org.opensearch.search.pipeline.Pipeline.transformResponse(Pipeline.java:177)
... 60 more
Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
at org.opensearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:257)
at org.opensearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:82)
at org.opensearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:94)
... 65 more
Changing the processor to implement the SearchResponseProcessor.processResponseAsync method should fix this.
@austintlee, we need to prioritize this issue. Do you have enough bandwidth?
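A rough sketch of what an asynchronous response processor could look like. This is an assumption-laden illustration only: `AsyncProcessorSketch`, `Callback`, and `Memory` are hypothetical, the method here takes plain strings, and the real `SearchResponseProcessor.processResponseAsync` signature is not reproduced.

```java
import java.util.List;

class AsyncProcessorSketch {
    // Hypothetical callback type, standing in for something like ActionListener.
    interface Callback<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    // Hypothetical stand-in for the conversation memory service.
    interface Memory {
        void getInteractions(String conversationId, Callback<List<String>> callback);
    }

    private final Memory memory;

    AsyncProcessorSketch(Memory memory) {
        this.memory = memory;
    }

    // The processed response is delivered through the listener rather than
    // returned, so the memory lookup can complete later on another thread.
    void processResponseAsync(String conversationId, String searchResponse, Callback<String> listener) {
        memory.getInteractions(conversationId, new Callback<List<String>>() {
            @Override
            public void onResponse(List<String> history) {
                listener.onResponse(searchResponse + " [history=" + history.size() + "]");
            }

            @Override
            public void onFailure(Exception e) {
                // Propagate the failure instead of timing out while waiting.
                listener.onFailure(e);
            }
        });
    }

    public static void main(String[] args) {
        // In-memory Memory that answers immediately, for demonstration only.
        Memory inMemory = (id, cb) -> cb.onResponse(List.of("q1", "q2"));
        new AsyncProcessorSketch(inMemory).processResponseAsync("conv-1", "hits", new Callback<String>() {
            @Override
            public void onResponse(String value) {
                System.out.println(value);
            }

            @Override
            public void onFailure(Exception e) {
                System.err.println("failed: " + e.getMessage());
            }
        });
    }
}
```

With this shape, no call in the processor waits on a future; each step hands its result to the next callback.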
cc @mashah @jonfritz
@austintlee I found some other places that use the same blocking actionGet call:
https://github.com/opensearch-project/ml-commons/blob/main/search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/client/ConversationalMemoryClient.java#L61
CreateConversationResponse response = client
.execute(CreateConversationAction.INSTANCE, new CreateConversationRequest(name))
.actionGet(DEFAULT_TIMEOUT_IN_MILLIS);
https://github.com/opensearch-project/ml-commons/blob/main/search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/client/ConversationalMemoryClient.java#L82C13-L82C23
CreateInteractionResponse res = client
.execute(
CreateInteractionAction.INSTANCE,
new CreateInteractionRequest(conversationId, input, promptTemplate, response, origin, additionalInfo)
)
.actionGet(DEFAULT_TIMEOUT_IN_MILLIS);
https://github.com/opensearch-project/ml-commons/blob/main/search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/client/ConversationalMemoryClient.java#L127
GetInteractionsResponse response = client
.execute(GetInteractionsAction.INSTANCE, new GetInteractionsRequest(conversationId, maxResults, from))
.actionGet(DEFAULT_TIMEOUT_IN_MILLIS);
I think we can close this?
Yes, close this issue. Thanks Austin.