[BUG] Memory leak by updating metrics in threads not managed by Netty
Describe the bug
https://github.com/streamnative/kop/blob/60ec259d354d4bb61f82133338b72d2ea46fa6d5/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/ThreadLocalAccessor.java#L24-L36
or older code:
https://github.com/streamnative/kop/blob/82211b9ba2553a68b06c1b2f3479459e5b75f846/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/DataSketchesOpStatsLogger.java#L183-L197
The onRemoval method is responsible for removing the thread-local LocalData from the map when the thread stops. However, this method is only invoked for Netty threads (FastThreadLocalThread).
To Reproduce
Run KoP with the -XX:ActiveProcessorCount=1 JVM option so that CompletableFuture uses ThreadPerTaskExecutor for its async tasks. Memory usage then grows without bound even under a light workload, and a heap dump shows that the map is unexpectedly large.
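To illustrate why this setting triggers the leak, here is a minimal sketch using only JDK classes. THREAD_PER_TASK is a hypothetical stand-in for the JDK-internal ThreadPerTaskExecutor (it is not the real class), and ThreadPerTaskDemo is an illustrative name. The sketch shows that every async task runs on a brand-new thread, so each task would create a fresh thread-local entry that is never removed.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class ThreadPerTaskDemo {
    // Hypothetical stand-in for the JDK-internal ThreadPerTaskExecutor:
    // every submitted task is started on a new thread.
    static final Executor THREAD_PER_TASK = task -> new Thread(task).start();

    // Runs the given number of async tasks and returns how many
    // distinct Thread objects executed them.
    static int distinctThreads(int tasks) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
        for (int i = 0; i < tasks; i++) {
            futures[i] = CompletableFuture.runAsync(
                    () -> threads.add(Thread.currentThread()), THREAD_PER_TASK);
        }
        // Wait until every task has recorded its thread.
        CompletableFuture.allOf(futures).join();
        return threads.size();
    }

    public static void main(String[] args) {
        // One thread per task: 100 tasks are executed by 100 different threads.
        System.out.println("distinct threads: " + distinctThreads(100));
    }
}
```

With a pooled executor the count would stay bounded by the pool size, which is why the leak is much less visible under the default configuration.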
Expected behavior
The map entries for threads not managed by Netty should be released as well.
The following application shows the difference between Netty threads and other threads.
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;

class ThreadLocalAccessor {
    @Getter
    private final Map<Integer, Boolean> map = new ConcurrentHashMap<>();
    private final FastThreadLocal<Integer> local = new FastThreadLocal<>() {
        private final AtomicInteger id = new AtomicInteger(0);

        // Called on first access from each thread: registers an entry in the map.
        @Override
        protected Integer initialValue() {
            final Integer i = id.getAndIncrement();
            map.put(i, true);
            System.out.println(Thread.currentThread() + " put " + i);
            return i;
        }

        // Expected to remove the entry again when the owning thread exits.
        @Override
        protected void onRemoval(Integer i) {
            System.out.println(Thread.currentThread() + " remove " + i);
            map.remove(i);
        }
    };

    int value() {
        return local.get();
    }
}

public class FastThreadLocalDemo {
    public static void main(String[] args) throws InterruptedException {
        // Alternate between plain JDK threads and Netty threads.
        ExecutorService executor = Executors.newFixedThreadPool(4, new ThreadFactory() {
            private final AtomicInteger id = new AtomicInteger(0);
            private final ThreadFactory nettyThreadFactory = new DefaultThreadFactory("netty");

            @Override
            public Thread newThread(@NotNull Runnable r) {
                if (id.getAndIncrement() % 2 == 0) {
                    return Executors.defaultThreadFactory().newThread(r);
                } else {
                    return nettyThreadFactory.newThread(r);
                }
            }
        });
        ThreadLocalAccessor accessor = new ThreadLocalAccessor();
        CountDownLatch latch = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread() + " accessor.value() " + accessor.value());
                latch.countDown();
            });
        }
        latch.await();
        executor.shutdown();
        // Give the worker threads time to exit before inspecting the map.
        Thread.sleep(100);
        accessor.getMap().keySet().forEach(System.out::println);
    }
}
Output:
Thread[netty-1-1,5,main] put 1
Thread[netty-1-2,5,main] put 3
Thread[pool-2-thread-1,5,main] put 2
Thread[pool-1-thread-1,5,main] put 0
Thread[netty-1-1,5,main] accessor.value() 1
Thread[netty-1-2,5,main] accessor.value() 3
Thread[pool-1-thread-1,5,main] accessor.value() 0
Thread[pool-2-thread-1,5,main] accessor.value() 2
Thread[netty-1-1,5,main] remove 1
Thread[netty-1-2,5,main] remove 3
0
2
We can see that the remove logs are printed only by threads whose names start with netty-. The entries 0 and 2, created by the non-Netty threads, remain in the map.
Should we add a check to determine whether the thread is managed by Netty?
How would we check whether the thread is managed by Netty? And even then, it doesn't solve the problem, because we need to collect the LocalData into the map and remove it when a thread exits. For non-Netty threads, there is no direct way to remove it when the thread exits.
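One indirect option, sketched below with plain JDK classes, is to key each entry by its owning Thread and periodically drop entries whose thread is no longer alive. This is only an illustration of the idea under stated assumptions: LocalDataRegistry and purgeDeadThreads are hypothetical names, not KoP or Netty APIs, and this is not necessarily the fix that KoP adopted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry: one value per thread, cleaned up by polling
// Thread.isAlive() instead of relying on an exit callback.
class LocalDataRegistry<T> {
    private final Map<Thread, T> dataByThread = new ConcurrentHashMap<>();
    private final Supplier<T> factory;

    LocalDataRegistry(Supplier<T> factory) {
        this.factory = factory;
    }

    // Returns the calling thread's value, creating it on first access.
    T get() {
        return dataByThread.computeIfAbsent(Thread.currentThread(), t -> factory.get());
    }

    // Drops entries whose owning thread has terminated. Intended to be
    // called periodically, for example from the metrics reporting task.
    void purgeDeadThreads() {
        dataByThread.keySet().removeIf(t -> !t.isAlive());
    }

    int size() {
        return dataByThread.size();
    }

    public static void main(String[] args) throws InterruptedException {
        LocalDataRegistry<StringBuilder> registry = new LocalDataRegistry<>(StringBuilder::new);
        Thread worker = new Thread(registry::get); // a short-lived, non-Netty thread
        worker.start();
        worker.join();
        registry.get(); // entry for the main thread, which is still alive
        System.out.println("before purge: " + registry.size()); // 2
        registry.purgeDeadThreads();
        System.out.println("after purge: " + registry.size());  // 1
    }
}
```

The trade-off is that the map keeps a strong reference to each Thread until the next purge, so entries linger for up to one purge interval, and any data a thread recorded should be merged into the aggregate before its entry is dropped.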