[CELEBORN-1902] Read client throws PartitionConnectionException
What changes were proposed in this pull request?
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException is now thrown when RemoteBufferStreamReader determines that the current exception indicates a connection failure.
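As a rough illustration of the proposed behavior, here is a minimal Java sketch. RemoteBufferStreamReader, CelebornIOException, and PartitionConnectionException are named in this PR; the ConnectionFailureClassifier helper, its method names, and the message-based check are my assumptions, not the actual patch.

```java
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;

// Hypothetical helper: wrap connection-related Celeborn errors so that Flink
// treats them as a lost producer and re-computes the upstream partition.
public final class ConnectionFailureClassifier {

  private ConnectionFailureClassifier() {}

  static Throwable classify(ResultPartitionID partitionId, Throwable cause) {
    if (isConnectionFailure(cause)) {
      // PartitionConnectionException signals Flink to re-run the producing task.
      return new PartitionConnectionException(partitionId, cause);
    }
    return cause;
  }

  private static boolean isConnectionFailure(Throwable t) {
    // Walk the cause chain looking for a CelebornIOException whose message
    // indicates a failed connection attempt (see the example stack trace below).
    while (t != null) {
      if (t instanceof CelebornIOException
          && t.getMessage() != null
          && t.getMessage().contains("Failed to connect to")) {
        return true;
      }
      t = t.getCause();
    }
    return false;
  }
}
```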
Why are the changes needed?
If org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException is correctly thrown to reflect a connection failure, Flink becomes aware of the lost Celeborn server-side nodes and can re-compute the affected data. Otherwise, endless retries could cause the Flink job to fail.
This PR deals with exceptions like:
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to ltx1-app10154.prod.linkedin.com/10.88.105.20:23924
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Tested in a Flink batch job with Celeborn.
@Austinfjq, could you explain why this throws PartitionConnectionException? IMO, PartitionConnectionException is used to retry the upstream task instead of the downstream task for PartitionNotFoundException?
@Austinfjq, could you explain why this throws PartitionConnectionException?
I think for the following error, PartitionConnectionException better describes that the failure is about the connection rather than the partition not being found.
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to ltx1-app10154.prod.linkedin.com/10.88.105.20:23924
IMO, PartitionConnectionException is used to retry the upstream task instead of the downstream task for PartitionNotFoundException?
I might be missing something. Can downstream task be retried? I assume it hasn't started.
ping @reswqa @codenohup
During Celeborn worker upgrades, the Flink client is likely to encounter connection exceptions, which can easily lead to upstream task reruns. This may be very costly. If the Flink side wants to handle this type of exception, it is best to tolerate the Celeborn upgrade scenario, e.g. at least retry the client within a reasonable time.
If the Flink side wants to handle this type of exception, it is best to tolerate the Celeborn upgrade scenario, e.g. at least retry the client within a reasonable time.
Yes. IMO, rerunning the upstream (by PartitionConnectionException) in this scenario is too radical, and most of the time retrying can solve the problem. The client should at least have some retry mechanism.
@SteNicholas @reswqa Let me add a bit more context on this issue as well as on the LI environment itself.
I understand where you're coming from, which is basically that, for transient worker upgrade issues, recomputing the missing partitions from the upstream vertex is a bit costly.
But there are 2 different cases where a Celeborn worker (or host) can be down.
- Transient cases (similar to what you mentioned, Celeborn worker software upgrade etc.):
  - Data/metadata is still intact and I assume these are expected to be very quick.
- Permanent / longer maintenance cases (both planned and unplanned):
  - This will also result in Failed to connect to <host> but the Celeborn worker (or host) won't be able to recover. Typically in our fleet, there is both planned maintenance (software upgrades etc.) and there are also the unplanned maintenance cases.
  - Client cannot recover even with retries (note: AFAIK Flink doesn't support replication at this point, replication can alleviate this issue further but cannot fully solve it) in these cases.
For these cases, wouldn't it make sense to throw PartitionConnectionException and retry the affected portion of the upstream tasks? At least this way the job will recover and can complete successfully. Otherwise, every task would fail with the same exception (after retries) and eventually exhaust the task failure retries on the Flink side.
I see the trade-off here: balancing the re-computation cost for transient issues against better fault tolerance for permanent cases. One potential solution is to implement client-side retries with exponential backoff and a max timeout for transient cases, while retrying upstream tasks for permanent cases (basically beyond the timeout), roughly as sketched below.
Thoughts?
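To make the hybrid policy proposed above concrete, here is a minimal sketch in Java. Everything in it is illustrative: the retryWithBackoff helper, its parameters, and the wiring into the reader are assumptions, not code from this PR; only PartitionConnectionException and its role come from the discussion.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;

// Hypothetical retry policy: exponential backoff for transient worker outages,
// then escalate to PartitionConnectionException once the deadline is exceeded
// so that Flink re-computes the upstream partition.
public final class BackoffThenEscalate {

  private BackoffThenEscalate() {}

  static <T> T retryWithBackoff(
      Callable<T> connectAttempt,
      ResultPartitionID partitionId,
      long initialWaitMs,
      long maxWaitMs,
      long deadlineMs)
      throws IOException, InterruptedException {
    long start = System.currentTimeMillis();
    long wait = initialWaitMs;
    while (true) {
      try {
        return connectAttempt.call();
      } catch (Exception e) {
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed >= deadlineMs) {
          // Transient-retry budget exhausted: assume the worker is gone for good
          // and ask Flink to re-run the producing task.
          throw new PartitionConnectionException(partitionId, e);
        }
        Thread.sleep(Math.min(wait, deadlineMs - elapsed));
        wait = Math.min(wait * 2, maxWaitMs); // exponential backoff, capped
      }
    }
  }
}
```

With such a policy, short outages (e.g. a rolling upgrade) are absorbed by the backoff loop, while a worker that never comes back eventually triggers an upstream re-computation instead of exhausting Flink's task-failure retries.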
@SteNicholas Can we reopen this PR to continue discussion? I don't have access.
cc @mridulm
@RexXiong, @SteNicholas the scenario being observed is as follows (Venkat has given details as well):
When there is a worker loss, we currently observe Flink application failures, since the parent stage is not retried to recompute the lost data.
In case of transient failures due to rolling upgrades, etc., we do have io.maxRetries and io.retryWait as knobs to control the behavior. But when the worker is lost and not recoverable in a reasonable time - given Flink does not support replication - it results in repeated retries of the task and, eventually, application failure.
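For reference, a hedged example of how these knobs might be tuned to ride out longer rolling upgrades. The io.maxRetries / io.retryWait suffixes come from the comment above; the celeborn.data. prefix (assumed to be the transport module used on the read path) and the values are my assumptions, so verify the exact keys against the Celeborn configuration docs.

```properties
# Illustrative values only (assumed keys, assumed module prefix).
# Number of connection attempts before the read client gives up.
celeborn.data.io.maxRetries 8
# Wait between attempts; the tolerated outage is roughly maxRetries * retryWait.
celeborn.data.io.retryWait 15s
```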
Wondering if this PR is insufficient and there is a better way to handle this scenario that we are missing?
If not, is the concern that io.maxRetries and io.retryWait are insufficient to handle rolling upgrades (as modifying them will impact other scenarios as well)?
Given this was repeatedly observed for fairly expensive Flink batch applications, we are trying to find a reliable way to address this issue. Thanks!
Gentle ping @RexXiong @SteNicholas
Gentle ping @RexXiong @SteNicholas
Gentle ping @RexXiong @SteNicholas
Sorry for the late reply. I think we can add a switch, defaulting to false, which will allow us to maintain the current behavior while also enabling it to handle complex environments. WDYT?
Gentle ping @RexXiong @SteNicholas
Sorry for the late reply. I think we can add a switch, defaulting to false, which will allow us to maintain the current behavior while also enabling it to handle complex environments. WDYT?
ping @venkata91 and @Austinfjq
@turboFei Added the switch.
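For readers following along, a hypothetical sketch of what the opt-in switch could look like on the Flink read path. The class, the flag, and the way it is read are placeholders, not the actual key or code added in this PR; only the default-false, preserve-current-behavior semantics come from the discussion.

```java
import java.io.IOException;

import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;

// Hypothetical illustration of the opt-in switch discussed above.
final class ConnectionExceptionSwitch {

  private final boolean throwPartitionConnectionException; // defaults to false

  ConnectionExceptionSwitch(boolean throwPartitionConnectionException) {
    this.throwPartitionConnectionException = throwPartitionConnectionException;
  }

  IOException translate(ResultPartitionID partitionId, IOException cause) {
    // Switch off: surface the original exception unchanged (current behavior).
    // Switch on: escalate connection failures so Flink re-runs the producer.
    if (throwPartitionConnectionException) {
      return new PartitionConnectionException(partitionId, cause);
    }
    return cause;
  }
}
```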
@Austinfjq please fix the code style with dev/reformat.
Thanks, I plan to merge this PR and cut branch-0.6.
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 63.40%. Comparing base (fff9725) to head (6a1ae70). Report is 29 commits behind head on main.
Additional details and impacted files
@@ Coverage Diff @@
## main #3147 +/- ##
==========================================
- Coverage 63.54% 63.40% -0.13%
==========================================
Files 343 343
Lines 20812 20863 +51
Branches 1835 1840 +5
==========================================
+ Hits 13222 13227 +5
- Misses 6630 6674 +44
- Partials 960 962 +2