flink-on-k8s-operator icon indicating copy to clipboard operation
flink-on-k8s-operator copied to clipboard

When flink version >= 1.10 , the "taskmanager.heap.size" is not work.

Open Mrart opened this issue 4 years ago • 19 comments

We have calcute flink memory use "tmMemoryOffHeapRatio" and "tmMemLimitByte' in operator, But it is not work for flink 1.10. flink 1.10 use "taskmanager.memory.process.size" to limit the flink memory.

If we only config "taskmanager.memory.heap.size" in flink 1.10, It will trigger tm container OOM

Mrart avatar Jul 24 '20 10:07 Mrart

I think we can add "useProcessSize" in taskmanager cr, if flink 1.10, use processsize "true", use resources.limit as "taskmanager.memory.process.size", else use taskmanager.heap.size.

Mrart avatar Jul 24 '20 10:07 Mrart

Thanks for the bug report, but according to the doc, Flink >=1.10 supports/requires both jobmanager.heap.size and taskmanager.memory.process.size:

jobmanager.heap.size: Sets the size of the Flink Master (JobManager / ResourceManager / Dispatcher) JVM heap.

taskmanager.memory.process.size: Total size of the TaskManager process, including everything. Flink will subtract some memory for the JVM’s own memory requirements (metaspace and others), and divide and configure the rest automatically between its components (network, managed memory, JVM Heap, etc.).

I think the fix should be that if taskManagerSpec.resources.limits.memory is provided by the user, set taskmanager.memory.process.size to taskManagerSpec.resources.limits.memory while keeping the current logic for taskmanager.heap.size. It should be okay to set both properties regardless of Flink version. In Flink <=1.9, the taskmanager.memory.process.size will be ignored, but there is no harm.

In addition to the fix, the user can always manually set the property with flinkProperties.

Does this sound good to you?

functicons avatar Jul 25 '20 20:07 functicons

@functicons , Under my tests,When use 'taskmanager.memory.process.size' in flinkProperties, the 'taskmanager.heap.size' will set as "taskmanager.memory.flink.size", "taskmanager.memory.process.size" - "taskmanager.memory.flink.size" = "metaspace"+ "overhead" in doc ,and It will case something wrong, So we need delete "taskmanager.heap.size" in Flink 1.10.

Then I describe how I found this problems, When I run task use this operator, I found "taskmanager.heap.size" is set as "taskmanager.memory.flink.size" log, and my container of the task is always OOM, I have set metaspace is 256M, calculate the overhead is only "45M", when set tm.limit 6192M, So I guess this is a bug. As I have set process.size = tm.memory, the taskmanager can not be start, as log sed that the overhead is only 45M, tm can not start .

Mrart avatar Jul 27 '20 02:07 Mrart

Actually I couldn't find taskmanager.heap.size in the Flink 1.10 doc, and there are many new taskmanager.memory.* properties. So, I don't understand the reason

I found "taskmanager.heap.size" is set as "taskmanager.memory.flink.size" log, and my container of the task is always OOM.

Could you show me the log?

functicons avatar Jul 27 '20 05:07 functicons

  • 'taskmanager.memory.flink.size' is not specified, use the configured deprecated task manager heap value (5.625gb (6039797760 bytes)) for it.

Mrart avatar Jul 27 '20 05:07 Mrart

Gotcha. What if you specify taskmanager.memory.flink.size manually in flinkProperties? That should solve the problem I think.

I think instead of adding a field useProcessSize, we might want to add a field flinkVersion, which is more general and future-proof.

functicons avatar Jul 27 '20 14:07 functicons

I am also looking into this issue. Versions 1.10 and 1.11 also look different. jobmanager.heap.size is deprecated in 1.11.

note: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_migration.html

It seems to have to be handled differently depending on versions 1.10 and 1.11.

elanv avatar Jul 28 '20 16:07 elanv

How about configuring {jobmanager,taskmanager}.heap.size only if memoryOffHeapRatio or memoryOffHeapMin is provided and {jobmanager,taskmanager}.memory.process.size if not? Right now, if memoryOffHeapRatio or memoryOffHeapMin is not provided, the default value is used, so in this case, there is a disadvantage that behavior changes, but it seems that memoryOffHeapRatio/memoryOffHeapMin can be deprecated naturally.

If version <1.10, {jobmanager,taskmanager}.memory.process.size will be ignored even if it is set. If version >= 1.10, you can avoid using memoryOffHeapRatio/memoryOffHeapMin.

elanv avatar Jul 29 '20 02:07 elanv

@elanv, I would prefer adding an optional property flinkVersion (with default 1.9) and set other properties accordingly, because in the future there might be more of these property changes, inferring property values from other properties is error-prone and hard for users to remember.

functicons avatar Jul 30 '20 01:07 functicons

@functicons Sounds good. Do you have any plans to work on this issue? If not, I will to try this because it's a feature we need now.

elanv avatar Jul 30 '20 02:07 elanv

@functicons Can we support flinkVersion 1.10, then only support flinkVersion 1.9. And other version set properties accordingly. If you agree, I can working on this issue.

Mrart avatar Jul 30 '20 02:07 Mrart

@Mrart This is an issue my team is interested in. But it would be nice also if you could resolve this issue.

elanv avatar Jul 30 '20 03:07 elanv

@elanv I think this issue is not difficult after we disscuss clearly.Do you agree We add flinkVersion ,and support only < flink 1.9 or 1.10 , and other version user can use set properties accordingly?

Mrart avatar Jul 30 '20 03:07 Mrart

@elanv I think this issue is not difficult after we disscuss clearly.Do you agree We add flinkVersion ,and support only < flink 1.9 or 1.10 , and other version user can use set properties accordingly?

I argree to memoryOffHeapRatio/memoryOffHeapMin should be applied to only Flink version <= 1.9 or 1.10.

elanv avatar Jul 30 '20 03:07 elanv

I currently don't have bandwidth to work on the fix. You guys can decide who will be working on it. Thanks!

functicons avatar Jul 30 '20 04:07 functicons

Ok I will commit this feature tommorrow.

Mrart avatar Jul 30 '20 05:07 Mrart

@Mrart any update on this issue? If you are busy with some other things, can we instead let @elanv to take it over?

functicons avatar Aug 11 '20 04:08 functicons

Share relevant information for those looking for a workround on this issue.

The reason flink fails to start in 1.10 and above is due to tighter memory configuration constraint. taskmanager.memory.process.size or taskmanager.memory.flink.size limits the total flink memory usage, but if the other memory configurations are not consistent within that limit, the Flink process fails to start. There is a workaround like below to configure memory in Flink version 1.10 or above. Unless you changed default memory configurations, the memory limit should be at least approximately 1gb. You can also lower the limit by tuning additional memory configurations like taskmanager.memory.managed.fraction or taskmanager.memory.jvm-metaspace.size but it may be not recommended for production.

spec:
  flinkProperties:
    jobmanager.heap.size: ""                # set empty value (only for Flink version 1.11 or above)
    jobmanager.memory.process.size: 600mb   # job manager memory limit  (only for Flink version 1.11 or above)
    taskmanager.heap.size: ""               # set empty value
    taskmanager.memory.process.size: 1gb    # task manager memory limit

note: Flink memory calculation sheet from Flink community https://docs.google.com/spreadsheets/d/1mJaMkMPfDJJ-w6nMXALYmTc4XxiV30P5U7DzgwLkSoE/edit#gid=0

elanv avatar Aug 14 '20 01:08 elanv

@functicons I have push to my own branch, not do pr. because I don't have enougph test.

Mrart avatar Aug 17 '20 07:08 Mrart