flink-on-k8s-operator
When the Flink version is >= 1.10, "taskmanager.heap.size" does not work.
The operator calculates Flink memory from "tmMemoryOffHeapRatio" and "tmMemLimitByte", but this does not work for Flink 1.10, which uses "taskmanager.memory.process.size" to limit Flink memory.
If we only configure "taskmanager.heap.size" in Flink 1.10, the TaskManager container gets OOM-killed.
I think we could add a "useProcessSize" field to the taskmanager spec in the CR: for Flink 1.10, set useProcessSize to true and use resources.limits.memory as "taskmanager.memory.process.size"; otherwise, use "taskmanager.heap.size".
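A minimal sketch of how the proposed switch might look inside the operator, assuming a hypothetical boolean `UseProcessSize` on the TaskManager spec. `TaskManagerMemory` and `memoryProperties` are illustrative names, not the operator's actual types. Values are written with an explicit `b` (bytes) unit, because in Flink <= 1.9 a unitless `taskmanager.heap.size` is interpreted as megabytes.

```go
package main

import (
	"fmt"
	"strconv"
)

// TaskManagerMemory is a hypothetical stand-in for the memory-related
// parts of the taskmanager CR; field names are illustrative only.
type TaskManagerMemory struct {
	UseProcessSize bool  // hypothetical field from the proposal
	LimitBytes     int64 // resources.limits.memory, in bytes
	HeapBytes      int64 // heap size the operator already derives from the off-heap ratio
}

// memoryProperties returns the Flink memory properties to write into flink-conf.yaml.
func memoryProperties(tm TaskManagerMemory) map[string]string {
	props := map[string]string{}
	if tm.UseProcessSize {
		// Flink >= 1.10: hand the whole container limit to Flink and let it split it.
		props["taskmanager.memory.process.size"] = strconv.FormatInt(tm.LimitBytes, 10) + "b"
	} else {
		// Flink <= 1.9: keep the existing heap-size logic.
		props["taskmanager.heap.size"] = strconv.FormatInt(tm.HeapBytes, 10) + "b"
	}
	return props
}

func main() {
	tm := TaskManagerMemory{UseProcessSize: true, LimitBytes: 4 << 30, HeapBytes: 3 << 30}
	fmt.Println(memoryProperties(tm))
}
```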
Thanks for the bug report, but according to the doc, Flink >= 1.10 supports/requires both jobmanager.heap.size and taskmanager.memory.process.size:
jobmanager.heap.size: Sets the size of the Flink Master (JobManager / ResourceManager / Dispatcher) JVM heap.
taskmanager.memory.process.size: Total size of the TaskManager process, including everything. Flink will subtract some memory for the JVM’s own memory requirements (metaspace and others), and divide and configure the rest automatically between its components (network, managed memory, JVM Heap, etc.).
I think the fix should be: if taskManagerSpec.resources.limits.memory is provided by the user, set taskmanager.memory.process.size to taskManagerSpec.resources.limits.memory, while keeping the current logic for taskmanager.heap.size. It should be okay to set both properties regardless of the Flink version. In Flink <= 1.9, taskmanager.memory.process.size will be ignored, but it does no harm.
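A minimal sketch of this proposed fix, assuming the operator has already computed the heap size and that a limit of `0` means "not provided by the user". The function name `confWithProcessSize` is hypothetical. Note that later comments in this thread report that setting both keys breaks TaskManager startup on Flink 1.10.

```go
package main

import "fmt"

// confWithProcessSize sketches the proposed fix: always keep the existing
// taskmanager.heap.size value and, only when the user set
// resources.limits.memory (limitBytes > 0, an assumption), also write
// taskmanager.memory.process.size.
func confWithProcessSize(heapBytes, limitBytes int64) map[string]string {
	conf := map[string]string{
		"taskmanager.heap.size": fmt.Sprintf("%db", heapBytes),
	}
	if limitBytes > 0 {
		conf["taskmanager.memory.process.size"] = fmt.Sprintf("%db", limitBytes)
	}
	return conf
}

func main() {
	// Hypothetical sizes: 3 GiB heap, 4 GiB container limit.
	fmt.Println(confWithProcessSize(3<<30, 0))
	fmt.Println(confWithProcessSize(3<<30, 4<<30))
}
```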
In addition to the fix, the user can always set the property manually with flinkProperties.
Does this sound good to you?
@functicons, in my tests, when 'taskmanager.memory.process.size' is set in flinkProperties, 'taskmanager.heap.size' is used as 'taskmanager.memory.flink.size'. According to the doc, 'taskmanager.memory.process.size' - 'taskmanager.memory.flink.size' = metaspace + overhead, and this causes problems. So we need to drop 'taskmanager.heap.size' for Flink 1.10.
Here is how I found the problem. When I ran a job with this operator, the log showed that "taskmanager.heap.size" was used as "taskmanager.memory.flink.size", and the task's container kept getting OOM-killed. I had set metaspace to 256M, and with the TM limit set to 6192M the calculated overhead was only "45M", so I suspect this is a bug. When I set process.size equal to the TM memory limit, the TaskManager could not start; the log said the overhead was only 45M.
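The arithmetic behind this failure can be sketched as follows, assuming Flink 1.10 behavior: when both process size and Flink size are fixed, the JVM overhead is whatever remains after subtracting Flink memory and metaspace, and it must fall inside the overhead range. The range bounds used here (192 MiB and 1 GiB) are the assumed defaults of `taskmanager.memory.jvm-overhead.min`/`max`, and the 6144 MiB process size is a hypothetical value; only the 5.625 GiB Flink size comes from the log above.

```go
package main

import "fmt"

const mib = int64(1) << 20

// Assumed Flink 1.10 defaults for the JVM overhead range; check the
// documentation of the exact Flink version in use.
const (
	overheadMin = 192 * mib
	overheadMax = 1024 * mib
)

// derivedOverhead returns what is left for JVM overhead when both
// taskmanager.memory.process.size and taskmanager.memory.flink.size are set:
// overhead = process - flink - metaspace.
func derivedOverhead(process, flink, metaspace int64) int64 {
	return process - flink - metaspace
}

// overheadOK reports whether the derived overhead lies in [overheadMin, overheadMax].
func overheadOK(overhead int64) bool {
	return overhead >= overheadMin && overhead <= overheadMax
}

func main() {
	flink := int64(6039797760) // flink.size taken from the deprecated heap value in the log
	metaspace := 256 * mib     // metaspace as configured in the report
	process := 6144 * mib      // hypothetical process.size for illustration
	o := derivedOverhead(process, flink, metaspace)
	fmt.Printf("overhead = %d MiB, within range: %v\n", o/mib, overheadOK(o))
}
```

With these numbers only 128 MiB is left for overhead, below the assumed 192 MiB minimum, so Flink would refuse to start, which matches the symptom described.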
Actually, I couldn't find taskmanager.heap.size in the Flink 1.10 doc, and there are many new taskmanager.memory.* properties. So I don't understand why this happens:
I found "taskmanager.heap.size" is set as "taskmanager.memory.flink.size" log, and my container of the task is always OOM.
Could you show me the log?
- 'taskmanager.memory.flink.size' is not specified, use the configured deprecated task manager heap value (5.625gb (6039797760 bytes)) for it.
Gotcha. What if you specify taskmanager.memory.flink.size manually in flinkProperties? That should solve the problem, I think.
I think instead of adding a field useProcessSize, we might want to add a field flinkVersion, which is more general and future-proof.
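One detail a `flinkVersion` field needs is numeric version comparison: as plain strings, "1.10" sorts before "1.9". A minimal sketch, assuming the field holds strings like "1.10" or "1.11.2"; `parseMinor` and `atLeast` are hypothetical helper names, and suffixes such as "-SNAPSHOT" are not handled.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseMinor parses a "major.minor[.patch]" string such as "1.10" or "1.11.2"
// into its major and minor numbers.
func parseMinor(v string) (major, minor int, err error) {
	parts := strings.SplitN(v, ".", 3)
	if len(parts) < 2 {
		return 0, 0, fmt.Errorf("invalid Flink version %q", v)
	}
	if major, err = strconv.Atoi(parts[0]); err != nil {
		return 0, 0, err
	}
	if minor, err = strconv.Atoi(parts[1]); err != nil {
		return 0, 0, err
	}
	return major, minor, nil
}

// atLeast reports whether version v is >= major.minor, comparing numerically.
func atLeast(v string, major, minor int) (bool, error) {
	ma, mi, err := parseMinor(v)
	if err != nil {
		return false, err
	}
	return ma > major || (ma == major && mi >= minor), nil
}

func main() {
	for _, v := range []string{"1.9", "1.10", "1.11.2"} {
		ok, err := atLeast(v, 1, 10)
		fmt.Println(v, ">= 1.10:", ok, err)
	}
	// The pitfall a string comparison would hit:
	fmt.Println(`"1.10" < "1.9" as strings:`, "1.10" < "1.9")
}
```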
I am also looking into this issue.
Versions 1.10 and 1.11 also differ: jobmanager.heap.size is deprecated in 1.11.
note: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_migration.html
It seems these properties have to be handled differently for versions 1.10 and 1.11.
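Following this thread's description (TaskManager options changed in 1.10, the JobManager option in 1.11), a version-aware operator might pick the memory keys like this. `memoryKeys` is a hypothetical helper, and the version boundaries are taken from the comments here rather than from a complete survey of Flink releases.

```go
package main

import "fmt"

// memoryKeys returns which JobManager and TaskManager memory options a
// version-aware operator could write for Flink major.minor:
// TaskManager switches to process.size in 1.10, JobManager in 1.11.
func memoryKeys(major, minor int) (jmKey, tmKey string) {
	atLeast := func(ma, mi int) bool { return major > ma || (major == ma && minor >= mi) }
	jmKey, tmKey = "jobmanager.heap.size", "taskmanager.heap.size"
	if atLeast(1, 10) {
		tmKey = "taskmanager.memory.process.size"
	}
	if atLeast(1, 11) {
		jmKey = "jobmanager.memory.process.size"
	}
	return jmKey, tmKey
}

func main() {
	for _, v := range [][2]int{{1, 9}, {1, 10}, {1, 11}} {
		jm, tm := memoryKeys(v[0], v[1])
		fmt.Printf("%d.%d: %s, %s\n", v[0], v[1], jm, tm)
	}
}
```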
How about configuring {jobmanager,taskmanager}.heap.size only if memoryOffHeapRatio or memoryOffHeapMin is provided, and {jobmanager,taskmanager}.memory.process.size if not?
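A minimal sketch of this rule, assuming the two spec fields are optional pointers so that "not provided" can be told apart from an explicit value. The `TMSpec` type, its field types, and `chooseMemoryKey` are hypothetical stand-ins for the operator's real CRD types.

```go
package main

import "fmt"

// TMSpec mirrors only the fields this rule looks at. Pointers distinguish
// "not provided" (nil) from an explicit value; names and types are illustrative.
type TMSpec struct {
	MemoryOffHeapRatio *int32
	MemoryOffHeapMin   *string
}

// chooseMemoryKey uses legacy heap sizing only when the user explicitly set
// one of the off-heap knobs, and process.size otherwise.
func chooseMemoryKey(s TMSpec) string {
	if s.MemoryOffHeapRatio != nil || s.MemoryOffHeapMin != nil {
		return "taskmanager.heap.size"
	}
	return "taskmanager.memory.process.size"
}

func main() {
	ratio := int32(25) // hypothetical user-provided ratio
	fmt.Println(chooseMemoryKey(TMSpec{}))
	fmt.Println(chooseMemoryKey(TMSpec{MemoryOffHeapRatio: &ratio}))
}
```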
Right now, if memoryOffHeapRatio or memoryOffHeapMin is not provided, the default value is used, so in that case the behavior would change, which is a disadvantage. On the other hand, memoryOffHeapRatio/memoryOffHeapMin could then be deprecated naturally.
If the version is < 1.10, {jobmanager,taskmanager}.memory.process.size will be ignored even if it is set.
If the version is >= 1.10, you can avoid using memoryOffHeapRatio/memoryOffHeapMin.
@elanv, I would prefer adding an optional property flinkVersion (defaulting to 1.9) and setting the other properties accordingly, because more of these property changes may come in the future, and inferring property values from other properties is error-prone and hard for users to remember.
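One way to keep an optional `flinkVersion` future-proof is a table of "from this version on, use this key" rows, so a new Flink release only adds a row. A minimal sketch, assuming a nil version means the field was unset and therefore defaults to 1.9; `version`, `tmMemoryKeyRules`, and `tmMemoryKey` are hypothetical names, and the 0.0 row is just a baseline for the legacy key.

```go
package main

import "fmt"

type version struct{ major, minor int }

// atLeast reports whether v >= o.
func (v version) atLeast(o version) bool {
	return v.major > o.major || (v.major == o.major && v.minor >= o.minor)
}

// defaultVersion is used when the optional flinkVersion field is unset.
var defaultVersion = version{1, 9}

// tmMemoryKeyRules lists, in ascending order, the version from which a
// TaskManager memory option applies. Supporting a new release means adding a row.
var tmMemoryKeyRules = []struct {
	since version
	key   string
}{
	{version{0, 0}, "taskmanager.heap.size"}, // baseline: legacy key before 1.10
	{version{1, 10}, "taskmanager.memory.process.size"},
}

// tmMemoryKey picks the key of the last rule whose version is <= the effective version.
func tmMemoryKey(v *version) string {
	eff := defaultVersion
	if v != nil {
		eff = *v
	}
	key := ""
	for _, r := range tmMemoryKeyRules {
		if eff.atLeast(r.since) {
			key = r.key
		}
	}
	return key
}

func main() {
	fmt.Println(tmMemoryKey(nil))
	fmt.Println(tmMemoryKey(&version{1, 11}))
}
```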
@functicons Sounds good. Do you have any plans to work on this issue? If not, I will try it, because it's a feature we need now.
@functicons Could we support only flinkVersion 1.9 and 1.10 at first, and let users of other versions set the properties accordingly? If you agree, I can work on this issue.
@Mrart This is an issue my team is interested in. But it would be nice also if you could resolve this issue.
@elanv I think this issue is not difficult once we have discussed it clearly. Do you agree that we add flinkVersion, support only Flink <= 1.9 or 1.10, and let users of other versions set the properties accordingly?
I agree that memoryOffHeapRatio/memoryOffHeapMin should be applied only to Flink version <= 1.9 or 1.10.
I currently don't have bandwidth to work on the fix. You guys can decide who will be working on it. Thanks!
OK, I will commit this feature tomorrow.
@Mrart Any update on this issue? If you are busy with other things, could we let @elanv take it over instead?
Sharing some relevant information for those looking for a workaround for this issue.
Flink 1.10 and above fails to start because of tighter memory configuration constraints. taskmanager.memory.process.size or taskmanager.memory.flink.size limits the total Flink memory usage, but if the other memory settings are not consistent with that limit, the Flink process fails to start. Below is a workaround for configuring memory in Flink 1.10 or above. Unless you have changed the default memory configuration, the memory limit should be at least approximately 1gb. You can lower the limit by tuning additional options such as taskmanager.memory.managed.fraction or taskmanager.memory.jvm-metaspace.size, but this may not be recommended for production.
spec:
  flinkProperties:
    jobmanager.heap.size: "" # set empty value (only for Flink version 1.11 or above)
    jobmanager.memory.process.size: 600mb # job manager memory limit (only for Flink version 1.11 or above)
    taskmanager.heap.size: "" # set empty value
    taskmanager.memory.process.size: 1gb # task manager memory limit
note: Flink memory calculation sheet from Flink community https://docs.google.com/spreadsheets/d/1mJaMkMPfDJJ-w6nMXALYmTc4XxiV30P5U7DzgwLkSoE/edit#gid=0
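To see roughly why about 1gb is the practical floor with default settings, here is a simplified sketch of how Flink 1.10 splits `taskmanager.memory.process.size` when nothing else is configured. The default values in `flink110` are assumptions recalled from the Flink 1.10 memory docs (1.11 raised the metaspace default to 256mb), Flink's exact byte rounding is ignored, and all names are hypothetical. A non-positive task heap means Flink would reject the configuration.

```go
package main

import "fmt"

// defaults holds assumed TaskManager memory defaults, in MiB or as fractions.
type defaults struct {
	metaspace, overheadMin, overheadMax, overheadFrac float64
	frameworkHeap, frameworkOffHeap, taskOffHeap       float64
	networkMin, networkMax, networkFrac, managedFrac   float64
}

// flink110 lists assumed Flink 1.10 defaults; verify for the version in use.
var flink110 = defaults{
	metaspace: 96, overheadMin: 192, overheadMax: 1024, overheadFrac: 0.1,
	frameworkHeap: 128, frameworkOffHeap: 128, taskOffHeap: 0,
	networkMin: 64, networkMax: 1024, networkFrac: 0.1, managedFrac: 0.4,
}

func clamp(x, lo, hi float64) float64 {
	if x < lo {
		return lo
	}
	if x > hi {
		return hi
	}
	return x
}

// taskHeapFromProcess derives the task heap (MiB) left over when only
// taskmanager.memory.process.size is set.
func taskHeapFromProcess(process float64, d defaults) float64 {
	overhead := clamp(process*d.overheadFrac, d.overheadMin, d.overheadMax)
	flink := process - d.metaspace - overhead // total Flink memory
	network := clamp(flink*d.networkFrac, d.networkMin, d.networkMax)
	managed := flink * d.managedFrac
	return flink - d.frameworkHeap - d.frameworkOffHeap - d.taskOffHeap - network - managed
}

func main() {
	for _, p := range []float64{600, 1024} {
		fmt.Printf("process.size=%.0f MiB -> task heap %.1f MiB\n", p, taskHeapFromProcess(p, flink110))
	}
}
```

Under these assumptions, 1024 MiB leaves about 112 MiB of task heap, while 600 MiB goes negative, which is consistent with the advice above to keep the TaskManager limit around 1gb unless other options are tuned.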
@functicons I have pushed the change to my own branch but have not opened a PR yet, because I don't have enough tests.