flink-connector-kafka
[FLINK-33201][Connectors/Kafka] Fix memory leak in CachingTopicSelector
What is the purpose of the change
In CachingTopicSelector, a memory leak can occur when a race condition causes the internal logic to miss its cache-size check. (https://github.com/apache/flink-connector-kafka/blob/d89a082180232bb79e3c764228c4e7dbb9eb6b8b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java#L287-L289) This PR fixes the leak by making that logic resilient to such a failure.
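To illustrate the failure mode, here is a minimal sketch, not the connector's actual code. It assumes the size check is an exact equality comparison against CACHE_RESET_SIZE, so that a cache which has already grown past the threshold is never cleared again, and it shows a threshold comparison (`>=`) as one way to recover. The class name `BoundedCacheSketch`, the `resilient` flag, and the `forcePut` helper are hypothetical and exist only to reproduce the "missed check" state deterministically, without real threads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a size-bounded topic cache; not the connector's class. */
class BoundedCacheSketch<K> {
    static final int CACHE_RESET_SIZE = 5;

    private final Map<K, String> cache = new HashMap<>();
    private final Function<K, String> selector;
    private final boolean resilient;

    BoundedCacheSketch(Function<K, String> selector, boolean resilient) {
        this.selector = selector;
        this.resilient = resilient;
    }

    String apply(K key) {
        String topic = cache.computeIfAbsent(key, selector);
        // Assumed fragile form: an equality check fires only at exactly the threshold.
        // Assumed resilient form: a >= check also fires once the size has overshot.
        boolean shouldReset = resilient
                ? cache.size() >= CACHE_RESET_SIZE
                : cache.size() == CACHE_RESET_SIZE;
        if (shouldReset) {
            cache.clear();
        }
        return topic;
    }

    /** Inserts without any size check, simulating an update that slipped past the check. */
    void forcePut(K key) {
        cache.put(key, selector.apply(key));
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        BoundedCacheSketch<Integer> fragile = new BoundedCacheSketch<>(k -> "topic-" + k, false);
        BoundedCacheSketch<Integer> fixed = new BoundedCacheSketch<>(k -> "topic-" + k, true);
        // Push both caches past the threshold without running the check.
        for (int i = 0; i < 6; i++) {
            fragile.forcePut(i);
            fixed.forcePut(i);
        }
        // Normal traffic afterwards: only the resilient variant recovers.
        for (int i = 6; i < 100; i++) {
            fragile.apply(i);
            fixed.apply(i);
        }
        System.out.println("fragile size: " + fragile.size());
        System.out.println("resilient size: " + fixed.size());
    }
}
```

In this sketch the fragile variant keeps every key it sees after the overshoot, while the resilient variant drops back below CACHE_RESET_SIZE on the next call. Whether the real class needs additional synchronization is a separate question that this sketch does not address.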
Brief change log
Fix a memory leak in CachingTopicSelector that can be triggered by a race condition.
Verifying this change
- By analyzing a Java heap dump, I identified a memory leak in CachingTopicSelector. As the screenshot shows, the cache held 47,769 elements. If the internal logic were functioning correctly, the number of elements would be less than or equal to CACHE_RESET_SIZE (which is 5).
- Since writing unit tests for this type of bug is challenging, I instead applied a hotfix to our production workload. Before the hotfix, the memory leak appeared in the workload within 7 days. After applying the patch, the issue has no longer been observed.
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
@qbx2 Can you please rebase your PR?
@MartijnVisser Sure, I just rebased it.
@MartijnVisser Could you please review?
Hey All- We're experiencing some memory issues and wondering if this could be the cause. Will this PR be reviewed soon? @MartijnVisser @tzulitai
@AHeise Can you take a look?
Oh, I see the tests can't be run because the PR is based on an old branch. I'll rebase it myself.