[BUG] Memory leak by updating metrics in threads not managed by Netty

Open BewareMyPower opened this issue 3 years ago • 3 comments

Describe the bug

https://github.com/streamnative/kop/blob/60ec259d354d4bb61f82133338b72d2ea46fa6d5/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/ThreadLocalAccessor.java#L24-L36

or older code:

https://github.com/streamnative/kop/blob/82211b9ba2553a68b06c1b2f3479459e5b75f846/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/DataSketchesOpStatsLogger.java#L183-L197

The onRemoval method is responsible for removing the thread-local LocalData from the map when the thread stops. However, this method is only invoked for Netty threads (FastThreadLocalThread).

To Reproduce

Run KoP with the -XX:ActiveProcessorCount=1 JVM option so that CompletableFuture uses ThreadPerTaskExecutor as its default asynchronous executor, which starts a new thread for each task. Memory usage then grows without bound even under a light workload. From the heap dump, we can see that the map's size is unexpectedly large.

Expected behavior

The entries of the map should also be released for threads that are not managed by Netty.

BewareMyPower avatar May 11 '22 08:05 BewareMyPower
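
The growth described above can be sketched without Netty or KoP. The following is a minimal illustration, not the KoP code: it assumes a plain java.lang.ThreadLocal as a stand-in for a FastThreadLocal accessed from a non-Netty thread (no removal callback ever runs), and the names LeakSketch, REGISTRY and runTasks are hypothetical. Each task runs on a fresh thread, mimicking a thread-per-task executor, and every thread leaves one entry behind in the registry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the registry pattern in the issue; not the KoP implementation.
class LeakSketch {
    // Registry of per-thread data, analogous to the map of LocalData.
    static final Map<Integer, long[]> REGISTRY = new ConcurrentHashMap<>();
    static final AtomicInteger NEXT_ID = new AtomicInteger();

    // Assumption: a plain ThreadLocal has no removal hook, so nothing deletes the entry.
    static final ThreadLocal<long[]> LOCAL = ThreadLocal.withInitial(() -> {
        long[] data = new long[1];
        REGISTRY.put(NEXT_ID.getAndIncrement(), data);
        return data;
    });

    // Runs each task on a fresh thread and returns the registry size afterwards.
    static int runTasks(int tasks) {
        for (int i = 0; i < tasks; i++) {
            Thread t = new Thread(() -> LOCAL.get()[0]++);
            t.start();
            try {
                t.join(); // the thread has terminated here, yet its entry stays in REGISTRY
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return REGISTRY.size();
    }

    public static void main(String[] args) {
        System.out.println("entries after 100 tasks: " + runTasks(100));
    }
}
```

The registry holds one entry per thread that ever touched the thread-local, so with short-lived threads its size tracks the number of tasks rather than the number of live threads.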

The following application shows the difference between Netty threads and other threads.

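import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;

class ThreadLocalAccessor {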

    @Getter
    private final Map<Integer, Boolean> map = new ConcurrentHashMap<>();
    private final FastThreadLocal<Integer> local = new FastThreadLocal<>() {
        private final AtomicInteger id = new AtomicInteger(0);

        @Override
        protected Integer initialValue() {
            final Integer i = id.getAndIncrement();
            map.put(i, true);
            System.out.println(Thread.currentThread() + " put " + i);
            return i;
        }

        @Override
        protected void onRemoval(Integer i) {
            System.out.println(Thread.currentThread() + " remove " + i);
            map.remove(i);
        }
    };

    int value() {
        return local.get();
    }
}

public class FastThreadLocalDemo {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4, new ThreadFactory() {
            private final AtomicInteger id = new AtomicInteger(0);
            private final ThreadFactory nettyThreadFactory = new DefaultThreadFactory("netty");

            @Override
            public Thread newThread(@NotNull Runnable r) {
                if (id.getAndIncrement() % 2 == 0) {
                    return Executors.defaultThreadFactory().newThread(r);
                } else {
                    return nettyThreadFactory.newThread(r);
                }
            }
        });
        ThreadLocalAccessor accessor = new ThreadLocalAccessor();
        CountDownLatch latch = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread() + " accessor.value() " + accessor.value());
                latch.countDown();
            });
        }
        latch.await();
        executor.shutdown();
        Thread.sleep(100);
        accessor.getMap().keySet().forEach(System.out::println);
    }
}

Output:

Thread[netty-1-1,5,main] put 1
Thread[netty-1-2,5,main] put 3
Thread[pool-2-thread-1,5,main] put 2
Thread[pool-1-thread-1,5,main] put 0
Thread[netty-1-1,5,main] accessor.value() 1
Thread[netty-1-2,5,main] accessor.value() 3
Thread[pool-1-thread-1,5,main] accessor.value() 0
Thread[pool-2-thread-1,5,main] accessor.value() 2
Thread[netty-1-1,5,main] remove 1
Thread[netty-1-2,5,main] remove 3
0
2

We can see that the remove logs are only printed by threads whose names start with netty-. Entries 0 and 2, which were created by the non-Netty threads, remain in the map.

BewareMyPower avatar May 11 '22 08:05 BewareMyPower

Should we add a check to determine whether the thread is managed by Netty?

coderzc avatar May 16 '22 05:05 coderzc

How would we check whether the thread is managed by Netty? Even if we could, it wouldn't solve the problem, because we need to collect the LocalData into the map and remove it when a thread exits. For non-Netty threads, there is no direct way to remove it when a thread exits.

BewareMyPower avatar May 17 '22 03:05 BewareMyPower
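
To make the last point concrete, here is a minimal sketch of one indirect way to release entries, under stated assumptions: it is not what KoP does or did, it uses a plain java.lang.ThreadLocal instead of FastThreadLocal, and the names DeadThreadSweepSketch, record, sweep and runOnFreshThreads are hypothetical. The registry is keyed by the owning Thread, and a sweep removes entries whose owner is no longer alive.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of sweeping entries owned by terminated threads; not the KoP fix.
class DeadThreadSweepSketch {
    // Keyed by the owning thread so that its liveness can be checked later.
    static final Map<Thread, long[]> REGISTRY = new ConcurrentHashMap<>();

    static final ThreadLocal<long[]> LOCAL = ThreadLocal.withInitial(() -> {
        long[] data = new long[1];
        REGISTRY.put(Thread.currentThread(), data);
        return data;
    });

    // Records one observation in the calling thread's local data.
    static void record() {
        LOCAL.get()[0]++;
    }

    // Removes entries whose owner thread has terminated; returns how many were removed.
    static int sweep() {
        int removed = 0;
        for (Thread owner : REGISTRY.keySet()) {
            if (!owner.isAlive() && REGISTRY.remove(owner) != null) {
                removed++;
            }
        }
        return removed;
    }

    // Helper for the demo: calls record() once on each of n short-lived threads.
    static void runOnFreshThreads(int n) {
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(DeadThreadSweepSketch::record);
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        runOnFreshThreads(5);
        System.out.println("before sweep: " + REGISTRY.size());
        System.out.println("removed: " + sweep());
        System.out.println("after sweep: " + REGISTRY.size());
    }
}
```

The trade-off is that entries are only released when sweep() runs, for example from a periodic task, and the map keeps a strong reference to each terminated Thread until then. For a metrics use case, any data still held in a removed entry would also need to be merged somewhere before it is dropped.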