[FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph

Open xinbinhuang opened this issue 4 years ago • 36 comments

What is the purpose of the change

https://issues.apache.org/jira/browse/FLINK-25009

flink info currently doesn't output the slotSharingGroup information for each operator. This makes it impossible to derive the total task slots required from the JSON graph when a slot sharing group is explicitly set for the job.

This PR adds the slotSharingGroup information to each operator node in the output JSON graph.

Brief change log

  • Add slot sharing group info when generating the JSON graph

Verifying this change

This change added tests and can be verified as follows:

  • Added a test to verify that the generated JSON graph has slot_sharing_group for each stream node
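
As a rough illustration of what such a check looks at, the sketch below parses a hypothetical JSON graph and lists any node without a slot_sharing_group entry. Only the slot_sharing_group key is named in this PR; the other keys, the values, and the helper name nodes_missing_group are assumptions made for the example, and this is not the test added by the PR.

```python
import json

# Hypothetical JSON graph. Only "slot_sharing_group" comes from this PR;
# the other keys and all values are illustrative assumptions.
PLAN_JSON = """
{
  "nodes": [
    {"id": 1, "type": "Source: events", "parallelism": 2, "slot_sharing_group": "default"},
    {"id": 2, "type": "Map", "parallelism": 4, "slot_sharing_group": "heavy"}
  ]
}
"""


def nodes_missing_group(plan_json: str) -> list:
    """Return the ids of nodes that lack a slot_sharing_group entry."""
    plan = json.loads(plan_json)
    return [node["id"] for node in plan["nodes"] if "slot_sharing_group" not in node]


# An empty list means every node carries the new field.
missing = nodes_missing_group(PLAN_JSON)
```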

Does this pull request potentially affect one of the following parts:

na

Documentation

  • Does this pull request introduce a new feature? I think it's an improvement rather than a feature
  • If yes, how is the feature documented? na

xinbinhuang avatar Nov 22 '21 21:11 xinbinhuang

CI report:

  • 8f4a1ef7b14385a73b39d768dd811675cdafb7ba Azure: FAILURE

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Nov 22 '21 21:11 flinkbot

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 5ee33b6fada620c83fad2b0ab09d2354b3bca583 (Mon Nov 22 21:16:40 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

flinkbot avatar Nov 22 '21 21:11 flinkbot

@rmetzger Hi, sorry for the ping. Could you share some info on how to get a reviewer to look at the PR? Thank you!

xinbinhuang avatar Nov 30 '21 22:11 xinbinhuang

@xinbinhuang, this pull request could be reviewed by @KarmaGYZ.

SteNicholas avatar Dec 03 '21 12:12 SteNicholas

Thank you! @KarmaGYZ can you help take a look?

xinbinhuang avatar Dec 04 '21 04:12 xinbinhuang

@xinbinhuang There are failing tests that seem to be related to this change. Would you like to take a look first?

KarmaGYZ avatar Dec 06 '21 02:12 KarmaGYZ

@xinbinhuang If you need any help in fixing those tests, feel free to ask :).

KarmaGYZ avatar Dec 08 '21 08:12 KarmaGYZ

Hey, I've been a bit busy for the last two days. I'm going to look into it tomorrow. I'll reach out if I get stuck. Thank you!

xinbinhuang avatar Dec 08 '21 09:12 xinbinhuang

@flinkbot run azure

xinbinhuang avatar Dec 13 '21 07:12 xinbinhuang

@flinkbot run azure

xinbinhuang avatar Dec 13 '21 17:12 xinbinhuang

@flinkbot run azure

xinbinhuang avatar Dec 24 '21 21:12 xinbinhuang

@xinbinhuang Just a kind reminder: please rebase your PR onto the latest master instead of merging it.

KarmaGYZ avatar Dec 27 '21 02:12 KarmaGYZ

Thanks for the reminder, I'll rebase the PR when it's ready. I've had a hard time running tests locally within the IDE: it keeps giving me an error about not finding tests. Would you suggest any approach to verify my changes easily?

xinbinhuang avatar Dec 27 '21 21:12 xinbinhuang

Hmm. I'm afraid I can not help a lot. Maybe you can follow this guide to reimport your project.

BTW, the CI is currently unstable, you can retrigger it later.

KarmaGYZ avatar Dec 28 '21 02:12 KarmaGYZ

@flinkbot run azure

xinbinhuang avatar May 16 '22 00:05 xinbinhuang

@flinkbot run azure

xinbinhuang avatar May 16 '22 01:05 xinbinhuang

@flinkbot run azure

Looks like the CI is stuck? (The Build Flink stage has been running for > 4 hrs, e2c_ci_test2.)

xinbinhuang avatar May 17 '22 22:05 xinbinhuang

@KarmaGYZ sorry for getting back to this so late. Finally got some time to sort out my laptop's build setup. Can you take a look at the PR again? thx

xinbinhuang avatar May 17 '22 22:05 xinbinhuang

@flinkbot run azure

xinbinhuang avatar May 18 '22 03:05 xinbinhuang

@KarmaGYZ @SteNicholas CI green. ✅ PTAL if you have time

xinbinhuang avatar May 18 '22 16:05 xinbinhuang

@xinbinhuang Thanks for the update. Could you explain why you need to derive the total task slots required from the JSON graph?

KarmaGYZ avatar May 19 '22 06:05 KarmaGYZ

Our team wants to derive the total slots required for the job before redeploying. This helps avoid the situation where the old job is stopped and the new job doesn't have enough resources to start. With this change, we can make sure the cluster has enough resources for the new job before stopping the old one. In general, I think the slot sharing group is useful information for better understanding the job when debugging and performance tuning. WDYT?

xinbinhuang avatar May 19 '22 15:05 xinbinhuang

I'm afraid that even with this information, you cannot calculate the exact number of slots your job requires because the slot sharing group will not restrict the scheduling of operators[1]. It is just a hint for the scheduling and might not be obeyed in the future. However, I agree that this is indeed helpful information. @godfreyhe WDYT about the compatibility issue introduced by this PR?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/#notice

KarmaGYZ avatar May 20 '22 06:05 KarmaGYZ

Ah, I hadn't realized this! Thanks for the reference! I'll give it a thorough read over the weekend.

From the doc section you linked, operators in a slot sharing group are not forced into the same slot. However, our use case is slightly different: we want to calculate the max slots required, so that the cluster has enough resources before deploying. IIUC, different slot sharing groups can't be scheduled on the same slot, so adding a slot sharing group increases the parallelism/slots required, i.e. max slots required = sum(max parallelism of each slot sharing group). We aren't concerned about whether operators in the same sharing group are deployed together or not.

Is my understanding correct?

xinbinhuang avatar May 20 '22 16:05 xinbinhuang

I'm afraid not. For example, say you put two operators A and B (each with parallelism 1) into a slot sharing group. We can still deploy them into two physical slots.

KarmaGYZ avatar May 23 '22 01:05 KarmaGYZ

Ahh, good to know! One follow-up question: does this scenario happen often, or does it only happen if fine-grained resource management is configured? In our experience, the formula has been correct so far, but we only use the .slotSharingGroup API and not the fine-grained resource management feature.

For example, you put two operators A and B (with parallelism 1) into a slot sharing group. Then, we can deploy them into two physical slots.

Just for my knowledge, in what situation can this happen?

xinbinhuang avatar May 27 '22 00:05 xinbinhuang

It is just a hint for the scheduling and might not be obeyed in the future. However, I agree that this is indeed helpful information. @godfreyhe WDYT about the compatibility issue introduced by this PR?

@godfreyhe I would appreciate it if you could help review the compatibility issue.

xinbinhuang avatar May 27 '22 00:05 xinbinhuang

For example, you put two operators A and B (with parallelism 1) into a slot sharing group. Then, we can deploy them into two physical slots.

Just for my knowledge, in what situation can this happen?

This scenario will not occur in current Flink. But it can occur in the future.

KarmaGYZ avatar May 27 '22 02:05 KarmaGYZ
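
For reference, the estimate discussed in this thread, max slots required = sum(max parallelism of each slot sharing group), could be sketched roughly as follows. This is only an illustration under assumptions: the "nodes" and "parallelism" keys, the fallback group name "default", and the helper name estimate_slots are not confirmed by this PR. As noted in the replies above, the slot sharing group is a scheduling hint, so the result reflects current behavior and may not hold in future Flink versions.

```python
import json
from collections import defaultdict


def estimate_slots(plan_json: str) -> int:
    """Sum, over slot sharing groups, the max operator parallelism in each group."""
    plan = json.loads(plan_json)
    max_parallelism = defaultdict(int)
    for node in plan["nodes"]:
        # Assumption: nodes without the field fall into a group named "default".
        group = node.get("slot_sharing_group", "default")
        max_parallelism[group] = max(max_parallelism[group], node["parallelism"])
    return sum(max_parallelism.values())


# Hypothetical graph: two operators in "default", one in "heavy".
EXAMPLE_PLAN = json.dumps({
    "nodes": [
        {"id": 1, "parallelism": 2, "slot_sharing_group": "default"},
        {"id": 2, "parallelism": 3, "slot_sharing_group": "default"},
        {"id": 3, "parallelism": 4, "slot_sharing_group": "heavy"},
    ]
})

estimated = estimate_slots(EXAMPLE_PLAN)  # max(2, 3) + 4
```

Comparing this estimate with the free slots in the cluster before stopping the old job is the pre-deployment check described earlier in the thread.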

Breaking the JSON plan is something that can't be done, because it's required for the SQL upgrade story.

MartijnVisser avatar Jun 07 '22 17:06 MartijnVisser

@MartijnVisser there might be a misunderstanding here. This "JSON plan" is not the SQL "CompiledPlan" but a JSON representation of the StreamGraph. Nevertheless, this is public API, so we need to be careful that we don't break downstream systems. I'm pretty sure that platform teams have built UIs or other tooling around this JSON. Hopefully, their logic is flexible enough to accept an additional field, but we should not change or remove fields without a good reason.

twalthr avatar Jun 14 '22 11:06 twalthr