
Apache Hudi Consistency issues with glue and marketplace connector

Open asankadarshana007 opened this issue 1 year ago • 3 comments

I am running an Apache Hudi-based data lake on AWS S3. All my ETL processes run through AWS Glue 3.0, and I use the Apache Hudi Connector for AWS Glue (https://aws.amazon.com/marketplace/pp/prodview-6rofemcq6erku?ref_=beagle&applicationId=GlueStudio) to connect to the S3-based data lake from my Glue jobs. Recently I started seeing the error below in one of my Glue jobs:

An error occurred while calling o304.pyWriteDynamicFrame.
: org.apache.hudi.exception.HoodieCommitException: Failed to complete commit 20220915064443442 due to finalize errors.
	at org.apache.hudi.client.AbstractHoodieWriteClient.finalizeWrite(AbstractHoodieWriteClient.java:1150)
	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:224)
	at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:197)
	at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:125)
	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:635)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:286)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:45)
	at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:64)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieIOException: Consistency check failed to ensure all files APPEAR
	at org.apache.hudi.table.HoodieTable.waitForAllFiles(HoodieTable.java:621)
	at org.apache.hudi.table.HoodieTable.reconcileAgainstMarkers(HoodieTable.java:588)
	at org.apache.hudi.table.HoodieTable.finalizeWrite(HoodieTable.java:511)
	at org.apache.hudi.client.AbstractHoodieWriteClient.finalizeWrite(AbstractHoodieWriteClient.java:1141)
	... 46 more

My Hudi config is as follows:

'className': 'org.apache.hudi',
'hoodie.table.name': <<table_name>>,
'hoodie.datasource.write.recordkey.field': <<record_key>>,
'hoodie.datasource.write.partitionpath.field': <<partition_path>>,
'hoodie.datasource.write.table.name': <<table_name>>,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': <<pre_combine>>,
'hoodie.upsert.shuffle.parallelism': 20,
'hoodie.insert.shuffle.parallelism': 20,
'hoodie.consistency.check.enabled': 'true',
'hoodie.datasource.hive_sync.use_jdbc':'false',
'hoodie.datasource.hive_sync.database': <<database>>,
'hoodie.datasource.hive_sync.table': <<table_name>>,
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.partition_fields': <<partition_path>>,
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'path': <<s3_path>>
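For context, a minimal sketch of how options like these are typically assembled and handed to the marketplace connector in a PySpark Glue job. The helper function and the example argument values are hypothetical (they are not part of the connector); the option keys mirror the config above.

```python
# Hypothetical helper, not part of the connector: builds the Hudi write
# options shown above as a plain dict.
def build_hudi_options(table_name, record_key, partition_path,
                       precombine_field, database, s3_path,
                       consistency_check=True):
    return {
        'className': 'org.apache.hudi',
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.recordkey.field': record_key,
        'hoodie.datasource.write.partitionpath.field': partition_path,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.precombine.field': precombine_field,
        'hoodie.upsert.shuffle.parallelism': 20,
        'hoodie.insert.shuffle.parallelism': 20,
        'hoodie.consistency.check.enabled': str(consistency_check).lower(),
        'hoodie.datasource.hive_sync.use_jdbc': 'false',
        'hoodie.datasource.hive_sync.database': database,
        'hoodie.datasource.hive_sync.table': table_name,
        'hoodie.datasource.hive_sync.enable': 'true',
        'hoodie.datasource.hive_sync.partition_fields': partition_path,
        'hoodie.datasource.hive_sync.partition_extractor_class':
            'org.apache.hudi.hive.MultiPartKeysValueExtractor',
        'path': s3_path,
    }

# In the Glue job itself, the dict is passed as connection_options, e.g.:
# glueContext.write_dynamic_frame.from_options(
#     frame=dyf,
#     connection_type='marketplace.spark',
#     connection_options=build_hudi_options(
#         'my_table', 'id', 'dt', 'updated_at', 'my_db', 's3://bucket/path'))
```

This is the `pyWriteDynamicFrame` call that appears at the bottom of the stack trace above.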

I see a similar issue raised previously (https://github.com/apache/hudi/issues/1764), and it is marked as fixed. I'm wondering whether this is the same issue or a separate one. Are there any known workarounds?

asankadarshana007 avatar Sep 15 '22 22:09 asankadarshana007

@asankadarshana007 have you filed an AWS support case? This is an AWS-managed environment and application. Also, I'm not sure why you set 'hoodie.consistency.check.enabled': 'true'; it has not been needed since S3 became strongly consistent quite some time back.

xushiyan avatar Sep 16 '22 15:09 xushiyan

Hey @xushiyan - thank you for your reply. When I first developed these jobs (a long time back, with the previous version of the Hudi connector, 0.5.x), I wasn't aware that S3 had become strongly consistent; I assumed it was still eventually consistent. I can change the config to 'hoodie.consistency.check.enabled': 'false' and test. But do you think it could be connected to this error? My understanding is that with 'hoodie.consistency.check.enabled': 'true', Hudi tries to make sure all files are visible, as an extra check? I have raised a support ticket with AWS, but it could take some time to get a resolution out of them 😓

asankadarshana007 avatar Sep 19 '22 00:09 asankadarshana007

@asankadarshana007 The consistency check (code below), when enabled, runs while removing invalid data files: (1) check that all paths to delete exist, (2) delete them, (3) wait for all deleted paths to disappear. Steps (1) and (3) only matter under eventual consistency; they are not needed on a strongly consistent store. Since invalid data files are now determined based on the markers, there can be a case where a marker was created but the corresponding data file was never written, so check (1) fails even though nothing is actually wrong. Given that there is no use case for eventual consistency at the moment, we no longer maintain this logic.

Let me know if turning off hoodie.consistency.check.enabled solves your problem. You can close the ticket if all is good.

      if (!invalidDataPaths.isEmpty()) {
        LOG.info("Removing duplicate data files created due to task retries before committing. Paths=" + invalidDataPaths);
        Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
            .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), new Path(basePath, dp).toString()))
            .collect(Collectors.groupingBy(Pair::getKey));

        // Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
        // Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
        if (consistencyCheckEnabled) {
          // This will either ensure all files to be deleted are present.
          waitForAllFiles(context, invalidPathsByPartition, FileVisibility.APPEAR);
        }

        // Now delete partially written files
        context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
        deleteInvalidFilesByPartitions(context, invalidPathsByPartition);

        // Now ensure the deleted files disappear
        if (consistencyCheckEnabled) {
          // This will either ensure all files to be deleted are absent.
          waitForAllFiles(context, invalidPathsByPartition, FileVisibility.DISAPPEAR);
        }
      }
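To make the failure mode concrete, here is a simplified, hypothetical Python model of the Java flow above (function and variable names are mine, not Hudi's): a plain set of paths stands in for the filesystem, and the two waits model waitForAllFiles(..., APPEAR/DISAPPEAR). A marker whose data file was never written makes the APPEAR wait fail, which matches the HoodieIOException in the stack trace.

```python
import time

def wait_for_all_files(fs, paths, appear, max_retries=3, backoff_secs=0.0):
    # Model of waitForAllFiles: retry until every path is visible (APPEAR)
    # or gone (DISAPPEAR), else raise like the Java code does.
    for _ in range(max_retries):
        if appear:
            satisfied = all(p in fs for p in paths)
        else:
            satisfied = all(p not in fs for p in paths)
        if satisfied:
            return
        time.sleep(backoff_secs)  # a real store would back off between retries
    raise IOError("Consistency check failed to ensure all files "
                  + ("APPEAR" if appear else "DISAPPEAR"))

def reconcile_invalid_files(fs, invalid_paths, consistency_check_enabled):
    # Model of reconcileAgainstMarkers: delete files left behind by task
    # retries, with optional visibility checks around the delete.
    if not invalid_paths:
        return
    # (1) ensure every file in the delete list is present (eventual-consistency only)
    if consistency_check_enabled:
        wait_for_all_files(fs, invalid_paths, appear=True)
    # (2) delete the partially written files
    for p in invalid_paths:
        fs.discard(p)
    # (3) ensure the deleted files have disappeared (eventual-consistency only)
    if consistency_check_enabled:
        wait_for_all_files(fs, invalid_paths, appear=False)
```

With the check disabled, a marker without a data file is simply a no-op delete; with it enabled, step (1) raises even though the commit is otherwise fine.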

yihua avatar Sep 21 '22 22:09 yihua

Disabling it should resolve the issue. In later Hudi versions, this check is disabled by default; see https://github.com/apache/hudi/issues/2983. Please reopen if you still face this issue after disabling the consistency check.
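For anyone landing here, the workaround amounts to flipping one flag in the write options (a sketch; every other key in the original poster's config stays exactly as shown above):

```python
hudi_options = {
    # ... all other Hudi and hive_sync options unchanged ...
    'hoodie.consistency.check.enabled': 'false',  # S3 is now strongly consistent
}
```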

codope avatar Sep 30 '22 10:09 codope