amazon-kinesis-client
Kinesis shutdown errors
When shutting down Kinesis, we get a few shutdown errors. One of them is:
java.lang.IllegalStateException: Shutdown has been called on the cache, can't accept new requests.
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.throwOnIllegalState(PrefetchRecordsPublisher.java:283)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.peekNextResult(PrefetchRecordsPublisher.java:292)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.drainQueueForRequests(PrefetchRecordsPublisher.java:388)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.makeRetrievalAttempt(PrefetchRecordsPublisher.java:506)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.run(PrefetchRecordsPublisher.java:464)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
We are aware that the error is an expected part of the graceful shutdown, but it creates a lot of noise in our logs, and sometimes we even get paged. Can this issue please be fixed?
We have the same issue. It's not great that a graceful shutdown throws and logs errors; that's the exact opposite of a graceful shutdown.
We also run into this.
Please fix this. As already said in the original post, an IllegalStateException can't be considered a graceful way to shut down.
Would also appreciate it if this were fixed.
Bumping this issue.
It looks like nobody cares :(
Anyone?
We also see this at every shardEnd.
+1 to solve this one 🙏
17:41:05.218 [prefetch-cache-shardId-000000000000-0000] ERROR s.a.k.r.p.PrefetchRecordsPublisher - testStream-WselIrnpyM:shardId-000000000000 : Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.
java.lang.IllegalStateException: Shutdown has been called on the cache, can't accept new requests.
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.throwOnIllegalState(PrefetchRecordsPublisher.java:283)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.peekNextResult(PrefetchRecordsPublisher.java:292)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.drainQueueForRequests(PrefetchRecordsPublisher.java:388)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.makeRetrievalAttempt(PrefetchRecordsPublisher.java:506)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.run(PrefetchRecordsPublisher.java:463)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
As currently designed, the private method peekNextResult calls throwOnIllegalState before publisherSession.peekNextRecord:
private void throwOnIllegalState() {
    if (executorService.isShutdown()) {
        throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
    }
    if (!started) {
        throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
    }
}

private PrefetchRecordsRetrieved peekNextResult() {
    throwOnIllegalState();
    return publisherSession.peekNextRecord();
}
If the executor has been shut down, it throws an IllegalStateException, which internally acts like a break/goto: the exception propagates up to the catch (Throwable e) block in DefaultGetRecordsCacheDaemon.run, which logs it at ERROR level. It is debatable whether this is really an error, since a shutdown is expected on application stop, redeploy, etc.
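To make that control flow concrete, here is a minimal, self-contained sketch. It is a hypothetical model, not KCL code: the class CurrentBehaviorSketch, the errorLogs counter (standing in for log.error) and the string returned in place of publisherSession.peekNextRecord() are assumptions; only the method names and exception messages come from the thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the behaviour described above.
// Not the real PrefetchRecordsPublisher; only the method names are borrowed.
class CurrentBehaviorSketch {
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    volatile boolean started = true;
    // Stand-in for log.error(...): counts how often an ERROR would be logged.
    final AtomicInteger errorLogs = new AtomicInteger();

    private void throwOnIllegalState() {
        if (executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
    }

    private String peekNextResult() {
        throwOnIllegalState();
        return "next-batch"; // stand-in for publisherSession.peekNextRecord()
    }

    synchronized void drainQueueForRequests() {
        final String recordsToDeliver = peekNextResult();
        // ... deliver recordsToDeliver to the subscriber ...
    }

    // One daemon iteration: any Throwable, including the expected
    // shutdown IllegalStateException, ends up in this catch block.
    void makeRetrievalAttempt() {
        try {
            drainQueueForRequests();
        } catch (Throwable e) {
            errorLogs.incrementAndGet();
            System.err.println("ERROR Unexpected exception was thrown: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        CurrentBehaviorSketch publisher = new CurrentBehaviorSketch();
        publisher.makeRetrievalAttempt();     // normal iteration, nothing logged
        publisher.executorService.shutdown(); // application stop / redeploy
        publisher.makeRetrievalAttempt();     // an ERROR is logged for a normal shutdown
        System.out.println("error logs: " + publisher.errorLogs.get()); // prints "error logs: 1"
    }
}
```

The point of the model is that nothing distinguishes the expected shutdown case from a real failure once the exception reaches the generic catch block.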
The fix seems simple and shouldn't break anything else. Instead of doing this in throwOnIllegalState:
if (executorService.isShutdown()) {
    throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
}
just do this in drainQueueForRequests:
synchronized void drainQueueForRequests() {
    // Shutdown is expected: return quietly instead of throwing
    if (executorService.isShutdown()) {
        return;
    }
    final PrefetchRecordsRetrieved recordsToDeliver = peekNextResult();
    // ... the rest of the method
}
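A minimal sketch of the proposed change in a self-contained, hypothetical model (not KCL code): ProposedFixSketch, the errorLogs and delivered counters, and the string standing in for publisherSession.peekNextRecord() are all assumptions. Unlike the proposal, this sketch keeps the shutdown check in throwOnIllegalState as a backstop; that is a choice made here only, and either variant behaves the same on the path shown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the proposed early return; not actual KCL code.
class ProposedFixSketch {
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    volatile boolean started = true;
    final AtomicInteger errorLogs = new AtomicInteger(); // stand-in for log.error
    final AtomicInteger delivered = new AtomicInteger(); // batches handed on

    private void throwOnIllegalState() {
        // Backstop kept in this sketch; the drain path normally returns before reaching it.
        if (executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
    }

    private String peekNextResult() {
        throwOnIllegalState();
        return "next-batch"; // stand-in for publisherSession.peekNextRecord()
    }

    synchronized void drainQueueForRequests() {
        // Shutdown is an expected state: stop quietly instead of throwing.
        if (executorService.isShutdown()) {
            return;
        }
        final String recordsToDeliver = peekNextResult();
        delivered.incrementAndGet(); // ... deliver recordsToDeliver ...
    }

    // One daemon iteration with the same generic catch block as before.
    void makeRetrievalAttempt() {
        try {
            drainQueueForRequests();
        } catch (Throwable e) {
            errorLogs.incrementAndGet();
        }
    }
}
```

After executorService.shutdown(), makeRetrievalAttempt() now returns without delivering anything and without touching the error path. A narrow race remains if shutdown happens between the isShutdown() check and peekNextResult(); in that case the backstop still throws.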
Of course, it is up to the team to decide exactly how to deal with it, but the main point is that shutdown is not an exceptional case but an expected, valid scenario for PrefetchRecordsPublisher, so there is no need for an exception or a log.error.
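If the exception were kept, one alternative (not proposed in the thread) is to classify it where it is caught: when the executor is already shut down, record it quietly instead of at ERROR. This is a hypothetical sketch; ShutdownAwareCatchSketch and the counters standing in for log.debug and log.error are assumptions, not KCL APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: keep the exception, but downgrade the log level
// when it is the expected shutdown case.
class ShutdownAwareCatchSketch {
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    volatile boolean started = true;
    final AtomicInteger errorLogs = new AtomicInteger(); // stand-in for log.error
    final AtomicInteger debugLogs = new AtomicInteger(); // stand-in for log.debug

    void drainQueueForRequests() {
        if (executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
        // ... peek and deliver the next batch ...
    }

    void makeRetrievalAttempt() {
        try {
            drainQueueForRequests();
        } catch (IllegalStateException e) {
            if (executorService.isShutdown()) {
                debugLogs.incrementAndGet(); // expected: shutdown in progress
            } else {
                errorLogs.incrementAndGet(); // e.g. the publisher was never started
            }
        } catch (Throwable e) {
            errorLogs.incrementAndGet(); // genuinely unexpected failure
        }
    }
}
```

The trade-off is that this still pays for throwing an exception on every shutdown; the early return in drainQueueForRequests avoids the exception altogether.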
+1. It doesn't happen only during a KCL graceful shutdown; the same can happen when one of the workers loses a lease. I have a multi-threaded client, and during the initial lease distribution (normally one thread/worker takes all the leases, and they are then gradually distributed across the workers) I see this quite often in my tests.