[BUG] ChangeVisibilityTimeout call failure during pipeline shutdown.

Open chenqi0805 opened this issue 1 year ago • 1 comments

Describe the bug The SqsClient is closed too early, while the acknowledgment callback thread is still making a changeVisibilityTimeout call:

2024-05-31T20:29:16.968 [acknowledgement-callback-10] ERROR org.opensearch.dataprepper.plugins.source.s3.SqsWorker - Failed to set visibility timeout for message xxxxxxxxxx to 60
java.lang.IllegalStateException: Connection pool shut down
	at org.apache.http.util.Asserts.check(Asserts.java:34) ~[Apache-HttpComponents-HttpCore-4.4.x.jar:?]
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) ~[apache-client-2.23.3.jar:?]
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:99) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53) ~[aws-core-2.23.3.jar:?]
	at software.amazon.awssdk.services.sqs.DefaultSqsClient.changeMessageVisibility(DefaultSqsClient.java:544) ~[sqs-2.23.3.jar:?]
	at org.opensearch.dataprepper.plugins.source.s3.SqsWorker.lambda$processS3EventNotificationRecords$1(SqsWorker.java:286) ~[s3-source-2.8.0-SNAPSHOT.jar:?]
	at org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSet.checkProgress(DefaultAcknowledgementSet.java:73) ~[data-prepper-core-2.8.0-SNAPSHOT.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

To Reproduce Steps to reproduce the behavior:

  1. Set up an S3 pipeline with acknowledgments and a visibility timeout enabled.
  2. Shut down the pipeline.

Expected behavior The root-cause error (connection pool shut down) should not occur during pipeline shutdown.

Additional context Approaches:

  1. Check a shutdown flag before changing the visibility timeout in the acknowledgment thread. This approach may increase duplicate messages during shutdown.
  2. Shut down the SQS threads first, then wait longer for all acknowledgments to finish before shutting down the sqsClient and the pipeline. This approach can reduce duplicate messages during shutdown, but it extends shutdown latency.
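
The two approaches above can be sketched roughly as follows. This is only an illustration, not the actual SqsWorker code: `VisibilityChanger` and `GuardedVisibilityExtender` are hypothetical names, and the interface is an assumed stand-in for the single SqsClient call that the callback makes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the one SQS call the acknowledgment callback needs. */
interface VisibilityChanger {
    void changeVisibility(String receiptHandle, int timeoutSeconds);
}

/** Hypothetical helper illustrating both approaches; not Data Prepper code. */
final class GuardedVisibilityExtender {
    private final VisibilityChanger client;
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    GuardedVisibilityExtender(VisibilityChanger client) {
        this.client = client;
    }

    /**
     * Approach 1: skip the call once shutdown has begun.
     * Returns true if the call was made, false if it was skipped.
     */
    boolean extend(String receiptHandle, int timeoutSeconds) {
        if (stopping.get()) {
            // Skipping means the message may be redelivered (a duplicate).
            return false;
        }
        client.changeVisibility(receiptHandle, timeoutSeconds);
        return true;
    }

    /**
     * Approach 2: stop accepting new callbacks, wait up to waitMillis for
     * the ones already submitted, and only then close the client.
     * Returns true if every callback finished before the deadline.
     */
    boolean shutdown(ExecutorService ackExecutor, Runnable closeClient, long waitMillis) {
        ackExecutor.shutdown(); // already-submitted callbacks still run
        boolean drained = false;
        try {
            drained = ackExecutor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        stopping.set(true); // stragglers now skip the call
        if (!drained) {
            ackExecutor.shutdownNow(); // interrupt whatever is still running
        }
        closeClient.run();
        return drained;
    }
}
```

In this sketch the flag is set only after the wait, so callbacks that finish within the deadline still extend their visibility timeout, and the flag only suppresses late ones. A call that is already in flight when the deadline passes can still hit the closed client, which is the latency-versus-duplicates trade-off described above.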

chenqi0805, May 31 '24 22:05

I am having the same problem.

wanpdsantos, Jun 30 '24 19:06

I've been investigating this some. I created #4740 to provide a long-term solution which uses Data Prepper core to hold on acknowledgements. In the meantime, I'm going to make some more minor changes which avoid the error messages. But, I won't hold the source open to allow the acknowledgements to flush because that will require more core changes.

dlvenable, Jul 16 '24 15:07