hudi icon indicating copy to clipboard operation
hudi copied to clipboard

[HUDI-7507] Adding timestamp ordering validation before creating requested instant

Open nsivabalan opened this issue 1 year ago • 4 comments

Change Logs

When multiple writers trigger table services, there is a chance that one of them could create requested in a different ordering compared to the actual timestamp. Linked jira has more details of the scenario w/ an illustration. This patch, ensure that before creating a requested entry in the timeline, there is no other instant greater than the current instant time.

Major reason is, a writer could generate a new commit time in memory (say t10), and then take a long time to do some computation and eventually add the requested instant to timeline very late (say t100). In b/w these two time, there could be another concurrent writer choosing t25 as commit time and proceed. This might lead to unexpected behaviors as called out in the jira.

This patch is for 0.x branch. here is the equivalent 1.x branch PR https://github.com/apache/hudi/pull/11344

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level (write none, low medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instruction to make changes to the website.

Contributor's checklist

  • [ ] Read through contributor's guide
  • [ ] Change Logs and Impact were stated clearly
  • [ ] Adequate tests were added if applicable
  • [ ] CI passed

nsivabalan avatar Jul 05 '24 19:07 nsivabalan

hey @danny0405 : I was triaging some of the test failures in this patch. Looks like we are breaking something in flink. I see code snippet here where we are choosing an instant time for new compaction back in time (prior to another pending delta commit). So, what we are trying to achieve in this patch may not sit well w/ this logic. Lets sync up to see how we can make progress here.

nsivabalan avatar Jul 16 '24 20:07 nsivabalan

Sorry, the fix may resolve some use cases but I don't think it is a thorough solution and we still lose the TrueTime semantics for instant time

danny0405 avatar Jul 16 '24 23:07 danny0405

Adding TimeGenerator support to 0.1x branch here https://github.com/apache/hudi/pull/11812

nsivabalan avatar Aug 21 '24 15:08 nsivabalan

@kbuci @yihua : patch is ready for review based on the consensus we had.

nsivabalan avatar Sep 08 '24 16:09 nsivabalan

Thanks for adding to change to perform the scheduling of write commits within a transaction, as discussed offline. This PR looks fine to me, though should we create a ticket for handling rollback case in future (since as discussed if a rollback creates a deltacommits on MDT but a compaction with higher timestamp is already present, then not sure if thats legal)

kbuci avatar Sep 13 '24 02:09 kbuci

https://github.com/apache/hudi/pull/11580#discussion_r1767232434

for this: I don't get you. but, we do take locks.

  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
                                             TableServiceType tableServiceType) {
    // A lock is required to guard against race conditions between an ongoing writer and scheduling a table service.
    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
        tableServiceType.getAction(), instantTime));
    try {
      this.txnManager.beginTransaction(inflightInstant, Option.empty());
      LOG.info("Scheduling table service {}", tableServiceType);
      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
    } finally {
      this.txnManager.endTransaction(inflightInstant);
    }
  }

within the BaseHoodieTableServiceClient

nsivabalan avatar Sep 20 '24 22:09 nsivabalan

#11580 (comment)

for this: I don't get you. but, we do take locks.

  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
                                             TableServiceType tableServiceType) {
    // A lock is required to guard against race conditions between an ongoing writer and scheduling a table service.
    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
        tableServiceType.getAction(), instantTime));
    try {
      this.txnManager.beginTransaction(inflightInstant, Option.empty());
      LOG.info("Scheduling table service {}", tableServiceType);
      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
    } finally {
      this.txnManager.endTransaction(inflightInstant);
    }
  }

within the BaseHoodieTableServiceClient

@nsivabalan what I meant to ask is do we need to validate using TimestampUtils.validateForLatestTimestamp(metaClient, inflightInstant); within the transaction instead of calling it inside individual comapction/clustering plan executor?

codope avatar Sep 21 '24 03:09 codope

hey @codope : that would be tough to validate. bcoz, for some of the callers, lock is taken 3 to 4 call stack above and don't feel its worth passing txnManager all the way to validate that lock is acquired.

I do get your intent. But for mdt writes also, we ensure we call it from within a lock. We don't have

ValidationUtils.checkArgument(txnManager.isLockRequired(),"Lock needs to be acquired in order to perform timestamp validation");

this sort of validation for every write to MDT.

nsivabalan avatar Sep 23 '24 15:09 nsivabalan

hey @danny0405 : I was chasing some test failures in this patch and realized that flink might have an issue. In this piece of code block, we generate a compaction time in the past. So, the additional validation in this patch may not sit well w/ flink.

Do you have any thoughts on how to go about it.

nsivabalan avatar Sep 25 '24 13:09 nsivabalan

hey @danny0405 : I was chasing some test failures in this patch and realized that flink might have an issue. In this piece of code block, we generate a compaction time in the past. So, the additional validation in this patch may not sit well w/ flink.

Do you have any thoughts on how to go about it.

I kind of think only if there is no completed commits after the compaction instant, we are good to use it, there is no need to force a strong increasing sequence.

danny0405 avatar Sep 26 '24 00:09 danny0405

I was chasing some test failures in this patch and realized that flink might have an issue. In this piece of code block, we generate a compaction time in the past. So, the additional validation in this patch may not sit well w/ flink.

@nsivabalan My understanding of MOR compaction on latest 0.x is likely out of date so sorry if my comment here might not make sense , but I assumed that (in 0.x) once a compaction plan with instant time T targeting a file group is created, any write (deltacommit) that has a greater instant time than T will create a new log file with an instant time of T (assuming appends are disabled). If this is the case, then if you have a MOR dataset with [C0.deltacommit, C2.deltacommit.inflight] and then a compaction plan is scheduled with earlier timestamp [C0.deltacommit, C1.compaction.requested, C2.deltacommit.requested] , then there might be no issue on the base table as long as C2 fails itself during write conflict resolution. But if this MOR dataset has a metadata table, then we might find ourselves in same case we discussed offline (first scenario in https://issues.apache.org/jira/browse/HUDI-7507). Specifically, if the writer that worked on C2 (or a greater instant) scheduled a compaction on the metadata table.

kbuci avatar Sep 27 '24 21:09 kbuci

hey @danny0405 : based on our direct sync up, I am adding these validations only for spark and java. I will let you follow up on flink later.

nsivabalan avatar Sep 30 '24 23:09 nsivabalan

Adding retries might be hard. For eg incase of table services, we are validating just before adding the requested to the timeline. But the instant time is generated at higher layers (write client), but the requested is added by the PlanActionExecutor and only incase there is a valid plan generated. Atleast for startCommitTimes() i.e. for ingestion commits, we could try and make commit time generation method as a function and keep retrying, but for table services, we might have to make lot of changes.

and since this is meant only for 0.x, wondering if its really worth adding the retries.

nsivabalan avatar Oct 02 '24 13:10 nsivabalan

@hudi-bot run azure

nsivabalan avatar Oct 04 '24 03:10 nsivabalan

CI report:

  • 3a1c57e3dc77d325881e8093a72bff4927cad160 UNKNOWN
  • dd08d9c08f25111eaecd046e0c8c7a3219c62fd1 UNKNOWN
  • 644d075fede55dbba3e789d776d35daff78287aa UNKNOWN
  • c3d50cf5888efede74dbf5146a912e6fa4eddc6b Azure: FAILURE Azure: SUCCESS
Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

hudi-bot avatar Oct 04 '24 04:10 hudi-bot

Is this validation needed by cleaning table service?

danny0405 avatar Oct 08 '24 01:10 danny0405