Use SupportsPrefixOperations for Remove OrphanFile Procedure
Issue Summary
- While testing an Iceberg integration, I encountered the following failure when running the REMOVE ORPHAN FILES procedure:
java.io.UncheckedIOException: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:367) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listedFileDS(DeleteOrphanFilesSparkAction.java:292) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.actualFileIdentDS(DeleteOrphanFilesSparkAction.java:277) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.doExecute(DeleteOrphanFilesSparkAction.java:243) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:127) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.execute(DeleteOrphanFilesSparkAction.java:229) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.lambda$call$3(RemoveOrphanFilesProcedure.java:171) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
- Currently, DeleteOrphanFilesSparkAction#listDirRecursively does not use the FileIO abstraction; instead it creates a Hadoop FileSystem directly and makes listStatus calls:
FileSystem fs = path.getFileSystem(conf);
List<String> subDirs = Lists.newArrayList();
for (FileStatus file : fs.listStatus(path, pathFilter)) {
We need to use S3FileIO here; otherwise we hit a 403 S3 Access Denied error, because the Hadoop file system does not hold the correct credentials to access this data. S3FileIO, by contrast, receives the correct credentials when its S3 client is built.
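The shape of the fix can be sketched as follows. This is a toy illustration, not the actual Iceberg implementation: the `FileIO` and `SupportsPrefixOperations` interfaces below are simplified stand-ins for Iceberg's types so the dispatch logic is runnable on its own. The idea is to prefer prefix listing when the table's FileIO supports it, and only fall back to Hadoop FileSystem listing otherwise:

```java
import java.util.List;
import java.util.function.Function;

// Simplified stand-ins for Iceberg's FileIO / SupportsPrefixOperations,
// so this sketch compiles without the Iceberg jars.
interface FileIO {}

interface SupportsPrefixOperations extends FileIO {
  List<String> listPrefix(String prefix);
}

class OrphanFileLister {
  // Prefer prefix listing when the FileIO supports it (e.g. S3FileIO, which
  // carries its own credentials); otherwise fall back to the Hadoop
  // FileSystem-based recursive listing the action uses today.
  static List<String> listFiles(
      FileIO io, String location, Function<String, List<String>> hadoopListing) {
    if (io instanceof SupportsPrefixOperations) {
      return ((SupportsPrefixOperations) io).listPrefix(location);
    }
    return hadoopListing.apply(location);
  }
}
```

With this dispatch in place, an S3-backed table never touches the Hadoop FileSystem path, which is where the credential mismatch occurs.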
Aim of PR
- Use S3FileIO within the REMOVE ORPHAN FILES procedure code path.
Testing
- With this change, I no longer hit the 403 and can run the procedure without issue:
spark-sql> CALL my_catalog.system.remove_orphan_files(table => 'iceberg_db.iceberg_table8', dry_run => true);
23/06/21 22:28:56 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3://rchertar-dev/iceberg-nrt/iceberg_table8/metadata/00003-4a0f642b-88a9-41e1-b1ab-5c0bada18b12.metadata.json
23/06/21 22:28:56 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
23/06/21 22:28:57 INFO BaseMetastoreCatalog: Table loaded by catalog: my_catalog.iceberg_db.iceberg_table8
23/06/21 22:28:57 INFO BaseAllMetadataTableScan: Scanning metadata table my_catalog.iceberg_db.iceberg_table8 with filter true.
23/06/21 22:28:58 INFO BaseAllMetadataTableScan: Scanning metadata table my_catalog.iceberg_db.iceberg_table8 with filter true.
Time taken: 21.439 seconds
spark-sql> select * from my_catalog.iceberg_db.iceberg_table8;
23/06/21 22:29:24 INFO BaseTableScan: Scanning table my_catalog.iceberg_db.iceberg_table8 snapshot 7557581385081927782 created at 2023-06-21T22:26:54.853+00:00 with filter true
23/06/21 22:29:24 INFO LoggingMetricsReporter: Received metrics report: ScanReport{tableName=my_catalog.iceberg_db.iceberg_table8, snapshotId=7557581385081927782, filter=true, schemaId=0, projectedFieldIds=[1, 2, 3], projectedFieldNames=[id, creation_date, last_update_time], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.13239524S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=3}, resultDeleteFiles=CounterResult{unit=COUNT, value=0}, totalDataManifests=CounterResult{unit=COUNT, value=3}, totalDeleteManifests=CounterResult{unit=COUNT, value=0}, scannedDataManifests=CounterResult{unit=COUNT, value=3}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=2763}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0}, skippedDataFiles=CounterResult{unit=COUNT, value=0}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=0}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0}, equalityDeleteFiles=CounterResult{unit=COUNT, value=0}, positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=3.3.1-amzn-0, app-id=application_1682982958961_0098, engine-name=spark}}
8 jan8 8
9 jan9 9
10 jan10 10
Time taken: 4.427 seconds, Fetched 3 row(s)
spark-sql>
I would appreciate a review from the community.
cc @jackye1995 @aokolnychyi @amogh-jahagirdar @RussellSpitzer
The PR still needs tests; let me know when they have been added.
@rahil-c just wanted to check in and see if you were planning on updating the PR? It would be great to get this in for the 1.4.0 release.
I hit a similar issue in 1.4.2 with Spark 3.5.
My Iceberg catalog in Spark is configured with the org.apache.iceberg.aws.s3.S3FileIO io-impl, and I store files using the s3:// prefix. However, when trying to clean files I get the error org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3".
Ran into a similar issue (same as in https://github.com/apache/iceberg/issues/8368) using the Glue Catalog. Is there a workaround for this, or is this PR the only fix?
Same issue here. I can't run the remove_orphan_files procedure using Glue and S3 😢
Same issue here. Let me know if anyone has solved this with the latest version. @carlosescura @domonkosbalogh-seon @rahil-c
@lokeshrdy It still doesn't work using Spark 3.5.0 and Iceberg 1.5.0 with Glue as the catalog and the following config:
SPARK_SETTINGS = [
(
"spark.jars",
"""
/opt/spark/jars/iceberg-aws-bundle-1.5.0.jar,
/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar,
/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar,
/opt/spark/jars/hadoop-aws-3.3.4.jar
""",
),
("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
("spark.hadoop.com.amazonaws.services.s3.enableV4", "true"),
(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
),
(
"spark.sql.catalog.main_catalog",
"org.apache.iceberg.spark.SparkCatalog",
),
(
"spark.sql.catalog.main_catalog.catalog-impl",
"org.apache.iceberg.aws.glue.GlueCatalog",
),
(
"spark.sql.catalog.main_catalog.io-impl",
"org.apache.iceberg.aws.s3.S3FileIO",
),
(
"spark.sql.catalog.main_catalog.warehouse",
ICEBERG_CATALOG_WHAREHOUSE,
),
]
I had to add hadoop-aws-3.3.4.jar to be able to download some CSVs and load them as Spark DF.
When calling the remove_orphan_files procedure I get the following exception:
py4j.protocol.Py4JJavaError: An error occurred while calling o46.sql.
: java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:386)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listedFileDS(DeleteOrphanFilesSparkAction.java:311)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.actualFileIdentDS(DeleteOrphanFilesSparkAction.java:296)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.doExecute(DeleteOrphanFilesSparkAction.java:247)
at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:59)
at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:51)
at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:130)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.execute(DeleteOrphanFilesSparkAction.java:223)
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.lambda$call$3(RemoveOrphanFilesProcedure.java:185)
at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:107)
at org.apache.iceberg.spark.procedures.BaseProcedure.withIcebergTable(BaseProcedure.java:96)
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.call(RemoveOrphanFilesProcedure.java:139)
at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:356)
... 55 more
@carlosescura the issue itself hasn't been solved yet. I'm not sure if @rahil-c is actively working on this issue. If not, maybe someone else from the community is interested in working on it.
@rahil-c is there any possibility to continue working on this PR? Many of us would really appreciate it.
Hi, I am also facing the same issue while running orphan file cleanup via Nessie REST. Auto-compaction and snapshot expiry work, but the orphan file cleanup procedure gives the same error. Is there any ETA on this fix?
java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
Hi all, sorry for the delay on this issue; I have been engaged in many internal things at work and did not get time to revisit it.
Originally I encountered this issue while working on a very specific feature involving the AWS Lake Formation and Iceberg integration, which is why I opened this PR. It seems, however, that several people have been hitting issues around the Remove Orphan Files procedure, and I'm unsure whether it is exactly the same issue I described in the overview.
As for the No FileSystem for scheme "s3" error: my understanding is that the remove orphan files procedure invokes the Hadoop file system directly, and when a user tries to read an s3:// path, Hadoop does not natively understand that file scheme. https://github.com/apache/iceberg/blob/main/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java#L356
The likely mitigation is to add the hadoop-aws jar and configure Spark with the appropriate Hadoop AWS settings. From the Iceberg AWS docs: https://github.com/apache/iceberg/blob/main/docs/docs/aws.md#hadoop-s3a-filesystem
Add [hadoop-aws](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws) as a runtime dependency of your compute engine.
Configure AWS settings based on [hadoop-aws documentation](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html) (make sure you check the version, S3A configuration varies a lot based on the version you use).
I think users can try adding
"spark.hadoop.fs.s3.impl" = "org.apache.hadoop.fs.s3a.S3AFileSystem" to their Spark configuration; I saw a similar thread here: https://apache-iceberg.slack.com/archives/C03LG1D563F/p1656918500567629
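For reference, the workaround in `spark-defaults.conf` form might look like this (a sketch; the keys come from the hadoop-aws documentation, and whether you also need the s3a mapping depends on your setup):

```properties
# Map the bare "s3" scheme onto the S3A filesystem so Hadoop can resolve
# s3:// paths during the orphan-file listing (a workaround, not the real fix)
spark.hadoop.fs.s3.impl    org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.impl   org.apache.hadoop.fs.s3a.S3AFileSystem
```

Note this only helps when the Spark process itself holds valid S3 credentials; it does not address the credential-vending scenarios discussed below.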
As for landing this PR, I will see if I can add tests based on @RussellSpitzer's feedback.
@RussellSpitzer @amogh-jahagirdar I wanted to understand what test is actually needed for this change. I saw this comment:
We also need a test which exercises this code path. (Does HadoopFS do this by default? If so, do we have a test for the other path: no prefix?)
However, based on the diff of the PR, the actual logic change is on the list-with-prefix path.
Checking TestRemoveOrphanFilesAction, which uses DeleteOrphanFilesSparkAction, my assumption is that it already exercises prefix listing, since it uses HadoopTables, which use HadoopFileIO, which implements the SupportsPrefixOperations interface:
@Override
public Iterable<FileInfo> listPrefix(String prefix) {
Path prefixToList = new Path(prefix);
FileSystem fs = Util.getFs(prefixToList, hadoopConf.get());
return () -> {
try {
return Streams.stream(
new AdaptingIterator<>(fs.listFiles(prefixToList, true /* recursive */)))
.map(
fileStatus ->
new FileInfo(
fileStatus.getPath().toString(),
fileStatus.getLen(),
fileStatus.getModificationTime()))
.iterator();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
The basic issue is that we want a test which uses both a supportsPrefix-enabled FS and one where it is not enabled, so we are sure that both implementations remain correct.
I see, thank you for the clarification @RussellSpitzer @amogh-jahagirdar
HadoopFileIO (and therefore the local fs) supports listPrefix. It'll need a CustomFileIO, as with similar tests.
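A hypothetical sketch of what such a custom test FileIO could look like (the names here are illustrative, not Iceberg's actual test classes): it lists a local directory but deliberately does not implement SupportsPrefixOperations, so the action would be forced down the Hadoop FileSystem fallback path.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Minimal stand-in for Iceberg's FileIO interface (illustrative only).
interface FileIO {}

// A FileIO that intentionally does NOT implement SupportsPrefixOperations,
// exercising the non-prefix (FileSystem fallback) code path in tests.
class NoPrefixLocalFileIO implements FileIO {
  // Plain recursive listing of regular files under a local directory.
  List<String> listRecursively(Path dir) {
    try (Stream<Path> walk = Files.walk(dir)) {
      return walk.filter(Files::isRegularFile)
          .map(Path::toString)
          .sorted()
          .collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Pairing a test that uses this with one that uses HadoopFileIO would cover both branches, as requested above.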
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
CALL nessie.system.remove_orphan_files(table => 'nessie.robot_dev.robot_data')
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
@RussellSpitzer
Confirming the issue is still there after manually setting spark.hadoop.fs.s3.impl to S3A. If the client has S3 credentials with the needed access, it works. However, with credential vending from Polaris it can fail (in that case the client doesn't hold S3 credentials). Here is the reference ticket on the Polaris side: https://github.com/apache/polaris/issues/390
@rahil-c please check whether the following is true: listWithPrefix may list too many unexpected files. For example, with two tables, sample and sample_part, matchingFiles will contain:
...
DeleteOrphanFilesSparkAction: match files: s3a://ice-lake/warehouse/ice_db/sample/metadata/snap-4835616794401450947-1-f5bb6d24-162f-4c9d-a426-893c07cac506.avro
DeleteOrphanFilesSparkAction: match files: s3a://ice-lake/warehouse/ice_db/sample_part/data/ds=20240806/00000-3-0c28a14d-186c-489d-aeb8-f0a949a67297-0-00002.parquet
...
so the sample_part table would lose its metadata files.
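If I understand the concern, it's the classic prefix over-match: a raw startsWith check for sample also matches sample_part. A tiny self-contained illustration (toy code, not Iceberg's implementation) of why the listing prefix needs a trailing path separator:

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy illustration of the reported hazard: matching object keys by raw
// string prefix over-selects a sibling table whose name shares the prefix.
class PrefixMatchDemo {
  static List<String> matching(List<String> keys, String prefix) {
    return keys.stream()
        .filter(k -> k.startsWith(prefix))
        .collect(Collectors.toList());
  }
}
```

Without a trailing "/", a listing for warehouse/ice_db/sample also picks up warehouse/ice_db/sample_part keys; appending the separator scopes it to the intended table.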
Has anyone got a nice workaround for removing orphan files from an S3-located Iceberg table? Conscious that this PR is 18 months old, I'm assuming someone has got this to work on their end... somehow?
The S3A fs implements bulk delete too... maybe this and S3FileIO can do the right thing (*)
(*) we added it to all filesystems, but the page size of the others is zero
@steveloughran This isn't about bulk deletes (which S3FileIO does support). The issue is how to properly scale the identification of orphaned files, which is a function of the procedure, not the file system.