[BUG] ChangeVisibilityTimeout call failure during pipeline shutdown.
Describe the bug The SqsClient is closed too early, while the acknowledgment callback thread is still making a changeVisibilityTimeout call:
2024-05-31T20:29:16.968 [acknowledgement-callback-10] ERROR org.opensearch.dataprepper.plugins.source.s3.SqsWorker - Failed to set visibility timeout for message xxxxxxxxxx to 60
java.lang.IllegalStateException: Connection pool shut down
at org.apache.http.util.Asserts.check(Asserts.java:34) ~[Apache-HttpComponents-HttpCore-4.4.x.jar:?]
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) ~[apache-client-2.23.3.jar:?]
at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) ~[apache-client-2.23.3.jar:?]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) ~[apache-client-2.23.3.jar:?]
at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) ~[apache-client-2.23.3.jar:?]
at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) ~[apache-client-2.23.3.jar:?]
at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) ~[apache-client-2.23.3.jar:?]
at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) ~[apache-client-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:99) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) ~[sdk-core-2.23.3.jar:?]
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53) ~[aws-core-2.23.3.jar:?]
at software.amazon.awssdk.services.sqs.DefaultSqsClient.changeMessageVisibility(DefaultSqsClient.java:544) ~[sqs-2.23.3.jar:?]
at org.opensearch.dataprepper.plugins.source.s3.SqsWorker.lambda$processS3EventNotificationRecords$1(SqsWorker.java:286) ~[s3-source-2.8.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSet.checkProgress(DefaultAcknowledgementSet.java:73) ~[data-prepper-core-2.8.0-SNAPSHOT.jar:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
To Reproduce Steps to reproduce the behavior:
- Set up an S3 pipeline with acknowledgments and a visibility timeout.
- Shut down the pipeline.
Expected behavior The root cause of this error (the connection pool being shut down) should not occur.
Additional context Approaches:
- Check a shutdown flag before changing the visibility timeout in the acknowledgment thread. This approach may increase the number of duplicate messages during shutdown.
- Shut down the SQS threads first, then wait longer for all acknowledgments to finish before shutting down the sqsClient and the pipeline. This approach can reduce message duplication during shutdown, but it extends the shutdown latency.
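A rough sketch of how the two approaches could fit together, in plain Java with no AWS SDK dependency. Everything here is an assumption for illustration: `VisibilityClient` is a hypothetical stand-in for the SQS client, and `GracefulSqsShutdown`, `extendVisibility`, and `ackExecutor` are made-up names, not the actual `SqsWorker` code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the SQS client; this is NOT the AWS SDK interface. */
interface VisibilityClient extends AutoCloseable {
    void changeVisibility(String receiptHandle, int seconds);

    @Override
    void close();
}

/** Illustrative sketch only; all names are assumptions. */
final class GracefulSqsShutdown {
    private final VisibilityClient client;
    private final ExecutorService ackExecutor; // assumed to run the acknowledgment callbacks
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    GracefulSqsShutdown(VisibilityClient client, ExecutorService ackExecutor) {
        this.client = client;
        this.ackExecutor = ackExecutor;
    }

    /**
     * Approach 1: check the shutdown flag before calling the client.
     * Returns false when the call is skipped; the message may then become
     * visible again and be redelivered (a possible duplicate).
     */
    boolean extendVisibility(String receiptHandle, int seconds) {
        if (stopping.get()) {
            return false;
        }
        client.changeVisibility(receiptHandle, seconds);
        return true;
    }

    /**
     * Approach 2: stop accepting callbacks, wait up to the timeout for
     * in-flight ones to finish, and only then close the client.
     * Returns true if all callbacks finished within the timeout.
     */
    boolean shutdown(long timeout, TimeUnit unit) {
        stopping.set(true);
        ackExecutor.shutdown();
        boolean drained;
        try {
            drained = ackExecutor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            drained = false;
        }
        if (!drained) {
            ackExecutor.shutdownNow(); // give up waiting; remaining callbacks are interrupted
        }
        client.close();
        return drained;
    }
}
```

The flag check alone still leaves a small window in which a callback has passed the check while the client is being closed. Waiting on the executor before `close()` is what narrows that window, at the cost of a longer shutdown.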
I am having the same problem.
I've been investigating this. I created #4740 to provide a long-term solution, which uses Data Prepper core to hold on acknowledgements. In the meantime, I'm going to make some smaller changes that avoid the error messages. However, I won't hold the source open to let the acknowledgements flush, because that would require more core changes.