Fix hudi connector gets stuck #19506

Open willzgw opened this issue 1 year ago • 14 comments

Description

- Fix hudi connector gets stuck

Additional context and related issues

  • #19506

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix the Hudi connector getting stuck while attempting to read empty partitions of a partitioned Hudi table. ({issue}`19506`)

willzgw avatar Dec 05 '23 17:12 willzgw

Hi @electrum, I'm not quite sure what to do when we get an empty partition: should we ignore it or throw an exception?

willzgw avatar Dec 13 '23 14:12 willzgw

Hi @willzgw, thank you very much for your contribution! This helps a lot. Just a suggestion for your question: the Hive connector has a property called "hive.ignore-absent-partitions", with default "false", that switches this behavior. Maybe you could implement the same for the Hudi connector?
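
To make the ignore-or-fail choice concrete, here is a minimal Java sketch. It is not code from this PR or from Trino: the class `AbsentPartitionSketch`, the method `listPartitionFiles`, and the `ignoreAbsentPartitions` flag are hypothetical names, and a plain local directory stands in for a partition location.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper for illustration only; not part of the Trino code base.
final class AbsentPartitionSketch
{
    private AbsentPartitionSketch() {}

    // Collects the regular files of every partition directory under tableRoot.
    // A partition whose directory does not exist is skipped when
    // ignoreAbsentPartitions is true and causes a failure when it is false.
    static List<Path> listPartitionFiles(Path tableRoot, List<String> partitions, boolean ignoreAbsentPartitions)
    {
        List<Path> files = new ArrayList<>();
        for (String partition : partitions) {
            Path directory = tableRoot.resolve(partition);
            if (!Files.isDirectory(directory)) {
                if (ignoreAbsentPartitions) {
                    continue; // behave as if the partition had no data
                }
                throw new IllegalStateException("Partition location does not exist: " + directory);
            }
            try (Stream<Path> children = Files.list(directory)) {
                children.filter(Files::isRegularFile).forEach(files::add);
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return files;
    }
}
```

With the flag set to `false`, a missing partition surfaces as an error instead of silently returning fewer rows, which is the trade-off behind a "false" default.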

uroell avatar Jan 03 '24 09:01 uroell

#20151 fixes the underlying problem of Trino failing on empty Hudi partitions. Once that PR is merged, there should be no reason to handle it here, so it should be enough to reduce the scope of this PR to something like

--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.hudi.split;
 
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import io.trino.plugin.hive.util.AsyncQueue;
 import io.trino.plugin.hudi.HudiTableHandle;
 import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader;
@@ -28,8 +29,8 @@ import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
 
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;
 import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism;
 import static java.util.Objects.requireNonNull;
@@ -66,7 +67,7 @@ public class HudiBackgroundSplitLoader
     {
         Deque<String> partitionQueue = new ConcurrentLinkedDeque<>(partitions);
         List<HudiPartitionInfoLoader> splitGeneratorList = new ArrayList<>();
-        List<Future> splitGeneratorFutures = new ArrayList<>();
+        List<ListenableFuture<Void>> splitGeneratorFutures = new ArrayList<>();
 
         // Start a number of partition split generators to generate the splits in parallel
         for (int i = 0; i < splitGeneratorNumThreads; i++) {
@@ -79,16 +80,18 @@ public class HudiBackgroundSplitLoader
             // Let the split generator stop once the partition queue is empty
             generator.stopRunning();
         }
-
-        // Wait for all split generators to finish
-        for (Future future : splitGeneratorFutures) {
-            try {
-                future.get();
-            }
-            catch (InterruptedException | ExecutionException e) {
-                throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
-            }
+        try {
+            // Wait for all split generators to finish
+            Futures.whenAllComplete(splitGeneratorFutures) // also succeeds when some tasks fail
+                    .run(asyncQueue::finish, directExecutor())
+                    .get(); // will throw an ExecutionException when one of the tasks failed
+        }
+        catch (ExecutionException e) {
+            throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
         }
-        asyncQueue.finish();
     }
 }
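
The same wait-then-finish pattern can be sketched with only the JDK. This is an illustration under stated assumptions, not the code proposed above: it swaps Guava's `Futures.whenAllComplete` for `CompletableFuture.allOf`, and `WaitForGeneratorsSketch`, `awaitAllThenFinish`, and the `finish` callback are hypothetical names. One reason for the swap: as far as the Guava Javadoc describes it, failures of the input futures are not propagated to the future returned by `whenAllComplete(...).run(...)`, whereas `allOf` completes exceptionally when any input failed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical helper for illustration only; not part of the Trino code base.
final class WaitForGeneratorsSketch
{
    private WaitForGeneratorsSketch() {}

    // Waits until every generator future has completed, runs finish exactly once
    // (whether or not a generator failed), and then rethrows the first failure.
    static void awaitAllThenFinish(List<CompletableFuture<Void>> generators, Runnable finish)
    {
        CompletableFuture<Void> all = CompletableFuture.allOf(generators.toArray(new CompletableFuture<?>[0]));
        try {
            // whenComplete runs on success and on failure and keeps the original outcome
            all.whenComplete((ignored, failure) -> finish.run()).get();
        }
        catch (ExecutionException e) {
            throw new IllegalStateException("Error generating split", e.getCause());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while generating splits", e);
        }
    }
}
```

Running the callback inside the combined future means the queue is finished even when a generator fails, so a consumer polling the queue is not left waiting forever.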

codesorcery avatar Jan 23 '24 12:01 codesorcery

Hi @ebyhr @electrum, I tried to add a test to TestHudiSmokeTest, but Trino throws an exception when creating the table because the partition does not exist. It does not seem convenient to add test cases. Do you have any suggestions?

willzgw avatar Jan 25 '24 09:01 willzgw

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot] avatar Feb 15 '24 17:02 github-actions[bot]

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot] avatar Mar 11 '24 17:03 github-actions[bot]

I tried to add a test to TestHudiSmokeTest, but Trino throws an exception when creating the table because the partition does not exist.

@willzgw Sorry for my late reply. We could create the table on Spark and add the contents under the resources directory. You can refer to resources/hudi-testing-data/hudi_cow_pt_tbl.

ebyhr avatar Mar 14 '24 07:03 ebyhr

Hi @ebyhr, thanks for your advice. I've actually tried this approach before. We want to test the case where the partition does not exist, which requires the metadata to reference files that do not actually exist in the resources directory. However, the test then fails with an error saying that the corresponding file is missing from the resources.

willzgw avatar Mar 14 '24 10:03 willzgw

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot] avatar Apr 04 '24 17:04 github-actions[bot]

Should we merge this PR, given that you approved it, @electrum?

mosabua avatar Apr 06 '24 03:04 mosabua

Do I understand correctly that one then has to explicitly set hudi.ignore-absent-partitions=false to get the current behavior? IMHO, that should definitely be highlighted in the release notes, since it's a breaking change.

codesorcery avatar Apr 07 '24 09:04 codesorcery

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot] avatar May 02 '24 17:05 github-actions[bot]

Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.

github-actions[bot] avatar May 24 '24 17:05 github-actions[bot]