
Out of memory issue

Open gaddam1987 opened this issue 2 years ago • 7 comments

We have a custom Kafka connector that reads data from a Kinesis stream into Kafka, built on the KCL. We use a bounded BlockingQueue to share data between the KafkaSourceTask and the ShardRecordProcessor, so that the KCL does not poll for more data until Kafka has persisted what it already received: the ShardRecordProcessor publishes records to the queue and the KafkaSourceTask reads them from it (a sketch of this hand-off follows the config below). However, we are seeing OutOfMemoryErrors, and our monitoring system reports long garbage-collection times.

Our config:

scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
                .retrievalSpecificConfig(new PollingConfig(config.kinesisStreamName(), kinesisClient))
);
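
For context, here is a minimal sketch of the hand-off described above, assuming a bounded queue shared between the record processor and the Kafka source task; the class and method names here are illustrative, not the actual connector code:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: a bounded hand-off queue so the KCL record processor blocks
// until the Kafka source task has drained earlier records.
public class RecordHandoff<T> {
    // Capacity 50, matching the bounded queue described in this thread.
    private final BlockingQueue<T> dataQueue = new ArrayBlockingQueue<>(50);

    // Called from ShardRecordProcessor#processRecords; returns false if the queue stayed full.
    public boolean publish(T record) throws InterruptedException {
        return dataQueue.offer(record, 10, TimeUnit.SECONDS);
    }

    // Called from KafkaSourceTask#poll; returns null if nothing arrived within the wait.
    public T nextRecord() throws InterruptedException {
        return dataQueue.poll(1, TimeUnit.SECONDS);
    }
}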

gaddam1987 avatar Sep 01 '22 08:09 gaddam1987

You might want to take a heap dump of the process to see what's taking up so much memory; try adding these JVM flags: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/to/your/heap/dump/file.hprof.
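
A dump can also be triggered on demand (for example from a debug hook) before the process actually dies; here is a minimal sketch using the HotSpot diagnostic MXBean, with a placeholder output path:

import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;

public class HeapDump {
    // Writes an .hprof snapshot to the given path; the file must not already exist.
    public static void dump(String hprofPath) throws IOException {
        HotSpotDiagnosticMXBean diagnostics =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        diagnostics.dumpHeap(hprofPath, true); // true = dump only live (reachable) objects
    }
}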

I think it's more likely that your queue is draining more slowly than you expect, rather than this being a bug in Kafka/KCL, and that's the likely source of your problems. The resulting memory pressure would explain the OOMs and the long GC pauses.

joshua-kim avatar Sep 01 '22 20:09 joshua-kim

Will take the heap dump, but there is no data being pushed to the Kinesis stream, and the OOM happens 6 to 8 hours after deploying. The queue we use is also a bounded queue of size 50. My assumption is that if the processor blocks while processing, there won't be any further polling of the Kinesis stream. Sample processor code:


@Override
@SneakyThrows
public void processRecords(ProcessRecordsInput processRecordsInput) {
    log.debug("Received records of size {}", processRecordsInput.records().size());
    try {
        for (KinesisClientRecord kinesisClientRecord : processRecordsInput.records()) {
            while (!dataQueue.offer(KinesisClientRecordWrapper.create(kinesisClientRecord, shardId), 10, TimeUnit.SECONDS)) {
                if (shutDownRequested.get()) {
                    checkpoint(processRecordsInput.checkpointer(), lastCommittedSequenceNumberInKafka.get(shardId));
                    return;
                }
            }
        }
    } catch (Exception e) {
        errorQueue.put(e);
    }

    if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
        checkpoint(processRecordsInput.checkpointer(), lastCommittedSequenceNumberInKafka.get(shardId));
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }
}
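
The checkpoint helper isn't shown above; a minimal sketch of what it might look like follows, where only RecordProcessorCheckpointer.checkpoint(String) is the actual KCL API and the log and shardId fields belong to the surrounding processor class:

import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;

// Hypothetical helper inside the ShardRecordProcessor implementation: checkpoints at the
// last sequence number Kafka has confirmed, so a restart only re-reads unconfirmed records.
private void checkpoint(RecordProcessorCheckpointer checkpointer, String sequenceNumber) {
    if (sequenceNumber == null) {
        return; // nothing committed to Kafka yet for this shard
    }
    try {
        checkpointer.checkpoint(sequenceNumber);
    } catch (KinesisClientLibException e) {
        log.warn("Could not checkpoint shard {} at sequence number {}", shardId, sequenceNumber, e);
    }
}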

gaddam1987 avatar Sep 02 '22 09:09 gaddam1987

Our heap dump points to this thread:

idle-connection-reaper
  at java.util.concurrent.ConcurrentHashMap$EntrySetView.iterator()Ljava/util/Iterator; (ConcurrentHashMap.java:4811)
  at software.amazon.awssdk.http.apache.internal.conn.IdleConnectionReaper$ReaperTask.run()V (IdleConnectionReaper.java:152)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V (ThreadPoolExecutor.java:1128)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run()V (ThreadPoolExecutor.java:628)
  at java.lang.Thread.run()V (Thread.java:829)

With these leak suspect details, it looks like far too many instances of PoolingHttpClientConnectionManager are being created and retained via the idle-connection-reaper's tracking map:

30,011 instances of org.apache.http.impl.conn.PoolingHttpClientConnectionManager, loaded by org.apache.kafka.connect.runtime.isolation.PluginClassLoader @ 0xc0fb6688 occupy 968,249,360 (91.57%) bytes.

These instances are referenced from one instance of java.util.concurrent.ConcurrentHashMap$Node[], loaded by <system class loader>, which occupies 1,942,608 (0.18%) bytes.

Keywords

org.apache.http.impl.conn.PoolingHttpClientConnectionManager
org.apache.kafka.connect.runtime.isolation.PluginClassLoader @ 0xc0fb6688
java.util.concurrent.ConcurrentHashMap$Node[]

gaddam1987 avatar Sep 02 '22 23:09 gaddam1987

Are you using WebIdentityTokenFileCredentialsProvider (in the default credential chain)? There are some known memory leaks in the current release of the Java SDK: https://github.com/aws/aws-sdk-java-v2/issues/3270.

joshua-kim avatar Sep 06 '22 16:09 joshua-kim

Yes, we are using that

gaddam1987 avatar Sep 06 '22 17:09 gaddam1987

Until the AWS SDK team fixes this bug, you'll probably have to fall back to another credentials provider as a temporary workaround. @gaddam1987 Can you try that and see if you still hit these issues? If so, I think we can leave this issue open until the next KCL release pulls in the AWS SDK version containing the fix.
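
For reference, one way the workaround might look is to build the SDK clients that the ConfigsBuilder uses with an explicit credentials provider instead of relying on the default chain. This is only a sketch; the factory class name and the choice of provider are placeholders to adapt to your environment:

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.KinesisClientUtil;

// Hypothetical factory: every client is pinned to an explicit credentials provider, so the
// default chain (and the WebIdentityTokenFileCredentialsProvider with the reported leak)
// is never consulted.
public class ClientFactory {

    public static KinesisAsyncClient kinesis(Region region, AwsCredentialsProvider credentials) {
        return KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder().region(region).credentialsProvider(credentials));
    }

    public static DynamoDbAsyncClient dynamoDb(Region region, AwsCredentialsProvider credentials) {
        return DynamoDbAsyncClient.builder().region(region).credentialsProvider(credentials).build();
    }

    public static CloudWatchAsyncClient cloudWatch(Region region, AwsCredentialsProvider credentials) {
        return CloudWatchAsyncClient.builder().region(region).credentialsProvider(credentials).build();
    }
}

The three clients built this way would then be passed to ConfigsBuilder in place of the ones currently constructed off the default credential chain.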

joshua-kim avatar Sep 06 '22 17:09 joshua-kim

The fix is in https://github.com/aws/aws-sdk-java-v2/pull/3440; this issue should be resolved with the next SDK upgrade.

joshua-kim avatar Sep 29 '22 19:09 joshua-kim