
[SUPPORT] Duplicate fileId exception occurs in Flink Bucket MOR table

Open usberkeley opened this issue 1 year ago • 7 comments

Describe the problem you faced

When Flink is restarted, a Duplicate fileId exception occurs in the Flink Bucket MOR table

To Reproduce

Steps to reproduce the behavior:

  1. Resources are insufficient, checkpoints fail, and Flink restarts frequently
  2. With very low probability, the job fails to restart successfully and reports a Duplicate fileId exception

Expected behavior

No such error

Environment Description

  • Hudi version : 0.15.0

  • Spark version : none

  • Hive version : none

  • Hadoop version : 3.3

  • Storage : HDFS

  • Running on Docker? : no

Extra Context

  1. Hudi Index Type: Bucket
  2. Hudi table type: MOR
  3. Is the table partitioned?: yes
  4. Base file view (reflowed, one file per line):

     9.5 M    2024-08-12 19:35  /xxx/grass_date=2024-08-04/.00000000-5349-4580-b720-e62c2d3ff483_20240807021703977.log.1_4-6-4
     134.8 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000000-ffda-4be5-810b-55e9acd8bcb8_20240807021134962.log.1_4-6-3
     106.5 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000001-8b95-4932-9a29-c8345632f96e_20240807021134962.log.1_5-6-3
     8.3 M    2024-08-13 00:05  /xxx/grass_date=2024-08-04/.00000001-c5c2-4335-b1c2-f524221007e0_20240807021703977.log.1_5-6-4
     9.6 M    2024-08-13 00:43  /xxx/grass_date=2024-08-04/.00000002-1288-4e5e-b4e5-690e71215103_20240807021703977.log.1_0-6-4
     175.0 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000002-2538-4413-9be4-1a86822f1012_20240807021134962.log.1_0-6-3
     169.2 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000003-6e56-426d-8d01-22ce1d9fe577_20240807021134962.log.1_1-6-3
     9.0 M    2024-08-12 19:57  /xxx/grass_date=2024-08-04/.00000003-d16d-4473-aae2-8c136f377d60_20240807021703977.log.1_1-6-4
     10.0 M   2024-08-13 22:46  /xxx/grass_date=2024-08-04/.00000004-67aa-4f66-ba2c-9a615a1563b7_20240807021703977.log.1_2-6-4
     179.9 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000004-bf31-42dc-96fb-2e8c595edb7b_20240807021134962.log.1_2-6-3
     139.0 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000005-9dcf-49c3-b958-ae48f72d114e_20240807021134962.log.1_3-6-3
     9.8 M    2024-08-13 03:39  /xxx/grass_date=2024-08-04/.00000005-a0c8-4485-be33-3e0b2d94f61f_20240807021703977.log.1_3-6-4
     202.8 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000006-57be-4dee-86b8-49de00b49bd4_20240807021134962.log.1_4-6-3
     8.8 M    2024-08-14 09:49  /xxx/grass_date=2024-08-04/.00000006-f24e-47a7-a937-c0ef79507dec_20240807021703977.log.1_4-6-4
     161.2 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000007-a029-46eb-85b4-e04075c7e36f_20240807021134962.log.1_5-6-3
     9.1 M    2024-08-12 03:50  /xxx/grass_date=2024-08-04/.00000007-febc-4bb1-86b6-9e7e949cab0f_20240807021703977.log.1_5-6-4
     8.4 M    2024-08-12 19:03  /xxx/grass_date=2024-08-04/.00000008-8af2-48be-9310-d1775fdedf50_20240807021703977.log.1_0-6-4
     225.0 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000008-cf3b-404c-af37-aad4d4755e60_20240807021134962.log.1_0-6-3
     142.7 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000009-9b60-41bd-891d-c41e3b69a5fb_20240807021134962.log.1_1-6-3
     9.1 M    2024-08-13 18:06  /xxx/grass_date=2024-08-04/.00000009-df00-4e7b-b499-20308cbdb4f4_20240807021703977.log.1_1-6-4
     149.0 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000010-4a39-4c0f-9218-c26eed34f395_20240807021134962.log.1_2-6-3
     9.5 M    2024-08-13 14:43  /xxx/grass_date=2024-08-04/.00000010-b3d3-40f4-9aaa-ed4ac182a167_20240807021703977.log.1_2-6-4
     100.5 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000011-006a-4a2c-b904-16c0392b93cb_20240807021134962.log.1_3-6-3
     10.0 M   2024-08-13 20:46  /xxx/grass_date=2024-08-04/.00000011-da99-433a-975c-6eb85c3f3854_20240807021703977.log.1_3-6-4
     9.7 M    2024-08-13 12:55  /xxx/grass_date=2024-08-04/.00000012-0449-470f-a867-9c700b47708f_20240807021703977.log.1_4-6-4
     63.3 K   2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000012-0c22-4df1-8d6e-3a05c952038a_20240807021134962.log.1_4-6-3
     148.1 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000013-65b9-461d-99a6-a2258ac1997f_20240807021134962.log.1_5-6-3
     1.6 M    2024-08-07 07:23  /xxx/grass_date=2024-08-04/.00000013-b092-4b9f-8954-5f05a83b57b3_20240807021703977.log.1_5-6-4
     6.4 M    2024-08-13 00:45  /xxx/grass_date=2024-08-04/.00000013-b092-4b9f-8954-5f05a83b57b3_20240807021703977.log.2_1-0-1
     130.0 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000014-57db-48ed-9857-121673008f58_20240807021134962.log.1_0-6-3
     9.2 M    2024-08-13 14:43  /xxx/grass_date=2024-08-04/.00000014-6c39-4c0a-85b5-f593058cda61_20240807021703977.log.1_0-6-4
     9.7 M    2024-08-13 01:53  /xxx/grass_date=2024-08-04/.00000015-0f53-49c4-84de-96be8d413d92_20240807021703977.log.1_1-6-4
     93.2 K   2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000015-5df9-42dd-b1dd-d8ee71673b61_20240807021134962.log.1_1-6-3
     9.7 M    2024-08-12 18:14  /xxx/grass_date=2024-08-04/.00000016-4581-4908-b4d7-97d96dcdfd2a_20240807021703977.log.1_2-6-4
     126.0 K  2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000016-a63a-4f04-877e-1d32bb1929e1_20240807021134962.log.1_2-6-3
     71.4 K   2024-08-07 02:17  /xxx/grass_date=2024-08-04/.00000017-6536-4c3d-9bf4-c6e8a6ca1f0f_20240807021134962.log.1_3-6-3
     9.0 M    2024-08-13 03:39  /xxx/grass_date=2024-08-04/.00000017-edd9-4d26-a114-e63ac63056a0_20240807021703977.log.1_3-6-4
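
For reference, here is a minimal sketch of a Flink job configuration that matches the setup described above. The table name, source table, schema, and HDFS path are made up for illustration; only the table type and index options mirror this report, and the bucket count of 18 is inferred from the bucket ids 00000000-00000017 in the listing:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BucketMorIngestJob {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing is on; in this report, checkpoints failed under resource pressure,
    // which is what triggered the frequent restarts.
    env.enableCheckpointing(60_000L);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Partitioned MOR table with the BUCKET index, as listed under "Extra Context".
    tEnv.executeSql(
        "CREATE TABLE hudi_sink (\n"
            + "  id STRING,\n"
            + "  val STRING,\n"
            + "  grass_date STRING,\n"
            + "  PRIMARY KEY (id) NOT ENFORCED\n"
            + ") PARTITIONED BY (grass_date) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs:///xxx',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'index.type' = 'BUCKET',\n"
            + "  'hoodie.bucket.index.num.buckets' = '18'\n"
            + ")");
    // 'source_table' is a stand-in for whatever upstream table feeds the sink.
    tEnv.executeSql("INSERT INTO hudi_sink SELECT id, val, grass_date FROM source_table");
  }
}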

Stacktrace

Caused by: java.lang.RuntimeException: Duplicate fileId 00000016-4581-4908-b4d7-97d96dcdfd2a from bucket 16 of partition grass_date=2024-08-04 found during the BucketStreamWriteFunction index bootstrap.
    at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$2(BucketStreamWriteFunction.java:172)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.Streams$StreamBuilderImpl.forEachRemaining(Streams.java:419)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:165)
    at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:115)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:87)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
    at java.lang.Thread.run(Thread.java:748)
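
For orientation, the failing frame is the per-partition index bootstrap, which expects at most one fileId per bucket among the latest file slices. A simplified paraphrase of that check (abbreviated names and a stripped-down signature; not the literal Hudi source):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified paraphrase of the duplicate-fileId guard in
// BucketStreamWriteFunction#bootstrapIndexIfNeed; not the literal Hudi source.
public class BucketBootstrapCheck {
  private final Map<Integer, String> bucketToFileId = new HashMap<>();

  public void bootstrap(String partition, List<String> latestFileIds) {
    for (String fileId : latestFileIds) {
      // With the bucket index, the bucket number is encoded as the fileId prefix,
      // e.g. "00000016-4581-..." belongs to bucket 16.
      int bucket = Integer.parseInt(fileId.substring(0, 8));
      String existing = bucketToFileId.putIfAbsent(bucket, fileId);
      if (existing != null && !existing.equals(fileId)) {
        // Two distinct file groups resolve to the same bucket. In this issue, the log
        // file that rollback failed to delete collides with the file group created
        // after the restart, so the bootstrap aborts here.
        throw new RuntimeException("Duplicate fileId " + fileId + " from bucket " + bucket
            + " of partition " + partition
            + " found during the BucketStreamWriteFunction index bootstrap.");
      }
    }
  }
}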

usberkeley avatar Aug 16 '24 06:08 usberkeley

Maybe related to these two fixes: https://github.com/apache/hudi/pull/5185 and https://github.com/apache/hudi/pull/8263

danny0405 avatar Aug 16 '24 10:08 danny0405

Thanks. Still tracking the issue.

usberkeley avatar Aug 19 '24 01:08 usberkeley

@danny0405 Danny, the problem has been located. After HUDI-1517 enhanced the marker mechanism, the change in the marker file suffix changed the rollback's judgment condition: the log file created by a single delta commit was no longer deleted during marker-based rollback.

The master branch has already solved this problem through the marker IOType.CREATE, i.e., marker-based rollback deletes the log file that a single delta commit created.

We have fixed the problem for version 0.15. Do we still need to contribute the fix to the remote 0.15 branch?
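
To illustrate the master-branch idea, here is a rough sketch of the decision (not the actual method on master; EMPTY_STRING and the types are the same ones used in the 0.15 code below):

// Rough sketch: markers on master record the IOType of the write, so rollback can tell
// a log file CREATED by the rolled-back delta commit apart from blocks APPENDED to an
// existing log file, and delete the whole file in the former case.
private HoodieRollbackRequest requestForLogFile(IOType markerIoType, StoragePath logFilePath,
    String relativePartitionPath, String fileId, String baseCommitTime) {
  if (markerIoType == IOType.CREATE) {
    // The delta commit being rolled back created this log file, so delete it outright.
    return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING,
        Collections.singletonList(logFilePath.toString()), Collections.emptyMap());
  }
  // Otherwise only the appended blocks are rolled back (size stubbed, as in the 0.15 code below).
  return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime,
      Collections.emptyList(), Collections.singletonMap(logFilePath.getName(), 1L));
}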

usberkeley avatar Aug 26 '24 08:08 usberkeley

The code below is our fix for version 0.15 (the added logic starts at the "Added by usberkeley" comment). Anyone who needs it can take a look.

protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant instantToRollback, String fileNameWithPartitionToRollback) {
    StoragePath fullLogFilePath = new StoragePath(basePath, fileNameWithPartitionToRollback);
    String relativePartitionPath = FSUtils.getRelativePartitionPath(new StoragePath(basePath), fullLogFilePath.getParent());
    String fileId;
    String baseCommitTime;
    Option<HoodieLogFile> latestLogFileOption;
    Map<String, Long> logBlocksToBeDeleted = new HashMap<>();
    // Old marker files may be generated from base file name before HUDI-1517. keep compatible with them.
    if (FSUtils.isBaseFile(fullLogFilePath)) {
      LOG.warn("Find old marker type for log file: " + fileNameWithPartitionToRollback);
      fileId = FSUtils.getFileIdFromFilePath(fullLogFilePath);
      baseCommitTime = FSUtils.getCommitTime(fullLogFilePath.getName());
      StoragePath partitionPath = FSUtils.constructAbsolutePath(config.getBasePath(), relativePartitionPath);

      // NOTE: Since we're rolling back incomplete Delta Commit, it only could have appended its
      //       block to the latest log-file
      try {
        latestLogFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getStorage(), partitionPath, fileId,
            HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
        if (latestLogFileOption.isPresent() && baseCommitTime.equals(instantToRollback.getTimestamp())) {
          StoragePath fullDeletePath = new StoragePath(partitionPath, latestLogFileOption.get().getFileName());
          return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING,
              Collections.singletonList(fullDeletePath.toString()),
              Collections.emptyMap());
        }
        if (latestLogFileOption.isPresent()) {
          HoodieLogFile latestLogFile = latestLogFileOption.get();
          // NOTE: Markers don't carry information about the cumulative size of the blocks that have been appended,
          //       therefore we simply stub this value.
          logBlocksToBeDeleted = Collections.singletonMap(latestLogFile.getPathInfo().getPath().toString(), latestLogFile.getPathInfo().getLength());
        }
        return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), logBlocksToBeDeleted);
      } catch (IOException ioException) {
        throw new HoodieIOException(
            "Failed to get latestLogFile for fileId: " + fileId + " in partition: " + partitionPath,
            ioException);
      }
    } else {
      HoodieLogFile logFileToRollback = new HoodieLogFile(fullLogFilePath);
      fileId = logFileToRollback.getFileId();
      baseCommitTime = logFileToRollback.getBaseCommitTime();
      // NOTE: We don't strictly need the exact size, but this size needs to be positive to pass metadata payload validation.
      //       Therefore, we simply stub this value (1L), instead of doing a fs call to get the exact size.
      logBlocksToBeDeleted = Collections.singletonMap(logFileToRollback.getPath().getName(), 1L);
    }
    
    // **Added by usberkeley** 
    // Log file can be deleted if the commit to rollback is also the commit that created the fileGroup
    if (baseCommitTime.equals(instantToRollback.getTimestamp())) {
      StoragePath partitionPath = FSUtils.constructAbsolutePath(config.getBasePath(), relativePartitionPath);

      try {
        latestLogFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getStorage(), partitionPath, fileId,
                HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);

        if (latestLogFileOption.isPresent()) {
          StoragePath fullFilePathToDelete = new StoragePath(partitionPath, latestLogFileOption.get().getFileName());
          return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING,
                  Collections.singletonList(fullFilePathToDelete.toString()),
                  Collections.emptyMap());
        }
      } catch (IOException ioException) {
        throw new HoodieIOException("Failed to get latestLogFile for fileId: " + fileId + " in partition: " + partitionPath, ioException);
      }
    }

    return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), logBlocksToBeDeleted);
}

usberkeley avatar Aug 26 '24 08:08 usberkeley

@usberkeley Thanks for the nice findings, would you mind contributing this fix to the 0.x branch?

danny0405 avatar Aug 27 '24 01:08 danny0405

@usberkeley Thanks for raising this. I also noticed that the logic in the else branch of FSUtils.isBaseFile(fullLogFilePath) is not consistent with the if branch when the log file is generated at the same instant time as the file group (this is fine for Spark, but a problem for Flink).

yihua avatar Aug 27 '24 01:08 yihua

@usberkeley Thanks for the nice findings, would you mind contributing this fix to the 0.x branch?

my pleasure

usberkeley avatar Aug 27 '24 14:08 usberkeley

@danny0405 @usberkeley I've updated #11830 to include the fix for this issue. I took a simpler approach that achieves the same result. PTAL.

yihua avatar Aug 30 '24 01:08 yihua

@danny0405 @usberkeley I've updated #11830 to include the fix for this issue. I took a simpler approach that achieves the same result. PTAL.

Wow, great, thanks Ethan!

usberkeley avatar Aug 31 '24 07:08 usberkeley

Thanks Danny and Ethan. Closing the issue as resolved.

usberkeley avatar Aug 31 '24 07:08 usberkeley