Apache Hudi consistency issues with Glue and marketplace connector
I am running an Apache Hudi based data lake on AWS S3. All my ETL processes execute through AWS Glue 3.0, and I use the Apache Hudi Connector for AWS Glue (https://aws.amazon.com/marketplace/pp/prodview-6rofemcq6erku?ref_=beagle&applicationId=GlueStudio) to connect to the S3 based data lake from my Glue jobs. Recently I have been seeing the error below in one of my Glue jobs:
An error occurred while calling o304.pyWriteDynamicFrame.
: org.apache.hudi.exception.HoodieCommitException: Failed to complete commit 20220915064443442 due to finalize errors.
at org.apache.hudi.client.AbstractHoodieWriteClient.finalizeWrite(AbstractHoodieWriteClient.java:1150)
at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:224)
at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:197)
at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:125)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:635)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:286)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:45)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieIOException: Consistency check failed to ensure all files APPEAR
at org.apache.hudi.table.HoodieTable.waitForAllFiles(HoodieTable.java:621)
at org.apache.hudi.table.HoodieTable.reconcileAgainstMarkers(HoodieTable.java:588)
at org.apache.hudi.table.HoodieTable.finalizeWrite(HoodieTable.java:511)
at org.apache.hudi.client.AbstractHoodieWriteClient.finalizeWrite(AbstractHoodieWriteClient.java:1141)
... 46 more
My Hudi config is as below:
'className' : 'org.apache.hudi',
'hoodie.table.name': <<table_name>>,
'hoodie.datasource.write.recordkey.field': <<record_key>>,
'hoodie.datasource.write.partitionpath.field': <<partition_path>>,
'hoodie.datasource.write.table.name': <<table_name>>,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': <<pre_combine>>,
'hoodie.upsert.shuffle.parallelism': 20,
'hoodie.insert.shuffle.parallelism': 20,
'hoodie.consistency.check.enabled': 'true',
'hoodie.datasource.hive_sync.use_jdbc':'false',
'hoodie.datasource.hive_sync.database': <<database>>,
'hoodie.datasource.hive_sync.table': <<table_name>>,
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.partition_fields': <<partition_path>>,
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'path': <<s3_path>>
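For reference, these options are passed to the writer as a plain Python dict in the Glue (PySpark) job. A minimal runnable sketch is below; every concrete value (table and database names, field names, the S3 path) is a placeholder I made up for illustration, not a value from the actual job, and the consistency check is set to 'false' as suggested later in this thread:

```python
# Hudi writer options as built in a Glue (PySpark) job.
# All concrete values (table/database names, fields, S3 path) are
# hypothetical placeholders, not the real job's values.
hudi_options = {
    'className': 'org.apache.hudi',
    'hoodie.table.name': 'my_table',
    'hoodie.datasource.write.recordkey.field': 'record_id',
    'hoodie.datasource.write.partitionpath.field': 'partition_col',
    'hoodie.datasource.write.table.name': 'my_table',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'updated_at',
    'hoodie.upsert.shuffle.parallelism': 20,
    'hoodie.insert.shuffle.parallelism': 20,
    # S3 is strongly consistent now, so this extra check can be disabled:
    'hoodie.consistency.check.enabled': 'false',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.database': 'my_db',
    'hoodie.datasource.hive_sync.table': 'my_table',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.partition_fields': 'partition_col',
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'path': 's3://my-bucket/my_table/',
}
```

In a Glue job this dict is typically spread into the connection options of the marketplace connector sink (or a DataFrame writer's `.options(**hudi_options)`), which is how it ends up driving the `pyWriteDynamicFrame` call shown in the stack trace.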
I see a similar issue raised previously (https://github.com/apache/hudi/issues/1764), and it is marked as fixed. Just wondering whether this is the same issue or a separate one. Are there any known workarounds?
@asankadarshana007 have you filed an AWS support case? This is an AWS-managed environment and app. Not sure why you have to set 'hoodie.consistency.check.enabled': 'true'; it is not needed, since S3 has had strong consistency for quite some time now.
Hey @xushiyan - thank you for your reply. When I initially started developing these jobs (a long time back, with the previous version of the Hudi connector, 0.5.x), I wasn't aware that S3 has strong consistency; I assumed it was eventually consistent. I can change the Hudi config value to 'hoodie.consistency.check.enabled': 'false' and test. But do you think it might be connected to this error? I assume that with 'hoodie.consistency.check.enabled': 'true', Hudi would try to make sure all files are available, as an extra check?
I have raised a support ticket with AWS, but it could take some time to get a resolution out of them 😓
@asankadarshana007 The consistency check (code below), when enabled, happens when removing invalid data files: (1) check that all paths to delete exist, (2) delete them, (3) wait for all paths to disappear, accounting for eventual consistency. Note that steps (1) and (3) are not needed on a strongly consistent store. Since invalid data files are now determined based on the markers, there can be a case where a marker is created but the data file has not yet started being written, so check (1) fails, which is okay. Given that there is no use case for eventual consistency at the moment, we don't maintain this logic.
Let me know if turning off hoodie.consistency.check.enabled solves your problem. You can close the ticket if all is good.
if (!invalidDataPaths.isEmpty()) {
  LOG.info("Removing duplicate data files created due to task retries before committing. Paths=" + invalidDataPaths);
  Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
      .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), new Path(basePath, dp).toString()))
      .collect(Collectors.groupingBy(Pair::getKey));

  // Ensure all files in the delete list are actually present. This is mandatory for an
  // eventually consistent FS; otherwise, we may miss deleting such files. If files are
  // not found even after retries, fail the commit.
  if (consistencyCheckEnabled) {
    // This will ensure all files to be deleted are present.
    waitForAllFiles(context, invalidPathsByPartition, FileVisibility.APPEAR);
  }

  // Now delete partially written files
  context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
  deleteInvalidFilesByPartitions(context, invalidPathsByPartition);

  // Now ensure the deleted files disappear
  if (consistencyCheckEnabled) {
    // This will ensure all files to be deleted are absent.
    waitForAllFiles(context, invalidPathsByPartition, FileVisibility.DISAPPEAR);
  }
}
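The `waitForAllFiles` call above is where the "Consistency check failed to ensure all files APPEAR" exception originates: it polls the filesystem with retries until every path reaches the desired visibility, and fails the commit if any path never does. As a rough illustration only (a Python sketch, not Hudi's actual Java implementation; `exists` is a stand-in for the filesystem check), the logic amounts to a retry-with-backoff loop:

```python
import time

def wait_for_visibility(paths, exists, should_appear,
                        max_attempts=5, initial_delay=0.1):
    """Poll until every path APPEARs (or DISAPPEARs) on an eventually
    consistent filesystem. Rough illustrative sketch only, not Hudi's code.
    `exists` is any callable path -> bool standing in for the FS check."""
    delay = initial_delay
    for _ in range(max_attempts):
        # Paths whose current visibility does not yet match the target.
        pending = [p for p in paths if exists(p) != should_appear]
        if not pending:
            return True  # all paths reached the desired visibility
        time.sleep(delay)
        delay *= 2  # exponential backoff between checks
    # Mirrors Hudi throwing HoodieIOException("Consistency check failed ...")
    return False

# Example with an in-memory "filesystem":
visible = {'s3://bucket/p1/file1.parquet'}
assert wait_for_visibility(['s3://bucket/p1/file1.parquet'],
                           exists=lambda p: p in visible,
                           should_appear=True)
```

On a strongly consistent store like today's S3, a file that a marker says was written either exists or it doesn't; the retry loop cannot make a marker-without-data-file case converge, which is why the check can fail spuriously and why disabling it is safe.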
Disabling the consistency check should resolve the issue; in later Hudi versions it is disabled by default (see https://github.com/apache/hudi/issues/2983). Please reopen if you still face this issue after disabling the consistency check.