[FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph
What is the purpose of the change
https://issues.apache.org/jira/browse/FLINK-25009
flink info currently doesn't output the slotSharingGroup information for each operator. This makes it impossible to derive the total number of task slots required from the JSON graph when a slot sharing group is explicitly set for the job.
This PR adds the slotSharingGroup information to each operator node in the output JSON graph.
Brief change log
- Add slot sharing group info when generating the JSON graph
Verifying this change
This change added tests and can be verified as follows:
- Added a test to verify that the generated JSON graph has slot_sharing_group for each stream node.
Does this pull request potentially affect one of the following parts:
na
Documentation
- Does this pull request introduce a new feature? I think it's an improvement rather than a feature
- If yes, how is the feature documented? na
CI report:
- 8f4a1ef7b14385a73b39d768dd811675cdafb7ba Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
Automated Checks
Last check on commit 5ee33b6fada620c83fad2b0ab09d2354b3bca583 (Mon Nov 22 21:16:40 UTC 2021)
Warnings:
- No documentation files were touched! Remember to keep the Flink docs up to date!
- This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.
Mention the bot in a comment to re-run the automated checks.
Review Progress
- ❓ 1. The [description] looks good.
- ❓ 2. There is [consensus] that the contribution should go into Flink.
- ❓ 3. Needs [attention] from.
- ❓ 4. The change fits into the overall [architecture].
- ❓ 5. Overall code [quality] is good.
Please see the Pull Request Review Guide for a full explanation of the review process.
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
- @flinkbot approve all to approve all aspects
- @flinkbot approve-until architecture to approve everything until architecture
- @flinkbot attention @username1 [@username2 ..] to require somebody's attention
- @flinkbot disapprove architecture to remove an approval you gave earlier
@rmetzger Hi, sorry for pinging. Could you share some info on how to get a reviewer to look at the PR? Thank you!
@xinbinhuang, this pull request could be reviewed by @KarmaGYZ.
Thank you! @KarmaGYZ can you help take a look?
@xinbinhuang There are failing tests that seem to be related to this change. Would you like to take a look first?
@xinbinhuang If you need any help in fixing those tests, feel free to ask :).
Hey, I've been a bit busy for the last two days. I'm going to look into it tomorrow. I'll reach out if I get stuck. Thank you!
@flinkbot run azure
@flinkbot run azure
@flinkbot run azure
@xinbinhuang Just a kind reminder: please rebase your PR onto the latest master instead of merging it.
Thanks for the reminder. I'll try to rebase the PR when it's ready. I've had a hard time running tests locally within the IDE: it keeps giving me errors about not finding tests. Could you suggest an approach to verify my changes easily?
Hmm. I'm afraid I can't help much. Maybe you can follow this guide to reimport your project.
BTW, the CI is currently unstable, you can retrigger it later.
@flinkbot run azure
@flinkbot run azure
@flinkbot run azure
Looks like the CI is stuck? (The Build Flink stage has been running for > 4 hours: e2c_ci_test2.)
@KarmaGYZ sorry for getting back to this so late. I finally got some time to sort out my laptop's build setup. Can you take a look at the PR again? Thanks!
@flinkbot run azure
@KarmaGYZ @SteNicholas CI green. ✅ PTAL if you have time
@xinbinhuang Thanks for the update. Could you explain why you need to derive the total task slots required from the JSON graph?
Our team wants to derive the total number of slots required for the job before redeploying. This helps avoid the situation where the old job is stopped and the new job doesn't have enough resources to start. With this change, we can make sure the cluster has enough resources for the new job before stopping the old one. In general, I think slot sharing groups are useful information for understanding a job when debugging and performance tuning. WDYT?
I'm afraid that even with this information, you cannot calculate the exact number of slots your job requires because the slot sharing group will not restrict the scheduling of operators[1]. It is just a hint for the scheduling and might not be obeyed in the future. However, I agree that this is indeed helpful information. @godfreyhe WDYT about the compatibility issue introduced by this PR?
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/#notice
Ah, I didn't realize this! Thanks for the reference! I'll give it a thorough read over the weekend.
From the doc section you linked, it says that slot sharing groups are not enforced to be placed in the same slot. However, our use case is slightly different: we want to calculate the maximum number of slots required, so that the cluster has enough resources before deploying. IIUC, different slot sharing groups can't be scheduled into the same slot, so adding a slot sharing group increases the number of slots required, i.e. max slots required = sum(max parallelism of each slot sharing group). We aren't concerned with whether operators in the same sharing group are deployed together or not.
Is my understanding correct?
I'm afraid not. For example, say you put two operators A and B (each with parallelism 1) into a slot sharing group. We can still deploy them into two physical slots.
Ahh, good to know! One follow-up question: does this scenario happen often, or only when fine-grained resource management is configured? In our experience, the calculation formula has been correct so far, but we only use the .slotSharingGroup API and not the fine-grained resource management feature.
Just for my knowledge, in what situation can this happen?
@godfreyhe I would appreciate it if you could help review the compatibility issue.
This scenario will not occur in current Flink. But it can occur in the future.
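The slot estimate discussed above (max slots required = sum over slot sharing groups of the maximum parallelism in that group) can be sketched as follows. This is a minimal illustration, assuming current Flink behavior as described in this thread: operators in different slot sharing groups never share a slot, while operators in the same group can. The OperatorNode record and the group and operator names are hypothetical; they are not the actual StreamGraph JSON schema or a Flink API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotEstimate {

    // Hypothetical, simplified view of one operator node from the JSON graph.
    record OperatorNode(String name, String slotSharingGroup, int parallelism) {}

    // Heuristic (assumption: current scheduler behavior): each slot sharing group
    // needs as many slots as its most parallel operator, and different groups
    // never share slots, so the per-group maxima are summed.
    static int estimateSlots(List<OperatorNode> nodes) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (OperatorNode node : nodes) {
            maxPerGroup.merge(node.slotSharingGroup(), node.parallelism(), Math::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        List<OperatorNode> nodes = List.of(
                new OperatorNode("source", "default", 2),
                new OperatorNode("map", "default", 4),
                new OperatorNode("sink", "sinkGroup", 3));
        // default -> max(2, 4) = 4, sinkGroup -> 3, total = 7
        System.out.println(estimateSlots(nodes));
    }
}
```

As the reviewer points out, slot sharing groups are only a scheduling hint, so this sum reflects today's scheduler and is not guaranteed to match the slots a job actually uses in future releases (for example, A and B in one group could end up in two slots).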
Breaking the JSON plan is something that can't be done, because it's required for the SQL upgrade story.
@MartijnVisser there might be a misunderstanding here. This "JSON plan" is not the SQL "CompiledPlan" but a JSON representation of the StreamGraph. Nevertheless, this is a public API, so we need to be careful not to break downstream systems. I'm pretty sure that platform teams might have built UIs or other tooling around this JSON. Hopefully, their logic is flexible enough to accept an additional field. But nevertheless, we should not change or remove fields without a good reason.