amazon-kinesis-client
amazon-kinesis-client copied to clipboard
Kinesis graceful shutdown for a simple KCL 2.0 consumer not working
We are using KCL 2.0 with the enhanced fan-out feature to consume a Kinesis stream, but the application cannot shut down gracefully. The ShardRecordProcessor returns successfully on shutdown, but the GracefulShutdownCallable's Future returns false. Here is a minimal application that demonstrates the problem: https://pastebin.com/LhLvfkrs - note that this is even more minimal than the example in the documentation (there is no producer and the process implementation is empty). Is the false returned from the Future a red herring?
Here is the log of the run: https://pastebin.com/2TSHxHEZ - (see line 38).
I also notice that leaseLost is called on the ShardRecordProcessor for a Shutdown request (line 33). Is that the right behavior?
I suspect that a shard consumer is stuck in a failure loop and holding up the graceful shutdown. Try setting a breakpoint in the following code to confirm: https://github.com/awslabs/amazon-kinesis-client/blob/98b016276b0387ed77d1049c706673ad1fbf6b22/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java#L348-L378
You can also gather more information by enabling debug logging. Specifically, if a task is stuck in a failure loop, the following code should print the appropriate exception to the logs: https://github.com/awslabs/amazon-kinesis-client/blob/98b016276b0387ed77d1049c706673ad1fbf6b22/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java#L337-L346
I think KCL 1.x has the same issue.
https://github.com/awslabs/amazon-kinesis-client/pull/538
Ping. Any updates on this? I gave you a small self-contained program that demonstrates the issue. Can this be prioritized?
We are seeing the exact same problem (and log) using the polling retrieval strategy. I placed breakpoints at the mentioned places and saw nothing unusual.
After the message

> Shutdown completed, but shutdownCompleteLatch still had outstanding 1 with a current value of 1. shutdownComplete: false -- Consumer Map: 0

it reproducibly takes around one second for the Worker/Scheduler loop to start the final shutdown (`Starting worker's final shutdown.`).
TRACE logs
2020-06-03 11:06:43.205 INFO 9380 --- [ main] d.b.s.a.k.SpringLifecycleWorkerStarter : Kinesis worker shutdown initiated...
2020-06-03 11:06:43.205 DEBUG 9380 --- [tyEventLoop-2-1] software.amazon.awssdk.requestId : x-amzn-RequestId : QZTSO5RLCACBX0NLGVJ9CQPAOOXFQOCJL6SCRH59LWFGWWXRDFHV
2020-06-03 11:06:43.205 TRACE 9380 --- [tyEventLoop-2-1] software.amazon.awssdk.request : Done parsing service response.
2020-06-03 11:06:43.205 DEBUG 9380 --- [dProcessor-0000] s.a.k.m.CloudWatchPublisherRunnable : Enqueueing 0 datums for publication
2020-06-03 11:06:43.205 TRACE 9380 --- [dProcessor-0000] d.b.s.a.k.AwsKinesisRecordProcessor : Checkpointed batch on shard <shardId-000000000000> of <foo-event-stream>.
2020-06-03 11:06:43.205 DEBUG 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : Waiting up to 5814 ms for 200 more datums to appear.
2020-06-03 11:06:43.205 DEBUG 9380 --- [dProcessor-0000] s.a.k.m.CloudWatchPublisherRunnable : Enqueueing 0 datums for publication
2020-06-03 11:06:43.205 DEBUG 9380 --- [dProcessor-0000] s.a.k.m.CloudWatchPublisherRunnable : Enqueueing 0 datums for publication
2020-06-03 11:06:43.205 DEBUG 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : Waiting up to 5814 ms for 200 more datums to appear.
2020-06-03 11:06:43.205 DEBUG 9380 --- [dProcessor-0000] s.a.k.r.p.PrefetchRecordsPublisher : shardId-000000000000: Record delivery time to shard consumer is 81 millis
2020-06-03 11:06:43.205 DEBUG 9380 --- [dProcessor-0000] s.a.k.m.CloudWatchPublisherRunnable : Enqueueing 0 datums for publication
2020-06-03 11:06:43.205 DEBUG 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : Waiting up to 5814 ms for 200 more datums to appear.
2020-06-03 11:06:43.206 INFO 9380 --- [ool-16-thread-1] d.b.s.a.k.SpringLifecycleWorkerStarter : Shutting down Kinesis worker of stream <foo-event-stream>...
2020-06-03 11:06:44.122 DEBUG 9380 --- [oo-event-stream] s.a.k.coordinator.DiagnosticEventLogger : Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
2020-06-03 11:06:44.123 INFO 9380 --- [dProcessor-0000] d.b.s.a.k.AwsKinesisRecordProcessor : Shutting down record processor for shard <shardId-000000000000> on stream <foo-event-stream>...
2020-06-03 11:06:44.123 DEBUG 9380 --- [dProcessor-0000] s.a.k.c.ShardRecordProcessorCheckpointer : Checkpointing shardId-000000000000, token 51548fb8-e07b-4de9-a0e3-c6ded5377a91 at largest permitted value {SequenceNumber: 49607607585222188760376988941850748685729608644289363970,SubsequenceNumber: 0}
2020-06-03 11:06:44.123 INFO 9380 --- [dProcessor-0000] d.b.s.a.k.AwsKinesisRecordProcessor : Record processor for shard <shardId-000000000000> on stream <foo-event-stream> shut down successfully.
2020-06-03 11:06:44.123 INFO 9380 --- [dShutdownThread] s.amazon.kinesis.coordinator.Scheduler : Worker shutdown requested.
2020-06-03 11:06:44.124 INFO 9380 --- [dShutdownThread] s.a.k.l.d.DynamoDBLeaseCoordinator : Worker BM1029.bmeister.biz:a71a6eb1-abe7-45d4-bad4-7d2e889e8d2c has successfully stopped lease-tracking threads
2020-06-03 11:06:44.128 DEBUG 9380 --- [0000000000-0000] s.a.a.c.i.ExecutionInterceptorChain : Creating an interceptor chain that will apply interceptors in the following order: [software.amazon.awssdk.awscore.interceptor.HelpfulUnknownHostExceptionInterceptor@10072880]
2020-06-03 11:06:44.129 DEBUG 9380 --- [0000000000-0000] software.amazon.awssdk.request : Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=14567, encodedPath=/, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent, X-Amz-Target], queryParameters=[])
2020-06-03 11:06:44.129 TRACE 9380 --- [0000000000-0000] s.amazon.awssdk.auth.signer.Aws4Signer : AWS4 Canonical Request: POST
/
amz-sdk-invocation-id:c8413e40-385a-c26a-adfe-e40c6078205d
amz-sdk-retry:0/0/500
content-length:258
content-type:application/x-amz-json-1.1
host:localhost:14567
x-amz-date:20200603T090644Z
x-amz-target:Kinesis_20131202.GetRecords
amz-sdk-invocation-id;amz-sdk-retry;content-length;content-type;host;x-amz-date;x-amz-target
6c332968cd24b0795e992d1adacdd2a0d36de7dc163bbf5e4674ab2176639e10
2020-06-03 11:06:44.129 DEBUG 9380 --- [0000000000-0000] s.amazon.awssdk.auth.signer.Aws4Signer : AWS4 String to sign: AWS4-HMAC-SHA256
20200603T090644Z
20200603/local/kinesis/aws4_request
3d3fb509cd6a83cd97728d477a51617ef058b9ff308160c6184de9a69a73c2d2
2020-06-03 11:06:44.134 DEBUG 9380 --- [tyEventLoop-2-7] software.amazon.awssdk.request : Received successful response: 200
2020-06-03 11:06:44.134 TRACE 9380 --- [tyEventLoop-2-7] software.amazon.awssdk.request : Parsing service response JSON.
2020-06-03 11:06:44.134 DEBUG 9380 --- [tyEventLoop-2-7] software.amazon.awssdk.requestId : x-amzn-RequestId : 8bae8e40-a579-11ea-93c7-032ed6203102
2020-06-03 11:06:44.134 TRACE 9380 --- [tyEventLoop-2-7] software.amazon.awssdk.request : Done parsing service response.
2020-06-03 11:06:44.135 DEBUG 9380 --- [0000000000-0000] s.a.k.m.CloudWatchPublisherRunnable : Enqueueing 0 datums for publication
2020-06-03 11:06:44.135 DEBUG 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : Waiting up to 4884 ms for 200 more datums to appear.
2020-06-03 11:06:44.135 DEBUG 9380 --- [0000000000-0000] s.a.k.m.CloudWatchPublisherRunnable : Enqueueing 0 datums for publication
2020-06-03 11:06:44.135 DEBUG 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : Waiting up to 4884 ms for 200 more datums to appear.
2020-06-03 11:06:44.135 DEBUG 9380 --- [0000000000-0000] s.a.k.r.p.PrefetchRecordsPublisher : shardId-000000000000 : Current Prefetch Counter States: { Requests: 1, Records: 0, Bytes: 0 }
2020-06-03 11:06:45.122 DEBUG 9380 --- [oo-event-stream] s.a.kinesis.lifecycle.ShardConsumer : Shutdown(shardId-000000000000): Lease lost triggered.
2020-06-03 11:06:45.122 DEBUG 9380 --- [oo-event-stream] s.a.kinesis.lifecycle.ShardConsumer : Shutdown(shardId-000000000000): Subscriber cancelled.
2020-06-03 11:06:45.122 DEBUG 9380 --- [oo-event-stream] s.a.k.coordinator.DiagnosticEventLogger : Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=0, largestPoolSize=2, maximumPoolSize=2147483647)
2020-06-03 11:06:45.123 DEBUG 9380 --- [dProcessor-0000] s.amazon.kinesis.lifecycle.ShutdownTask : Invoking shutdown() for shard shardId-000000000000, concurrencyToken 51548fb8-e07b-4de9-a0e3-c6ded5377a91. Shutdown reason: LEASE_LOST
2020-06-03 11:06:45.124 INFO 9380 --- [dProcessor-0000] d.b.s.a.k.AwsKinesisRecordProcessor : Lease lost for shard <shardId-000000000000> on stream <foo-event-stream>.
2020-06-03 11:06:45.124 DEBUG 9380 --- [dProcessor-0000] s.amazon.kinesis.lifecycle.ShutdownTask : Shutting down retrieval strategy.
2020-06-03 11:06:45.125 DEBUG 9380 --- [dProcessor-0000] s.amazon.kinesis.lifecycle.ShutdownTask : Record processor completed shutdown() for shard shardId-000000000000
2020-06-03 11:06:45.125 INFO 9380 --- [0000000000-0000] s.a.k.r.p.PrefetchRecordsPublisher : shardId-000000000000 : Thread was interrupted, indicating shutdown was called on the cache.
2020-06-03 11:06:45.125 DEBUG 9380 --- [0000000000-0000] s.a.k.m.CloudWatchPublisherRunnable : Enqueueing 0 datums for publication
2020-06-03 11:06:45.125 DEBUG 9380 --- [dProcessor-0000] s.a.k.m.CloudWatchPublisherRunnable : Enqueueing 0 datums for publication
2020-06-03 11:06:45.125 DEBUG 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : Waiting up to 3894 ms for 200 more datums to appear.
2020-06-03 11:06:45.126 INFO 9380 --- [dShutdownThread] downCoordinator$GracefulShutdownCallable : Waiting for 1 record processors to complete final shutdown
2020-06-03 11:06:46.123 DEBUG 9380 --- [oo-event-stream] s.a.kinesis.lifecycle.ShardConsumer : Shutdown(shardId-000000000000): Lease lost triggered.
2020-06-03 11:06:46.123 DEBUG 9380 --- [oo-event-stream] s.a.kinesis.lifecycle.ShardConsumer : Shutdown(shardId-000000000000): Subscriber cancelled.
2020-06-03 11:06:46.123 DEBUG 9380 --- [oo-event-stream] s.a.k.coordinator.DiagnosticEventLogger : Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=0, largestPoolSize=2, maximumPoolSize=2147483647)
2020-06-03 11:06:46.127 INFO 9380 --- [dShutdownThread] downCoordinator$GracefulShutdownCallable : Waiting for 1 record processors to complete final shutdown
2020-06-03 11:06:47.123 DEBUG 9380 --- [oo-event-stream] s.a.kinesis.lifecycle.ShardConsumer : Shutdown(shardId-000000000000): Lease lost triggered.
2020-06-03 11:06:47.123 DEBUG 9380 --- [oo-event-stream] s.a.kinesis.lifecycle.ShardConsumer : Shutdown(shardId-000000000000): Subscriber cancelled.
2020-06-03 11:06:47.123 DEBUG 9380 --- [oo-event-stream] s.amazon.kinesis.coordinator.Scheduler : Removed consumer for shardId-000000000000 as lease has been lost
2020-06-03 11:06:47.123 DEBUG 9380 --- [oo-event-stream] s.a.k.coordinator.DiagnosticEventLogger : Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=0, largestPoolSize=2, maximumPoolSize=2147483647)
2020-06-03 11:06:47.128 INFO 9380 --- [dShutdownThread] downCoordinator$GracefulShutdownCallable : Waiting for 1 record processors to complete final shutdown
2020-06-03 11:06:47.128 INFO 9380 --- [dShutdownThread] downCoordinator$GracefulShutdownCallable : Shutdown completed, but shutdownCompleteLatch still had outstanding 1 with a current value of 1. shutdownComplete: false -- Consumer Map: 0
> Graceful Shutdown returns: <false>
> From here on, we loop until Scheduler.shutdownComplete() reports true as per workaround mentioned here: https://github.com/awslabs/amazon-kinesis-client/issues/542#issue-430918960
2020-06-03 11:06:47.129 DEBUG 9380 --- [ool-16-thread-1] d.b.s.a.k.SpringLifecycleWorkerStarter : Waiting for Scheduler of stream <foo-event-stream> to shutdown as well...
2020-06-03 11:06:47.629 DEBUG 9380 --- [ool-16-thread-1] d.b.s.a.k.SpringLifecycleWorkerStarter : Waiting for Scheduler of stream <foo-event-stream> to shutdown as well...
2020-06-03 11:06:48.123 INFO 9380 --- [oo-event-stream] s.amazon.kinesis.coordinator.Scheduler : All record processors have been shutdown successfully.
2020-06-03 11:06:48.123 INFO 9380 --- [oo-event-stream] s.amazon.kinesis.coordinator.Scheduler : Starting worker's final shutdown.
2020-06-03 11:06:48.123 INFO 9380 --- [oo-event-stream] s.a.k.m.CloudWatchPublisherRunnable : Shutting down CWPublication thread.
2020-06-03 11:06:48.125 DEBUG 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : Drained 0 datums from queue
2020-06-03 11:06:48.125 DEBUG 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : Shutting down with 0 datums left on the queue
2020-06-03 11:06:48.125 INFO 9380 --- [trics-publisher] s.a.k.m.CloudWatchPublisherRunnable : CWPublication thread finished.
2020-06-03 11:06:48.125 INFO 9380 --- [oo-event-stream] s.amazon.kinesis.coordinator.Scheduler : Worker loop is complete. Exiting from worker.
2020-06-03 11:06:48.129 INFO 9380 --- [ool-16-thread-1] d.b.s.a.k.SpringLifecycleWorkerStarter : Scheduler of stream <foo-event-stream> shut down successfully in PT4.006S.
2020-06-03 11:06:48.129 INFO 9380 --- [ main] d.b.s.a.k.SpringLifecycleWorkerStarter : Kinesis worker shutdown phase completed.
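The workaround referenced in the annotated log above (loop until `Scheduler.shutdownComplete()` reports true) can be sketched as follows. This is a self-contained model, not KCL code: `shutdownComplete` is a stand-in for a real `scheduler::shutdownComplete` method reference, and the poll interval and timeout values are arbitrary choices of mine, not KCL defaults.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class ShutdownWorkaround {

    // Polls a shutdown-complete check until it reports true or the deadline
    // passes. In a real application, pass scheduler::shutdownComplete here.
    public static boolean awaitFinalShutdown(BooleanSupplier shutdownComplete,
                                             long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!shutdownComplete.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false; // gave up: the worker never finished its final shutdown
            }
            TimeUnit.MILLISECONDS.sleep(100); // re-check periodically
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Simulate a scheduler whose final shutdown completes after ~500 ms,
        // mimicking the roughly one-second lag visible in the logs above.
        BooleanSupplier fakeScheduler = () -> System.currentTimeMillis() - start > 500;
        System.out.println("shutdownComplete: " + awaitFinalShutdown(fakeScheduler, 5_000));
    }
}
```

With this in place, a false result from the graceful-shutdown future can be tolerated as long as the bounded wait afterwards confirms the worker loop actually exited.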
@aggarwal What's the reason for `|| context.scheduler().shardInfoShardConsumerMap().isEmpty()` here?
https://github.com/awslabs/amazon-kinesis-client/blob/be1e3cd112facdc60e57defe6c9b8601240d1565/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java#L46-L48
To fix the issue at hand, the condition above would need to be split:
- Wait for `shardInfoShardConsumerMap().isEmpty()` to become true (if desired at all)
- Wait for Scheduler shutdown (`context.scheduler().shutdownComplete()`) for `shardConsumerDispatchPollIntervalMillis` + delta

I am assuming the first condition is required for some particular scenario. Otherwise, it could be dropped altogether. Imho, `Scheduler.shouldShutdown()` already checks the exact same condition and is a precondition for `Scheduler.shutdownComplete()` to return `true` in the success case.
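A minimal sketch of this proposed split, using plain `BooleanSupplier` stand-ins for the two KCL checks (the method names in the comments refer to the real API; the class, method, and timing values here are illustrative assumptions, not an actual patch):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class SplitGracefulWait {

    // Phase 1: wait (bounded) for the consumer map to drain
    //          (stand-in for shardInfoShardConsumerMap().isEmpty()).
    // Phase 2: give the scheduler loop one poll interval plus a delta to
    //          observe the shutdown flag (stand-in for shutdownComplete()).
    public static boolean gracefulWait(BooleanSupplier consumerMapEmpty,
                                       BooleanSupplier schedulerDone,
                                       long pollIntervalMillis,
                                       long deltaMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 10 * pollIntervalMillis;
        while (!consumerMapEmpty.getAsBoolean() && System.currentTimeMillis() < deadline) {
            TimeUnit.MILLISECONDS.sleep(20);
        }
        deadline = System.currentTimeMillis() + pollIntervalMillis + deltaMillis;
        while (!schedulerDone.getAsBoolean() && System.currentTimeMillis() < deadline) {
            TimeUnit.MILLISECONDS.sleep(20);
        }
        return schedulerDone.getAsBoolean();
    }
}
```

The point of the split is that the second wait is bounded by the scheduler's own loop cadence, so an empty consumer map alone is no longer reported as a failed shutdown.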
@jrehwaldt, how does this fix the issue?
When `shardInfoShardConsumerMap().isEmpty()` returns `true`, it means that the scheduler has no consumers left for any lease. At this point the scheduler likely does not hold ownership over any lease.
It's been a while since I read this code.
I believe it's a matter of when the shutdown is considered successful. If

> it means that the scheduler does not have consumers left for any lease.

is sufficient for a successful shutdown, then the graceful shutdown future should return `true` instead of `false`. Otherwise, if proper shutdown of the Scheduler's executor and CloudWatch publisher is expected as well, the shutdown has to wait explicitly for this to happen.
The current implementation considers it enough to have no leases, yet still reports to clients that the shutdown was not successful. I'd argue the API has to decide.
> @jrehwaldt, how does this fix the issue?
In
https://github.com/awslabs/amazon-kinesis-client/blob/be1e3cd112facdc60e57defe6c9b8601240d1565/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java#L135-L146
`outstanding` is non-zero. This is the case since not all consumers have counted down their latch. Two places exist for counting down:
- Standard case: https://github.com/awslabs/amazon-kinesis-client/blob/be1e3cd112facdc60e57defe6c9b8601240d1565/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java#L78-L87
- Edge case: https://github.com/awslabs/amazon-kinesis-client/blob/be1e3cd112facdc60e57defe6c9b8601240d1565/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java#L473-L481
Case 1 is triggered through the following flow:
- `ConsumerState.ShutdownCompleteState.createTask()` (https://github.com/awslabs/amazon-kinesis-client/blob/be1e3cd112facdc60e57defe6c9b8601240d1565/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java#L22-L84)
- `ShardConsumerShutdownNotification.shutdownComplete()`
- `<latch>.countDown()`
From the flow diagram we can see that the `Shutting Down` state is a precondition for `Shutdown Complete`. The `Shutting Down` state is reached once the leases are lost:
https://github.com/awslabs/amazon-kinesis-client/blob/be1e3cd112facdc60e57defe6c9b8601240d1565/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java#L107-L108
Considering the graceful shutdown condition in light of this flow, shutdown of a Scheduler only starts once all leases are lost, which - if I understand your statement correctly - roughly translates into `shardInfoShardConsumerMap().isEmpty() == true`. With the default configuration, the Scheduler's main loop sleeps 1 s (`shardConsumerDispatchPollIntervalMillis`) before it checks for a shutdown request again. This means it takes rather long until the Scheduler figures out it has to shut down.
That's my understanding 🤷
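If that one-second lag matters operationally, one possible mitigation is to shorten the interval so the scheduler notices the shutdown request sooner. This is a hedged config sketch: `configsBuilder` is assumed to be the usual KCL 2.x `ConfigsBuilder`, and the fluent setter name is taken from this thread and may differ across KCL versions.

```java
// Hedged sketch: shorten the scheduler's dispatch poll interval so the
// worker loop re-checks the shutdown flag more often (default is 1000 ms).
CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig()
        .shardConsumerDispatchPollIntervalMillis(250L);
```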
Do we have any updates on this issue? I am getting the following logs after a graceful shutdown is called:
[pool-20-thread-1] INFO software.amazon.kinesis.leases.LeaseCleanupManager - Number of pending leases to clean before the scan : 0
[pool-20-thread-1] INFO software.amazon.kinesis.leases.LeaseCleanupManager - Number of pending leases to clean before the scan : 0
[pool-20-thread-1] INFO software.amazon.kinesis.leases.LeaseCleanupManager - Number of pending leases to clean before the scan : 0
[pool-20-thread-1] INFO software.amazon.kinesis.leases.LeaseCleanupManager - Number of pending leases to clean before the scan : 0
[pool-20-thread-1] INFO software.amazon.kinesis.leases.LeaseCleanupManager - Number of pending leases to clean before the scan : 0
[pool-20-thread-1] INFO software.amazon.kinesis.leases.LeaseCleanupManager - Number of pending leases to clean before the scan : 0
[pool-20-thread-1] INFO software.amazon.kinesis.leases.LeaseCleanupManager - Number of pending leases to clean before the scan : 0
Can we get an update on this? Has this been fixed? Why does it take 9 months to get an update on the recommended SDK for your flagship product?
Ping. Any updates on this? Is the Kinesis team looking at these Github issues at all?
I am using Spring Boot and have the same issue. Since there is no manual checkpointing in the demo, together with https://github.com/awslabs/amazon-kinesis-client/issues/608 this will result in data loss from the Kinesis stream.
We are having the same problem: Shutdown completed, but shutdownCompleteLatch still had outstanding 2 with a current value of 2. shutdownComplete: false -- Consumer Map: 0
any updates on this?
After diving into this issue, I found that `startGracefulShutdown` is returning false because the `shutdownCompleteLatch` is not counting down to zero. To be specific, the original code provided by the OP gets stuck in this loop until the worker has finished shutting down. The worker finishes its shutdown while it's in this block, which causes `workerShutdownWithRemaining` to return true and, thus, `waitForRecordProcessors` to return false. In summary, there are two main reasons why `startGracefulShutdown` returns false:
- The shutdownCompleteLatch must be decremented to avoid getting stuck in the loop.
- The worker’s final shutdown needs to complete before returning the result of startGracefulShutdown.
The latest PR (#1302) addresses these issues.
For 1), we need to count down the `shutdownCompleteLatch` during the shutdown process. Originally, this was done in the task of the `ShutdownCompleteState` (ref), but in #1302 this line was moved to be executed during the `ShutdownTask` (ref).
For 2), a `finalShutdownLatch` was introduced to the `GracefulShutdownContext` so that we wait for the final shutdown to complete before returning.
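The latch behaviour described above can be reproduced with plain `java.util.concurrent` (a toy model of `shutdownCompleteLatch`, not KCL code): as long as nothing counts the latch down, a timed `await` returns false, which is exactly the `shutdownComplete: false` result reported throughout this thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchModel {
    public static void main(String[] args) throws InterruptedException {
        // One count per record processor, mirroring shutdownCompleteLatch.
        CountDownLatch shutdownCompleteLatch = new CountDownLatch(1);

        // Nothing counts down (the pre-#1302 behaviour on this code path):
        // the timed await gives up and reports failure.
        boolean completed = shutdownCompleteLatch.await(100, TimeUnit.MILLISECONDS);
        System.out.println("shutdownComplete: " + completed); // prints false

        // Once the shutdown path counts down (moved into ShutdownTask by #1302),
        // the latch is released and the count drops to zero.
        shutdownCompleteLatch.countDown();
        System.out.println("outstanding: " + shutdownCompleteLatch.getCount()); // prints 0
    }
}
```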
I'll be closing this issue, but feel free to reopen it if you find that the PR does not address your issue.