amazon-kinesis-client
Out of memory issue
We have a custom Kafka connector that reads data from a Kinesis stream into Kafka using the KCL. We use a BlockingQueue to share data between the KafkaSourceTask and the ShardRecordProcessor, so that the KCL will not poll Kinesis for more data until Kafka has persisted what it already received: the ShardRecordProcessor publishes records to the queue and the KafkaSourceTask reads them from it. However, we are seeing out-of-memory errors, and our monitoring system reports long garbage-collection times.
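For context, the Kafka Connect side of that handoff presumably looks something like the sketch below. This is not from the issue; only dataQueue and KinesisClientRecordWrapper come from the processor code shown later, and toSourceRecord is a hypothetical conversion helper.

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> results = new ArrayList<>();
        // Block briefly for the first record so Connect's poll loop doesn't spin.
        KinesisClientRecordWrapper first = dataQueue.poll(1, TimeUnit.SECONDS);
        if (first == null) {
            return results;
        }
        results.add(toSourceRecord(first));
        // Drain whatever else is already queued; the processor refills the bounded queue
        // only after Connect has taken records off it, which provides the backpressure.
        List<KinesisClientRecordWrapper> drained = new ArrayList<>();
        dataQueue.drainTo(drained);
        for (KinesisClientRecordWrapper wrapper : drained) {
            results.add(toSourceRecord(wrapper));
        }
        return results;
    }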
Our config:
scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
                .retrievalSpecificConfig(new PollingConfig(config.kinesisStreamName(), kinesisClient))
);
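For completeness, the configsBuilder feeding that constructor is presumably created along the lines of the standard KCL 2.x setup sketched below; the client fields, applicationName, workerId, and the processor factory are assumptions, not values taken from this issue.

    ConfigsBuilder configsBuilder = new ConfigsBuilder(
            config.kinesisStreamName(),      // stream to consume
            applicationName,                 // also names the DynamoDB lease table
            kinesisClient,                   // KinesisAsyncClient
            dynamoClient,                    // DynamoDbAsyncClient for leases
            cloudWatchClient,                // CloudWatchAsyncClient for metrics
            workerId,                        // unique worker identifier
            shardRecordProcessorFactory);    // creates the ShardRecordProcessor

    // Scheduler implements Runnable and is normally run on its own thread.
    Thread schedulerThread = new Thread(scheduler);
    schedulerThread.setDaemon(true);
    schedulerThread.start();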
You might want to take a heap dump of the process to see what's taking up so much memory. Try adding these JVM flags: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/to/your/heap/dump/file.hprof
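If changing the worker's launch flags is inconvenient, both options are manageable at runtime through the HotSpot diagnostic MXBean, and a dump can also be triggered on demand. This is a generic sketch, not something from this issue; the file paths are placeholders.

    import java.lang.management.ManagementFactory;
    import com.sun.management.HotSpotDiagnosticMXBean;

    public class HeapDumpHelper {
        public static void main(String[] args) throws Exception {
            HotSpotDiagnosticMXBean diagnostics =
                    ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);

            // Equivalent to -XX:+HeapDumpOnOutOfMemoryError and -XX:HeapDumpPath=...
            diagnostics.setVMOption("HeapDumpOnOutOfMemoryError", "true");
            diagnostics.setVMOption("HeapDumpPath", "/tmp/connect-oom.hprof");

            // Or take a dump immediately; "true" restricts it to live objects.
            diagnostics.dumpHeap("/tmp/connect-manual.hprof", true);
        }
    }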
I think it's more likely that your queue is taking longer to drain than you expect, rather than there being a bug in Kafka/KCL, and that's the likely source of your problems: the resulting memory pressure would explain both the OOMs and the long GC pauses.
We will take the heap dump, but note that there is no data being pushed to the Kinesis stream, and the OOM happens 6 to 8 hours after deploying. The queue we use is also a bounded queue of size 50. My assumption is that if the processor blocks while processing, there will be no further polling of the Kinesis stream. Sample processor code:
@Override
@SneakyThrows
public void processRecords(ProcessRecordsInput processRecordsInput) {
    log.debug("Received records of size {}", processRecordsInput.records().size());
    try {
        for (KinesisClientRecord kinesisClientRecord : processRecordsInput.records()) {
            // Retry the bounded queue until the record is accepted or shutdown is requested.
            while (!dataQueue.offer(KinesisClientRecordWrapper.create(kinesisClientRecord, shardId), 10, TimeUnit.SECONDS)) {
                if (shutDownRequested.get()) {
                    checkpoint(processRecordsInput.checkpointer(), lastCommittedSequenceNumberInKafka.get(shardId));
                    return;
                }
            }
        }
    } catch (Exception e) {
        errorQueue.put(e);
    }
    // Periodically checkpoint at the last sequence number Kafka has committed for this shard.
    if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
        checkpoint(processRecordsInput.checkpointer(), lastCommittedSequenceNumberInKafka.get(shardId));
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }
}
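The checkpoint(...) helper called above isn't shown in the issue; against the KCL 2.x RecordProcessorCheckpointer it would typically look roughly like the following. This is a hypothetical sketch: the method signature, the String sequence-number parameter, and the logging are assumptions.

    private void checkpoint(RecordProcessorCheckpointer checkpointer, String sequenceNumber) {
        try {
            if (sequenceNumber == null) {
                return; // nothing committed to Kafka yet for this shard
            }
            // Checkpoint only up to what Kafka has already committed for this shard.
            checkpointer.checkpoint(sequenceNumber);
        } catch (ShutdownException | InvalidStateException e) {
            // The lease was lost or the lease table is unusable; retrying here won't help.
            log.warn("Skipping checkpoint", e);
        } catch (ThrottlingException | KinesisClientLibDependencyException e) {
            log.warn("Checkpoint failed transiently, will retry at the next interval", e);
        }
    }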
Our heap dump points to this thread:
idle-connection-reaper
at java.util.concurrent.ConcurrentHashMap$EntrySetView.iterator()Ljava/util/Iterator; (ConcurrentHashMap.java:4811)
at software.amazon.awssdk.http.apache.internal.conn.IdleConnectionReaper$ReaperTask.run()V (IdleConnectionReaper.java:152)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V (ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run()V (ThreadPoolExecutor.java:628)
at java.lang.Thread.run()V (Thread.java:829)
The leak suspect details are below; it looks like the idle-connection-reaper thread is creating too many instances of PoolingHttpClientConnectionManager:
30,011 instances of org.apache.http.impl.conn.PoolingHttpClientConnectionManager, loaded by org.apache.kafka.connect.runtime.isolation.PluginClassLoader @ 0xc0fb6688 occupy 968,249,360 (91.57%) bytes.
These instances are referenced from one instance of java.util.concurrent.ConcurrentHashMap$Node[], loaded by <system class loader>, which occupies 1,942,608 (0.18%) bytes.
Keywords
org.apache.http.impl.conn.PoolingHttpClientConnectionManager
org.apache.kafka.connect.runtime.isolation.PluginClassLoader @ 0xc0fb6688
java.util.concurrent.ConcurrentHashMap$Node[]
Are you using WebIdentityTokenFileCredentialsProvider (in the default credential chain)? There are known memory leaks in the current release of the Java SDK: https://github.com/aws/aws-sdk-java-v2/issues/3270.
Yes, we are using that.
Until the AWS SDK team fixes this bug, you'll probably have to fall back to another credentials provider as a temporary workaround. @gaddam1987 Can you try that and see if you still see these issues? If so, I think we can leave this issue open until the next version of the KCL pulls in an AWS SDK release with the fix.
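As an illustration of that workaround, the AWS clients handed to the KCL can pin an explicit provider instead of the default chain that ends up resolving WebIdentityTokenFileCredentialsProvider. A sketch only: the choice of ContainerCredentialsProvider and the region are assumptions, and the same override would be needed on the DynamoDB and CloudWatch clients.

    // Pin a non-web-identity credentials provider while the SDK leak is unfixed.
    KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
            KinesisAsyncClient.builder()
                    .region(Region.EU_WEST_1)
                    .credentialsProvider(ContainerCredentialsProvider.builder().build()));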
The fix is in https://github.com/aws/aws-sdk-java-v2/pull/3440; this issue should be resolved with the next SDK upgrade.