amazon-kinesis-client
Losing leases with multiple nodes / multiple threads per worker
We have two nodes (actually Kubernetes pods), each running 10 record-processor threads, consuming from a 20-shard stream without enhanced fan-out (plain polling).
The relevant bits of the configuration are as follows:
final var configsBuilder = new ConfigsBuilder(
        "some-stream",
        "some-stream-consumer",
        this.kinesisClient,
        this.dynamoDbClient,
        this.cloudWatchClient,
        "some-stream-consumer-0", // or "some-stream-consumer-1"
        this::createShardRecordProcessorInternal); // function that creates shard processors
...
final var leaseManagementConfig = configsBuilder.leaseManagementConfig()
        .cleanupLeasesUponShardCompletion(false)
        .ignoreUnexpectedChildShards(false)
        .maxLeasesForWorker(10);
...
final var processorConfig = configsBuilder.processorConfig()
        .callProcessRecordsEvenForEmptyRecordList(true);
final var retrievalConfig = configsBuilder.retrievalConfig()
        ...
        .retrievalSpecificConfig(
                new PollingConfig(configuration.streamName, this.kinesisClient));
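For context, the configs above end up in a software.amazon.kinesis.coordinator.Scheduler more or less like this (a simplified sketch of the standard KCL 2.x wiring, not our exact code):

final var scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        leaseManagementConfig,
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        processorConfig,
        retrievalConfig);

final var schedulerThread = new Thread(scheduler); // Scheduler implements Runnable
schedulerThread.setDaemon(true);
schedulerThread.start();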
The DynamoDB lease table is provisioned with 20 read and 20 write capacity units, and actual reads and writes are well below that limit.
We frequently run into errors like:
software.amazon.kinesis.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
    at software.amazon.kinesis.checkpoint.dynamodb.DynamoDBCheckpointer.setCheckpoint(DynamoDBCheckpointer.java:62) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer.advancePosition(ShardRecordProcessorCheckpointer.java:258) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer.checkpoint(ShardRecordProcessorCheckpointer.java:122) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer.checkpoint(ShardRecordProcessorCheckpointer.java:94) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at ...
and
ERROR software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator - Throwable encountered in lease taking thread
java.lang.NullPointerException: Cannot invoke "String.equals(Object)" because "mostLoadedWorkerIdentifier" is null
    at software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker.chooseLeasesToSteal(DynamoDBLeaseTaker.java:480) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker.computeLeasesToTake(DynamoDBLeaseTaker.java:387) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker.takeLeases(DynamoDBLeaseTaker.java:172) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker.takeLeases(DynamoDBLeaseTaker.java:119) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator.runLeaseTaker(DynamoDBLeaseCoordinator.java:254) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator$TakerRunnable.run(DynamoDBLeaseCoordinator.java:185) ~[amazon-kinesis-client-2.2.11.jar!/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
The problems start appearing seemingly at random and do not go away after a restart.
FYI: the following seems to help, though (a rough sketch of the second step follows below):
- stop the whole consumer
- change all leaseOwner fields in the DynamoDB lease table to an empty string
- start the consumer back up
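For reference, here is roughly what the second step looks like with the DynamoDB SDK v2 (a sketch, not part of KCL; the lease table name is assumed to be the application name, "some-stream-consumer"):

import java.util.Map;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

final class LeaseOwnerReset {
    // Clears the leaseOwner attribute on every row of the lease table.
    // Only meant to be run while the consumer is stopped.
    static void clearLeaseOwners(DynamoDbClient dynamoDbClient) {
        final var scan = ScanRequest.builder()
                .tableName("some-stream-consumer") // lease table = application name (assumption)
                .projectionExpression("leaseKey")  // leaseKey is the lease table's hash key
                .build();
        dynamoDbClient.scanPaginator(scan).items().forEach(item ->
                dynamoDbClient.updateItem(UpdateItemRequest.builder()
                        .tableName("some-stream-consumer")
                        .key(Map.of("leaseKey", item.get("leaseKey")))
                        .updateExpression("SET leaseOwner = :empty")
                        .expressionAttributeValues(Map.of(
                                ":empty", AttributeValue.builder().s("").build()))
                        .build()));
    }
}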
I am guessing this has something to do with the worker identifier being stable (and reused between start-ups)?
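One way to rule that out (purely illustrative, not something we have verified) would be to make the worker identifier unique per process start, e.g.:

// Illustrative only: append something unique per start-up so stale lease rows
// from a previous run are never attributed to a live worker.
final var workerIdentifier = "some-stream-consumer-0-" + java.util.UUID.randomUUID();

final var configsBuilder = new ConfigsBuilder(
        "some-stream",
        "some-stream-consumer",
        this.kinesisClient,
        this.dynamoDbClient,
        this.cloudWatchClient,
        workerIdentifier, // unique per start-up
        this::createShardRecordProcessorInternal);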
The problems seem to appear more frequently when you change the number of nodes and threads. For example, going from 2 pods with 10 threads each to 4 pods with 5 threads each results in these odd lease-management errors pretty much consistently.
It turns out that resetting the leaseOwner has only an intermittent effect; the consumer still gets into this weird state after some time.
As a potential aid for repro: the consumer we are seeing this with checkpoints rather infrequently. It processes larger chunks of the stream, sometimes for minutes, tens of thousands of records per shard at a time, and then checkpoints at the end. Other consumers almost never exhibit this behavior (like, maybe once in several weeks). I wonder if the checkpointing frequency has something to do with this. Perhaps the other pods consider the lease dormant for some reason?
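To illustrate what I mean by checkpointing frequency, here is a minimal sketch of a processor that checkpoints on a timer instead of only at the end of a multi-minute chunk (the class name and the 30-second interval are made up for the example, not what we actually run):

import java.time.Duration;
import java.time.Instant;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class PeriodicCheckpointProcessor implements ShardRecordProcessor {

    private static final Duration CHECKPOINT_INTERVAL = Duration.ofSeconds(30); // example value
    private Instant lastCheckpoint = Instant.now();

    @Override
    public void initialize(InitializationInput initializationInput) {
        lastCheckpoint = Instant.now();
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.records().forEach(record -> {
            // ... actual record processing elided ...
        });
        // Checkpoint periodically so the lease row in DynamoDB reflects progress
        // well within failoverTimeMillis, instead of only after a long batch.
        if (Duration.between(lastCheckpoint, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) {
            try {
                processRecordsInput.checkpointer().checkpoint();
                lastCheckpoint = Instant.now();
            } catch (ShutdownException | InvalidStateException e) {
                // Lease was lost or the processor is shutting down; stop checkpointing here.
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        // Lease taken by another worker; checkpointing is no longer possible.
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Shard is ending anyway.
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Best-effort checkpoint on shutdown.
        }
    }
}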
The default value of failoverTimeMillis in KCL 2.x is 10 seconds. The customer can enable debug logs and check the lease-renewal log output to see whether the worker is renewing all of its leases within failoverTimeMillis.
For the second exception, the customer can enable debug logs and check what the value of the most loaded worker is, since that is where the NPE is happening.
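If renewals are not keeping up, one knob to look at is failoverTimeMillis on the lease management config (sketch only; the 30-second value is an example, not a recommendation from this thread):

final var leaseManagementConfig = configsBuilder.leaseManagementConfig()
        .failoverTimeMillis(30_000L) // default is 10_000 ms
        .cleanupLeasesUponShardCompletion(false)
        .ignoreUnexpectedChildShards(false)
        .maxLeasesForWorker(10);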