flink-clickhouse-sink

WriterTask threads are closed one by one.

Open ChaoHsupin opened this issue 2 years ago • 6 comments

Thanks for your work! I have run into a problem: after the program runs for a while, the ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask threads shut down one by one.

I checked the log and found a large number of "Task id = N is finished" messages:

2022-11-04 10:42:28,128 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 9 is finished
2022-11-04 10:42:32,783 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:32,783 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:34,532 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 1 is finished
2022-11-04 10:42:35,861 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:39,153 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 12 is finished
2022-11-04 10:42:40,263 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 18 is finished
2022-11-04 10:42:46,699 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 3 is finished
2022-11-04 10:42:47,717 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 10 is finished
2022-11-04 10:42:53,492 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 6 is finished
2022-11-04 10:42:55,086 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 8 is finished
2022-11-04 10:42:59,064 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 2 is finished
2022-11-04 10:43:00,173 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 19 is finished

Then I looked at the ClickHouse sink source, at the place where the OOM happened:

    @Override
    public void run() {
        try {
            isWorking = true;

            logger.info("Start writer task, id = {}", id);
            while (isWorking || queue.size() > 0) {
                ClickHouseRequestBlank blank = queue.poll(300, TimeUnit.MILLISECONDS);
                if (blank != null) {
                    CompletableFuture<Boolean> future = new CompletableFuture<>();
                    futures.add(future);
                    send(blank, future);
                }
            }
        // Note: catch (Exception) does not catch Throwable/Error, so an OOM is not caught here
        } catch (Exception e) {
            logger.error("Error while inserting data", e);
            throw new RuntimeException(e);
        } finally {
            logger.info("Task id = {} is finished", id);
        }
    }
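
The run() method above catches only Exception, so a java.lang.Error such as OutOfMemoryError skips the catch block and only the "is finished" line from the finally block is logged. Below is a minimal standalone sketch of that behavior. It is not code from flink-clickhouse-sink: the class name ThrowableCatchSketch and the method failingWork are hypothetical stand-ins, and the OutOfMemoryError is simulated rather than caused by a real allocation.

```java
public class ThrowableCatchSketch {

    // Hypothetical stand-in for the work done inside run(); always fails with a simulated Error.
    static void failingWork() {
        throw new OutOfMemoryError("Direct buffer memory (simulated)");
    }

    // Same shape as the original code: only Exception is caught, so an Error passes through.
    static String runCatchingException() {
        try {
            failingWork();
            return "completed";
        } catch (Exception e) {
            return "caught Exception: " + e;
        }
    }

    // Variant with catch (Throwable): the Error reaches the handler and can be logged.
    static String runCatchingThrowable() {
        try {
            failingWork();
            return "completed";
        } catch (Throwable t) {
            return "caught Throwable: " + t;
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(runCatchingException());
        } catch (Error e) {
            // The Error was not handled by catch (Exception) and arrives here instead.
            System.out.println("escaped catch (Exception): " + e);
        }
        System.out.println(runCatchingThrowable());
    }
}
```

Catching Throwable here is only a way to make the failure visible in the log. It does not recover the memory, so rethrowing after logging is usually the safer choice.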

After I added a catch (Throwable) block to this code, it revealed java.lang.OutOfMemoryError: Direct buffer memory:

java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Direct buffer memory
	at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
	at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:201) ~[job.jar:?]
	at cksink.applied.ClickHouseWriter$WriterTask.lambda$responseCallback$0(ClickHouseWriter.java:219) ~[job.jar:?]
	at org.asynchttpclient.netty.NettyResponseFuture.lambda$addListener$0(NettyResponseFuture.java:294) ~[job.jar:?]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) [?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
	at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
	at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758) ~[job.jar:?]
	at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:748) ~[job.jar:?]
	at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:260) ~[job.jar:?]
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:232) ~[job.jar:?]
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[job.jar:?]

Then I increased Flink's off-heap memory setting, taskmanager.memory.task.off-heap.size, from 512MB to 1GB, but it had no effect.
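
To check whether a larger off-heap setting actually changes anything, it may help to watch how much direct buffer memory the JVM is using. The stack trace above goes through java.nio.ByteBuffer.allocateDirect, so these allocations should be counted in the JVM's "direct" buffer pool. The following is a minimal sketch using the standard java.lang.management API. The class name DirectBufferReport and the 1 MiB test allocation are assumptions for illustration only, not part of the sink.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

public class DirectBufferReport {

    // Returns the bytes currently used by the JVM's "direct" buffer pool, or -1 if no such pool is reported.
    static long directMemoryUsed() {
        List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Illustrative allocation so that the pool usage visibly changes.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
        long after = directMemoryUsed();
        System.out.println("direct pool before: " + before + " bytes, after: " + after + " bytes");
        System.out.println("allocated capacity: " + buffer.capacity() + " bytes");
    }
}
```

Logging this value periodically from the job, next to the JVM's effective -XX:MaxDirectMemorySize, would show whether usage keeps growing until it reaches the limit or whether a single very large request exceeds it.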

I am confused now. Can you give me some advice? Thanks.

ChaoHsupin, Nov 04 '22 13:11