hudi icon indicating copy to clipboard operation
hudi copied to clipboard

[HUDI-6194] prevent flink writer getting the wrong instant to write

Open hbgstc123 opened this issue 2 years ago • 10 comments

Flink write hudi If all task failover and send bootstrap event to StreamWriteOperatorCoordinator, and StreamWriteOperatorCoordinator happen to start a new instant i1 before receiving all the bootstrap events, the AbstractStreamWriteFunction may start to write with instant i1 when StreamWriteOperatorCoordinator start to rollback i1 and start a new instant i2. AbstractStreamWriteFunction will write to a wrong instant i1.

Change Logs

StreamWriteOperatorCoordinator will not start a new instant if there is a pending instant but reuse the pending instant, to prevent AbstractStreamWriteFunction get the wrong instant to write.

Impact

no

Risk level (write none, low medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instruction to make changes to the website.

Contributor's checklist

  • [ ] Read through contributor's guide
  • [ ] Change Logs and Impact were stated clearly
  • [ ] Adequate tests were added if applicable
  • [ ] CI passed

hbgstc123 avatar May 09 '23 09:05 hbgstc123

related issue: https://github.com/apache/hudi/issues/8674

hbgstc123 avatar May 09 '23 09:05 hbgstc123

@hbgstc123 Would u mind update this PR. Thanks!

zhangyue19921010 avatar Jan 04 '24 10:01 zhangyue19921010

@hbgstc123 Would u mind update this PR. Thanks!

@zhangyue19921010 updated

hbgstc123 avatar Jan 05 '24 02:01 hbgstc123

CI report:

  • 9156199ceacc08a50633d67d28f79689435f05b7 UNKNOWN
  • 76f83d059deee1ded5fd349dcff1974027c7b754 Azure: FAILURE
Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

hudi-bot avatar Jan 05 '24 05:01 hudi-bot

Adding some illustration for future reference:

Normal Flow

image

As can be seen in the normal flow (between the JM and TM), a cycle is depicted by the green box.

TM global failover image

When there are TM restarts, global_failover will happen and restored_write_meta events (which will be detected by the JM as a bootstrap event) will be sent to the JM. Once JM has collected all the restored_write_meta events, a normal cycle will be invoked again.

This is done by rolling back the commit that is already on the timeline (and therefore, all its corresponding files that might have been written to the filesystem (via markers). After a rollback is done, a new commit is created and the normal flow as depicted in the section above is hence restored.

This case is handled and the job can recover properly for such failures

JM blocked while TM global failover is occuring Note: this case is not handled as of now.

image

As can be seen here, if the JM is blocked by a rollback/archive. In this example, an archive, and if checkpoint timeouts, causing a global failover, TMs will restart.

As discussed previously, TM will send restored_write_meta event to the JM. Since JM's executor is being blocked at this point in time (performing an archive) invoked from #notifyCheckpointComplete, it will not be able to handle events from operator.

Once archive is done, it will be unblocked and a new instant is hence created by the JM (following the normal execution flow). For illustration, we will name this instant instant_x (created by start_commit_2 in the above diagram.

After JM exits from #notifyCheckpointComplete. At this point, TM will see the new instant_x and start performing the writes.

JM will start handling the operator events and will perform a #startInstant, rolling back the previous instant_x that was just created and create a new instant, instant_y (created by start_commit_3 in our diagram). TM is unaware of this and will continue with the previous fetched instant_x.

Desync happens and the writer will be perform abnormally until the instant between JM and TM lines up.

Once desync happens, alot of unhandled cases can happen, depending on the state of the a TM when rollback of instant_x is happening at the JM. For brevity, we will not discuss them as they should not be happening in the first place.

TLDR Blocked JM might will cause desync between JM and TM.

voonhous avatar Jan 05 '24 09:01 voonhous

@voonhous , thanks for the detailed analysis, I will take some time to take a look once I get time.

danny0405 avatar Jan 07 '24 09:01 danny0405

Hi @voonhous Thanks! I believe the root cause for out of sync between JM and TM is rollback action is not in a stop the world state. And Sorry to modify your comment by accident Orz..

zhangyue19921010 avatar Jan 08 '24 03:01 zhangyue19921010

@zhangyue19921010 No worries, added the markdown formatting back. :)

Yeap, root cause here is that checkpoint interval is still running and a long-running rollback/archive might cause the checkpoint to timeout.

Will stop-the-world stop the checkpoint timeout from happening too? And if so, what API do we need to call within Flink to support such behaviour?

voonhous avatar Jan 08 '24 06:01 voonhous

Hello @danny0405 , sorry to bother you, could you please take a look at this issue when you are free? This is indeed a serious bug. It is related to the core interface and needs to be checked by the boss :)

zhangyue19921010 avatar Jan 16 '24 05:01 zhangyue19921010

@danny0405 @zhangyue19921010 any updates for this issue?

BruceKellan avatar Feb 20 '24 06:02 BruceKellan

cc @danny0405 on this

yihua avatar Mar 26 '24 05:03 yihua