amazon-kinesis-client

Kinesis shutdown errors

Open: afreen-aliya opened this issue 2 years ago

When shutting down our Kinesis (KCL) consumer, we get several shutdown errors. One of them is:

java.lang.IllegalStateException: Shutdown has been called on the cache, can't accept new requests.
    at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.throwOnIllegalState(PrefetchRecordsPublisher.java:283)
    at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.peekNextResult(PrefetchRecordsPublisher.java:292)
    at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.drainQueueForRequests(PrefetchRecordsPublisher.java:388)
    at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.makeRetrievalAttempt(PrefetchRecordsPublisher.java:506)
    at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.run(PrefetchRecordsPublisher.java:464)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

We are aware that this error is an expected part of the graceful shutdown, but it creates a lot of noise in our logs, and sometimes we get paged as well. Can this please be fixed?

afreen-aliya, Feb 17 '22 16:02

We have the same issue. It's not great that a graceful shutdown throws exceptions and logs errors. That's the exact opposite of a graceful shutdown.

pelletier197, Apr 08 '22 14:04

We also run into this.

royyeah, May 06 '22 07:05

Please fix this. As already said in the OP, an IllegalStateException can't be considered a graceful way to shut down.

weaselmetal, Oct 20 '22 06:10

Would also appreciate it if this were fixed.

montelius, Dec 28 '22 15:12

Bumping this issue.

tzfromaz, Jun 28 '23 18:06

It looks like nobody cares :(

patricm-enbw, Jul 03 '23 10:07

Anyone?

We also get this at every shardEnd.

praetp, Jul 20 '23 09:07

+1 to solve this one 🙏

17:41:05.218 [prefetch-cache-shardId-000000000000-0000] ERROR s.a.k.r.p.PrefetchRecordsPublisher - testStream-WselIrnpyM:shardId-000000000000 :  Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.
java.lang.IllegalStateException: Shutdown has been called on the cache, can't accept new requests.
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.throwOnIllegalState(PrefetchRecordsPublisher.java:283)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.peekNextResult(PrefetchRecordsPublisher.java:292)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.drainQueueForRequests(PrefetchRecordsPublisher.java:388)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.makeRetrievalAttempt(PrefetchRecordsPublisher.java:506)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.run(PrefetchRecordsPublisher.java:463)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

In the current design, the private method peekNextResult calls throwOnIllegalState before publisherSession.peekNextRecord:

    private void throwOnIllegalState() {
        if (executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }

        if (!started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
    }

    private PrefetchRecordsRetrieved peekNextResult() {
        throwOnIllegalState();
        return publisherSession.peekNextRecord();
    }

If the executor has been shut down, this throws an IllegalStateException, which effectively acts like a break/goto: control jumps to the catch (Throwable e) block in DefaultGetRecordsCacheDaemon.run, which logs it at ERROR level. It is debatable whether this is really an error, since shutdown is expected to happen on application stop, redeploy, etc.
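To make that control flow concrete, here is a heavily simplified, self-contained model of the path described above. It is not the KCL source: the class ShutdownNoiseDemo, the prefetched queue (standing in for publisherSession), the errorLog list (standing in for log.error) and runOnce(), which folds makeRetrievalAttempt and the catch (Throwable e) block of DefaultGetRecordsCacheDaemon.run into one method, are all assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, heavily simplified model of PrefetchRecordsPublisher; not the real KCL class.
public class ShutdownNoiseDemo {
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    final Queue<String> prefetched = new ArrayDeque<>(); // stands in for publisherSession
    final List<String> errorLog = new ArrayList<>();      // stands in for log.error(...)
    volatile boolean started = true;

    private void throwOnIllegalState() {
        if (executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
    }

    private String peekNextResult() {
        throwOnIllegalState();
        return prefetched.peek();
    }

    synchronized void drainQueueForRequests() {
        final String recordsToDeliver = peekNextResult();
        if (recordsToDeliver != null) {
            prefetched.poll(); // "deliver" the record to the subscriber
        }
    }

    // Assumed stand-in for one daemon iteration: every Throwable lands in the generic handler.
    void runOnce() {
        try {
            drainQueueForRequests();
        } catch (Throwable e) {
            errorLog.add("ERROR Unexpected exception was thrown: " + e);
        }
    }

    public static void main(String[] args) {
        ShutdownNoiseDemo demo = new ShutdownNoiseDemo();
        demo.prefetched.add("record-1");
        demo.runOnce();                  // running normally: record delivered, nothing logged
        demo.executorService.shutdown(); // application stop or redeploy
        demo.runOnce();                  // the expected shutdown now shows up as an ERROR
        demo.errorLog.forEach(System.out::println);
    }
}
```

Running main prints a single ERROR line carrying the same "Shutdown has been called on the cache" message as the stack traces above, even though nothing went wrong.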

The fix seems easy and shouldn't break anything else. Instead of doing this in throwOnIllegalState:

        if (executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }

just do this in drainQueueForRequests:

    synchronized void drainQueueForRequests() {
        if (executorService.isShutdown()) {
            return;
        }
        final PrefetchRecordsRetrieved recordsToDeliver = peekNextResult();
        // ... the rest
    }
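A minimal sketch of the proposed change, applied to the same kind of simplified model (again hypothetical: EarlyReturnFixSketch, prefetched, delivered, errorLog and runOnce() are illustration-only names, not KCL code). The shutdown check leaves throwOnIllegalState and becomes an early return at the top of drainQueueForRequests, so a shut-down publisher simply stops delivering instead of throwing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the proposed early-return fix; not the real KCL class.
public class EarlyReturnFixSketch {
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    final Queue<String> prefetched = new ArrayDeque<>(); // stands in for publisherSession
    final List<String> delivered = new ArrayList<>();     // records handed to the subscriber
    final List<String> errorLog = new ArrayList<>();      // stands in for log.error(...)
    volatile boolean started = true;

    private void throwOnIllegalState() {
        // The shutdown check has moved to drainQueueForRequests, as proposed.
        if (!started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
    }

    private String peekNextResult() {
        throwOnIllegalState();
        return prefetched.peek();
    }

    synchronized void drainQueueForRequests() {
        if (executorService.isShutdown()) {
            return; // shutdown is an expected state: nothing to deliver, nothing to log
        }
        final String recordsToDeliver = peekNextResult();
        if (recordsToDeliver != null) {
            delivered.add(prefetched.poll());
        }
    }

    // Assumed stand-in for the daemon's catch (Throwable e) around makeRetrievalAttempt.
    void runOnce() {
        try {
            drainQueueForRequests();
        } catch (Throwable e) {
            errorLog.add("ERROR Unexpected exception was thrown: " + e);
        }
    }

    public static void main(String[] args) {
        EarlyReturnFixSketch sketch = new EarlyReturnFixSketch();
        sketch.prefetched.add("record-1");
        sketch.runOnce();                   // delivers record-1
        sketch.prefetched.add("record-2");
        sketch.executorService.shutdown();
        sketch.runOnce();                   // returns early: record-2 stays queued, no error
        System.out.println("delivered=" + sketch.delivered + ", errors=" + sketch.errorLog.size());
    }
}
```

Running main prints `delivered=[record-1], errors=0`. One thing to check against the real class: if throwOnIllegalState is called from other methods too, those callers would lose the shutdown check, so keeping the check there and only adding the early return may be the more conservative variant.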

Of course it is up to the team to decide exactly how to deal with it, but the main point is that shutdown is not an exceptional case but an expected, valid scenario for PrefetchRecordsPublisher, so there is no need for an exception or a log.error.

vsadokhin, Dec 13 '23 10:12

+1. This doesn't happen only during a KCL graceful shutdown; the same can happen when one of the workers loses a lease. I have a multi-threaded client, and during the initial lease distribution (normally one thread/worker takes all leases, and then they are gradually distributed across the workers) I see this quite often in my tests.
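In the lease-loss case the shutdown comes from a different thread while the prefetch loop is running, so a shutdown can land between a loop's isShutdown() check and the call that throws. A different technique from the early return above, sketched here under stated assumptions: keep the exception, but have the catch block classify an IllegalStateException as expected whenever the executor is already shut down. LeaseLossShutdownSketch, onLeaseLost(), guardedPeek() and the two counters (standing in for log.error and log.debug) are hypothetical names; whether the real KCL shutdown path shares a lock with drainQueueForRequests is not established in this thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not KCL code: a prefetch loop whose executor is shut down from another thread.
public class LeaseLossShutdownSketch {
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    final AtomicInteger errors = new AtomicInteger();        // would be log.error
    final AtomicInteger expectedStops = new AtomicInteger(); // would be log.debug
    final CountDownLatch running = new CountDownLatch(1);

    // Same shutdown guard as throwOnIllegalState in the original code.
    private void guardedPeek() {
        if (executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
    }

    void start() {
        executorService.execute(() -> {
            while (!executorService.isShutdown()) {
                running.countDown();
                try {
                    guardedPeek(); // a shutdown may land between the loop check and this call
                } catch (IllegalStateException e) {
                    // Shutdown is one-way: if it caused the throw, isShutdown() is already true here.
                    if (executorService.isShutdown()) {
                        expectedStops.incrementAndGet();
                    } else {
                        errors.incrementAndGet();
                    }
                }
                Thread.onSpinWait();
            }
        });
    }

    // Called on a different thread, e.g. when this worker loses the shard's lease.
    void onLeaseLost() {
        executorService.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        LeaseLossShutdownSketch sketch = new LeaseLossShutdownSketch();
        sketch.start();
        sketch.running.await();
        sketch.onLeaseLost();
        boolean terminated = sketch.executorService.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("terminated=" + terminated + ", errors=" + sketch.errors.get());
    }
}
```

Whichever way the race goes, errors stays at 0; only expectedStops (0 or 1) depends on timing. Combining this with the early-return guard would keep the common path exception-free while still covering the narrow window.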

michaelzelaia, Dec 13 '23 13:12