Running the same batch task multiple times using the Flink Session pattern causes the linux out of memory
I have A batch task that inserts 100 million data from table A into table B after the primary key hash. I use Flink mode to execute this task. After several times of executing this task, I find that the Flink TaskManager process occupies more and more memory. Finally, the Flink TaskManager process is killed when the system runs out of memory.
Manually performing GC memory reclamation after the task execution is completed has no effect, and viewing memory through top does not decrease. unless the Flink TaskManager is turned off.
Logically, when a task is finished, it should release the resources it occupies, so that subsequent tasks can continue to run, but it does not.
The growth of memory occurs in the icebergStreamWriter stage, but I look at the code and do not find a place to use off-heap memory, so it is more confusing, can anyone help me?

Memory usage is 25% before running the job, and 72% memory is reached after three times

The Configuration
The System kill taskmanager process
Install jemalloc on the every node of flink cluster can solve this problem
1、install jemalloc
2、Add the configuration below in /etc/profile export LD_PRELOAD=/usr/local/lib/libjemalloc.so #export MALLOC_CONF="prof:true,prof_prefix:/home/bigdata/jeprof.out,lg_prof_interval:30,lg_prof_sample:20"
3、The source /etc/profile
4、And Then start flink cluster
5、execute the batch jobs
I install jemalloc on two of five nodes, then here are the result of test, after seven times exec batch job the memory is increse to 50%+ who use glibc-malloc

Why change the glibc-malloc to jemalloc
The reason i think maybe the glibc-malloc casued plenty memory fragmentation。
@rdblue @openinx
can you help me,i use flink to writer hive table don't have this problem
I think the core reason is: we have set the primary keys in iceberg table for flink DDL, so the flink write job will think that we are trying to write all those results into an iceberg format v2 table. see the following code piece:
https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java#L62-L72
Do you want to write all those records into an iceberg format v2 table to deduplicate those rows who has the same primary key ? If not , then I will suggest to remove the primary key definition in the flink sink table DDL. If sure, then I think we may need to port this PR in your branch to fix the OutOfMemory issue.
@openinx First thanks a lot for your reply, It was I who didn't describe the scene clearly
1、This is my source table:
create table hive_catalog.iceberg_db.sourceTable (
step_id string,
param_id string,
wafer_id string,
chip_id string,
product_id1 string,
product_id2 string,
product_id3 string,
product_id4 string,
product_id5 string,
num_item1 double,
num_item2 double,
num_item3 double,
num_item4 double,
num_item5 double,
num_item6 double,
start_date date,
PRIMARY KEY (chip_id, param_id, step_id, wafer_id) NOT ENFORCED)
PARTITIONED BY (step_id, start_date)
WITH ('format-version'='2');
2、This is Temp Table:
create table hive_catalog.iceberg_db.temp_data (
step_id string,
param_id string,
wafer_id string,
chip_id string,
product_id1 string,
product_id2 string,
product_id3 string,
product_id4 string,
product_id5 string,
num_item1 double,
num_item2 double,
num_item3 double,
num_item4 double,
num_item5 double,
num_item6 double,
start_date date,
part_id int)
PARTITIONED BY (part_id)
WITH ('format-version'='2', 'write.distribution-mode'='hash');
This is the sql which my batch job use.
3、insert into temp_data select step_id, param_id, wafer_id, chip_id, product_id1, product_id2, product_id3, product_id4, product_id5, num_item1, num_item2, num_item3, num_item4, num_item5, num_item6, start_date, (HASH_CODE(step_id || param_id || step_id || wafer_id)) % 10 as part_id from source;
After i run this job seven or eight tims the memory of linux will be incresed to 80%,And the memory which taskmanger used is 60%.
But if i use jemalloc to run several times of job, the memory is noraml , which is same as the flink cluster just started
i find an article an solve this problem
https://github.com/prestodb/presto/issues/8993
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'