[CELEBORN-1902] Read client throws PartitionConnectionException

Open Austinfjq opened this issue 9 months ago • 10 comments

What changes were proposed in this pull request?

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException is thrown when RemoteBufferStreamReader finds that the current exception is about connection failure.

Why are the changes needed?

If org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException is correctly thrown to reflect connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. Otherwise, endless retries could cause Flink job failure.

This PR is to deal with exceptions like:

java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to ltx1-app10154.prod.linkedin.com/10.88.105.20:23924
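To make the detection step concrete, here is a minimal sketch of how a reader might decide that a wrapped error like the one above is a connection failure. It is not the code from this PR: `ConnectionFailureClassifier`, the marker string, and the depth limit are assumptions for illustration, and the real change throws Flink's PartitionConnectionException from RemoteBufferStreamReader rather than returning a boolean.

```java
import java.net.ConnectException;

// Hypothetical helper, not part of Celeborn or Flink: walks the cause chain
// of a read failure and reports whether it looks like a lost connection.
final class ConnectionFailureClassifier {
    // Assumed marker, copied from the error message quoted in this PR.
    private static final String CONNECT_FAILURE_MARKER = "Failed to connect to";
    // Guard against pathological (cyclic) cause chains.
    private static final int MAX_DEPTH = 32;

    private ConnectionFailureClassifier() {}

    static boolean isConnectionFailure(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            if (t instanceof ConnectException) {
                return true;
            }
            String message = t.getMessage();
            if (message != null && message.contains(CONNECT_FAILURE_MARKER)) {
                return true;
            }
        }
        return false;
    }
}
```

Matching on message text is fragile; a dedicated exception type on the Celeborn side would be a sturdier signal if one is available.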

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Tested in a Flink batch job with Celeborn.

Austinfjq avatar Mar 11 '25 23:03 Austinfjq

@Austinfjq, could you explain why this throws PartitionConnectionException? IMO, PartitionConnectionException is used to retry upstream task instead of downstream task for PartitionNotFoundException?

SteNicholas avatar Mar 12 '25 02:03 SteNicholas

@Austinfjq, could you explain why this throws PartitionConnectionException?

I think that for the following error, PartitionConnectionException is the better fit: it says the failure is a connection failure rather than a missing partition.

java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to ltx1-app10154.prod.linkedin.com/10.88.105.20:23924

IMO, PartitionConnectionException is used to retry upstream task instead of downstream task for PartitionNotFoundException?

I might be missing something. Can downstream task be retried? I assume it hasn't started.

Austinfjq avatar Mar 12 '25 20:03 Austinfjq

ping @reswqa @codenohup

RexXiong avatar Mar 14 '25 04:03 RexXiong

During Celeborn worker upgrades, the Flink client is likely to encounter connection exceptions, which can easily lead to upstream task reruns. This may be very costly. If the Flink side wants to handle this type of exception, it is best to tolerate Celeborn upgrades, e.g. by at least retrying in the client for a reasonable time.

RexXiong avatar Mar 14 '25 06:03 RexXiong

If the Flink side wants to handle this type of exception, it is best to tolerate Celeborn upgrades, e.g. by at least retrying in the client for a reasonable time.

Yes. IMO, rerunning upstream (via PartitionConnectionException) in this scenario is too radical, and most of the time retrying solves the problem. The client should at least have some retry mechanism.

reswqa avatar Mar 14 '25 08:03 reswqa

@SteNicholas @reswqa Let me add a bit more context on this issue as well as about LI environment itself.

I understand where you're coming from: for transient worker upgrade issues, recomputing the missing partitions from the upstream vertex is costly.

But there are 2 different cases where a Celeborn worker (or host) can be down.

  1. Transient cases (similar to what you mentioned, Celeborn worker software upgrade etc):
     • Data/metadata is still intact and I assume these are expected to be very quick.
  2. Permanent / longer maintenance cases (both planned and unplanned):
     • This will also result in Failed to connect to <host> but the Celeborn worker (or host) won't be able to recover. Typically in our fleet, there is both planned maintenance (software upgrades etc) and there are also the unplanned maintenance cases.
     • Client cannot recover even with retries (note: AFAIK Flink doesn't support replication at this point, replication can alleviate this issue further but cannot fully solve it) in these cases.

For these cases, wouldn't it make sense to throw PartitionConnectionException and retry the affected portion of the upstream tasks? At least this way, the job will recover and can complete successfully. Otherwise, every task would fail with the same exception (after retries) and eventually exhaust the task failure retries on the Flink side.

I see the trade-off here: re-computation cost for transient issues versus better fault tolerance for permanent cases. One potential solution is to implement client-side retries with exponential backoff and a max timeout for transient cases, and to retry upstream tasks only for permanent cases (basically, once the timeout is exceeded).
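A minimal sketch of that proposal, assuming nothing about Celeborn's actual client code: retry the read with exponential backoff until a total time budget runs out, then raise a distinct exception that the caller could map to an upstream rerun. `BackoffRetry`, `RetriesExhaustedException`, and all parameter names are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper: retries an action on IOException with exponential
// backoff, bounded by a total time budget.
final class BackoffRetry {
    /** Hypothetical signal: local retries are exhausted, escalate to the scheduler. */
    static final class RetriesExhaustedException extends IOException {
        RetriesExhaustedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private BackoffRetry() {}

    static <T> T callWithBackoff(
            Callable<T> action, long initialWaitMs, long maxWaitMs, long maxTotalMs)
            throws Exception {
        long deadlineNanos = System.nanoTime() + maxTotalMs * 1_000_000L;
        long waitMs = initialWaitMs;
        while (true) {
            try {
                return action.call();
            } catch (IOException e) {
                long remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000L;
                if (remainingMs <= 0) {
                    // Treat the failure as permanent once the budget is spent.
                    throw new RetriesExhaustedException(
                            "gave up after " + maxTotalMs + " ms", e);
                }
                // Never sleep past the deadline; double the wait up to the cap.
                Thread.sleep(Math.min(waitMs, remainingMs));
                waitMs = Math.min(waitMs * 2, maxWaitMs);
            }
        }
    }
}
```

With a budget sized to cover a typical rolling upgrade, transient outages would be absorbed by the retries, and only workers that stay unreachable past the budget would trigger the more expensive upstream recomputation.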

Thoughts?

venkata91 avatar Apr 15 '25 18:04 venkata91

@SteNicholas Can we reopen this PR to continue discussion? I don't have access.

Austinfjq avatar Apr 15 '25 18:04 Austinfjq

cc @mridulm

venkata91 avatar Apr 18 '25 01:04 venkata91

@RexXiong, @SteNicholas the scenario being observed is as follows (Venkat has given details as well):

When there is a worker loss, we currently observe Flink application failures, because the parent stage is not retried to recompute the lost data.

In case of transient failures due to rolling upgrade, etc - we do have io.maxRetries and io.retryWait as knobs to control behavior. But when the worker is lost, and not recoverable in reasonable time - given Flink does not support replication - it results in repeated retries of the task, and eventually application failure.

Is this PR insufficient, and is there a better way to handle this scenario that we are missing? If not, is the concern that io.maxRetries and io.retryWait are insufficient to handle rolling upgrades (as modifying them will impact other scenarios as well)?

Given that this was repeatedly observed for fairly expensive Flink batch applications, we are trying to find a reliable way to address this issue. Thanks!

mridulm avatar Apr 20 '25 06:04 mridulm

Gentle ping @RexXiong @SteNicholas

venkata91 avatar Apr 24 '25 18:04 venkata91

Gentle ping @RexXiong @SteNicholas

venkata91 avatar May 02 '25 05:05 venkata91

Gentle ping @RexXiong @SteNicholas

Sorry for the late reply. I think we can add a switch, defaulting to false, which will allow us to maintain the current behavior while also enabling it to handle complex environments. WDYT?
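As a rough illustration of such a switch (not the implementation that was later added to this PR), the sketch below keeps the original exception unless an opt-in flag is set. The configuration key, `ConnectFailureSwitch`, and `ConnectionLostException` are all hypothetical; the latter stands in for Flink's PartitionConnectionException so that the example stays self-contained.

```java
import java.io.IOException;
import java.util.Properties;

// Hypothetical sketch of an opt-in switch that defaults to the current behavior.
final class ConnectFailureSwitch {
    // Hypothetical key, invented for this sketch; not a real Celeborn option.
    static final String KEY = "example.client.connectionException.enabled";

    /** Stand-in for the exception that would trigger an upstream rerun. */
    static final class ConnectionLostException extends IOException {
        ConnectionLostException(Throwable cause) {
            super("connection to worker lost", cause);
        }
    }

    private ConnectFailureSwitch() {}

    static boolean enabled(Properties conf) {
        // A missing key means "false", so existing jobs keep today's behavior.
        return Boolean.parseBoolean(conf.getProperty(KEY, "false"));
    }

    static IOException translate(IOException original, boolean connectFailure, Properties conf) {
        if (connectFailure && enabled(conf)) {
            return new ConnectionLostException(original);
        }
        return original;
    }
}
```

Defaulting to false means clusters that rely on retries during rolling upgrades see no change, while environments with frequent permanent worker loss can opt in.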

RexXiong avatar May 08 '25 07:05 RexXiong

Gentle ping @RexXiong @SteNicholas

Sorry for the late reply. I think we can add a switch, defaulting to false, which will allow us to maintain the current behavior while also enabling it to handle complex environments. WDYT?

ping @venkata91 and @Austinfjq

turboFei avatar May 18 '25 14:05 turboFei

@turboFei Added the switch.

Austinfjq avatar May 20 '25 22:05 Austinfjq

@Austinfjq please fix the code style with dev/reformat.

turboFei avatar May 21 '25 02:05 turboFei

Thanks, I plan to merge this PR and cut branch-0.6.

turboFei avatar May 21 '25 23:05 turboFei

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 63.40%. Comparing base (fff9725) to head (6a1ae70). Report is 29 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3147      +/-   ##
==========================================
- Coverage   63.54%   63.40%   -0.13%     
==========================================
  Files         343      343              
  Lines       20812    20863      +51     
  Branches     1835     1840       +5     
==========================================
+ Hits        13222    13227       +5     
- Misses       6630     6674      +44     
- Partials      960      962       +2     


codecov[bot] avatar May 22 '25 00:05 codecov[bot]