flink-clickhouse-sink
flink-clickhouse-sink copied to clipboard
WriterTask thread will be closed one by one.
Thanks for your work!
I encounter a problem!After the program runs for a while, I found that ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask
will be closed one by one;
I checked the log:
There are a large number of Task id = 10 is finished
2022-11-04 10:42:28,128 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 9 is finished
2022-11-04 10:42:32,783 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:32,783 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:34,532 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 1 is finished
2022-11-04 10:42:35,861 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:39,153 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 12 is finished
2022-11-04 10:42:40,263 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 18 is finished
2022-11-04 10:42:46,699 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 3 is finished
2022-11-04 10:42:47,717 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 10 is finished
2022-11-04 10:42:53,492 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 6 is finished
2022-11-04 10:42:55,086 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 8 is finished
2022-11-04 10:42:59,064 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 2 is finished
2022-11-04 10:43:00,173 INFO ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 19 is finished
Then I looked up the clickhouse sink source, where OOM happened
@Override
public void run() {
try {
isWorking = true;
logger.info("Start writer task, id = {}", id);
while (isWorking || queue.size() > 0) {
ClickHouseRequestBlank blank = queue.poll(300, TimeUnit.MILLISECONDS);
if (blank != null) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
futures.add(future);
send(blank, future);
}
}
// Can't catch Throwable for OOM
} catch (Exception e) {
logger.error("Error while inserting data", e);
throw new RuntimeException(e);
} finally {
logger.info("Task id = {} is finished", id);
}
}
I added catch Throwable to this piece of code,arise java.lang.OutOfMemoryError: Direct buffer memory
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Direct buffer memory
at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:201) ~[job.jar:?]
at cksink.applied.ClickHouseWriter$WriterTask.lambda$responseCallback$0(ClickHouseWriter.java:219) ~[job.jar:?]
at org.asynchttpclient.netty.NettyResponseFuture.lambda$addListener$0(NettyResponseFuture.java:294) ~[job.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758) ~[job.jar:?]
at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:748) ~[job.jar:?]
at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:260) ~[job.jar:?]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:232) ~[job.jar:?]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[job.jar:?]
Then I turned up flink's out-of-heap memory taskmanager.memory.task.off-heap.size
from 512MB to 1GB,have no effect.
I am confused now, can you give me some advice? thanks.