Spark Iceberg streaming - checkpoint uses the S3FileIO signer path instead of Hadoop's S3AFileSystem
Apache Iceberg version
1.8.1
Query engine
Spark
Please describe the bug 🐞
Spark 3.5.6, Iceberg 1.8.1
We are building a Medallion-like architecture in Iceberg, streaming from an Iceberg bronze table to an Iceberg silver table. This is how we write to the destination table:
```java
return bronzeStream
    .writeStream()
    .outputMode("append")
    .option("checkpointLocation", ckptDir)
    .queryName("bronze_to_silver_" + silverTableName)
    .trigger(Trigger.ProcessingTime(processingTime))
    .foreachBatch((batch, batchId) -> {
      if (batch.isEmpty()) {
        log.info(
            "Bronze->Silver stream {}: batch {} is empty, skipping write.",
            silverTableName,
            batchId);
        return;
      }
      batch.writeTo(StreamUtils.getFullTableName(silverTableName)).append();
    })
    .start();
}
```
We are getting the following error in Spark (note the `S3V4RestSignerClient.sign` frame):

```
Query 20b3208b-35a2-4feb-bce8-ac76abe3df9c terminated with error:
org.apache.iceberg.exceptions.ForbiddenException: Forbidden: Table not found or action can_get_metadata forbidden for Anonymous
	at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:212)
	at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:188)
	at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:224)
	at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:308)
	at org.apache.iceberg.rest.BaseHTTPClient.post(BaseHTTPClient.java:100)
	at org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient.sign(S3V4RestSignerClient.java:351)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.lambda$signRequest$4(SigningStage.java:154)
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDuration(MetricUtils.java:63)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.signRequest(SigningStage.java:153)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.execute(SigningStage.java:72)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.execute(SigningStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
```
In the catalog we can see that a sign request was sent on behalf of the checkpoint directory. We would expect the checkpoint directory to use its Hadoop credentials and not go through S3FileIO with an S3V4RestSignerClient.sign flow (or an STS flow, for that matter).
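For illustration, this is a minimal sketch of the flow we expected the checkpoint writes to take: through Hadoop's S3AFileSystem with its own credentials, independent of the table's S3FileIO. The bucket, keys, and path below are placeholders.

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch of the expected checkpoint write path: plain Hadoop S3A
// with its own credentials, never touching the catalog's signer endpoint.
public class ExpectedCheckpointWrite {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    conf.set("fs.s3a.access.key", "<access-key>");   // placeholder
    conf.set("fs.s3a.secret.key", "<secret-key>");   // placeholder

    String offsetPath =
        "s3a://demo-bucket/spark-checkpoints/bronze_to_silver/my_table/sources/0/offsets/0";
    FileSystem fs = FileSystem.get(new URI(offsetPath), conf);
    // Write an offset file directly through the S3A filesystem.
    try (FSDataOutputStream out = fs.create(new Path(offsetPath), true)) {
      out.writeBytes("v1");
    }
  }
}
```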
Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
@koombal did you enable `s3.remote-signing-enabled` by any chance? Against which REST catalog implementation is this running? It's also possible that the server set the `s3.remote-signing-enabled` flag by sending it back in the initial `/config` response to the REST client.
Well, I tried explicitly setting it to false on the client side (Spark), but to the best of my understanding it is returned and determined by the REST catalog ("Lakekeeper" in this instance). As I understand it, I can work with STS or with remote signing, but for some reason the checkpoint is also being sent to the catalog for a sign request.
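For reference, this is roughly what setting it to false on the client side looks like as a catalog property on the Spark session (a minimal sketch; the catalog URI is a placeholder, and as noted above the catalog's `/config` response seems to take precedence anyway):

```java
import org.apache.spark.sql.SparkSession;

// Sketch only: passing s3.remote-signing-enabled=false as a catalog property.
// The REST catalog (Lakekeeper here) can still enable it via its /config response.
SparkSession spark =
    SparkSession.builder()
        .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.iceberg.type", "rest")
        .config("spark.sql.catalog.iceberg.uri", "https://lakekeeper.example.com/catalog") // placeholder
        .config("spark.sql.catalog.iceberg.s3.remote-signing-enabled", "false")
        .getOrCreate();
```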
By the way, our other Spark streaming job, which reads from Kafka and writes to Iceberg, has no such issue.
It's important to note that the same app is running on Azure with ADLSFileIO without any issues. The issue only occurs on AWS with S3FileIO.
@koombal remote signing exists only for S3. For Azure there are vended credentials. For S3 you might want to use vended creds as well (https://docs.lakekeeper.io/docs/0.10.x/storage/?h=remote#s3)
@nastra it's the same error with vended credentials in S3; no issue in Azure. Could it be related to S3FileIO?
@c-thiel is this something you could please take a look at?
The strange thing is that Spark seems to use the table's IO for the checkpoint as well.
In Lakekeeper we see a very clear log that states:
Received a table specific sign request for table xxx with a location s3://xxx/warehouse/019ae372-7eed-75c3-bca2-b35bcd38def0 that does not match the request URI https://s3.us-west-2.amazonaws.com/xxx/dlh2/spark-checkpoints/iceberg-streaming/bronze_to_silver/my_table/sources/0/offsets/0
According to @koombal the same happens with STS: Spark tries to use the vended credentials returned from a LoadTableResponse for the checkpoint IO as well, which is again forbidden because the checkpoint is not in the table's location.
@nastra do you know how a table-specific FileIO could leak out and be used to store a checkpoint? I tried browsing the Java code but nothing caught my eye.
I'll just correct my earlier comment: on Azure we didn't do catalog authentication, we were authenticating directly against Azure Blob Storage.
@c-thiel the Table is broadcast in
https://github.com/apache/iceberg/blob/24ca356fb4ecd48d593949fd25c852c21bc87d53/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L105
and the Table's IO instance is set in
https://github.com/apache/iceberg/blob/e667f64f5bddbacb1a641ac8ea67fc21a76e434d/core/src/main/java/org/apache/iceberg/SerializableTable.java#L86
That IO is then used in
https://github.com/apache/iceberg/blob/24ca356fb4ecd48d593949fd25c852c21bc87d53/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L558-L569
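To make the chain above concrete, here is a hypothetical standalone sketch of the pattern (not the actual Iceberg source): the FileIO that the REST catalog attaches to the table is asked to write a file outside the table's location, which is effectively what the checkpoint offset write does. Catalog URI, warehouse, table, and checkpoint path are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.rest.RESTCatalog;

// Sketch: the table's FileIO (S3FileIO with remote signing or vended creds)
// is used to write a file outside the table's location.
public class CheckpointViaTableIO {
  public static void main(String[] args) throws Exception {
    RESTCatalog catalog = new RESTCatalog();
    catalog.initialize(
        "iceberg",
        Map.of(
            "uri", "https://lakekeeper.example.com/catalog", // placeholder; auth properties omitted
            "warehouse", "demo"));                           // placeholder

    Table table = catalog.loadTable(TableIdentifier.of("bronze", "my_table"));
    FileIO io = table.io(); // scoped to the table's location by the catalog

    // The signer / vended credentials only cover the table location, so a
    // write to a checkpoint path outside it is rejected by the catalog.
    OutputFile offsetFile =
        io.newOutputFile("s3://demo-bucket/spark-checkpoints/bronze_to_silver/my_table/sources/0/offsets/0");
    try (PositionOutputStream out = offsetFile.createOrOverwrite()) {
      out.write("v1".getBytes(StandardCharsets.UTF_8));
    }
  }
}
```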
@nastra thanks for the links! Would you say it is desired behavior that the Spark checkpoint uses the table's FileIO? I feel like the checkpoint IO should be independent of the table IO. I am unsure what a solution should look like.
Yes I think it's correct that the checkpoint uses the Table's FileIO, otherwise I don't see what other options we have and what other FileIO it would use. It sounds like it's possibly a permission issue on the Lakekeeper side?
Lakekeeper does not hand out credentials for this user-specified location outside of the table scope, but neither does any other catalog implementation that I am aware of. I guess the question is whether handing out credentials for a specific query engine's checkpoint location should be the responsibility of the server - I don't think it should.
Testing with a Spark Python session, I don't have this problem either. Instead I get the error:
```
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
```
If I configure s3a via hadoop in the spark session globally, it works - here is a small example: https://github.com/lakekeeper/lakekeeper/pull/1537
@nastra any idea why Scala uses the table's FileIO while Python doesn't? @koombal can you provide any more information about your environment?
@c-thiel @nastra we are running
- spark 3.5_2.12 (java).
- Lakekeeper catalog
- K8S
- configuring hadoop with s3a and creating the checkpoints successfully using hadoop
- Iceberg 1.8.1 --> now migrating to 1.10
@koombal can you please share your entire catalog configuration, and also what does `ckptDir` point to in `.option("checkpointLocation", ckptDir)`?
> @nastra any idea why Scala uses the table's FileIO while Python doesn't?
I don't know, I've only been testing this through our TestStructuredStreamingRead tests, and all of those use a temp location for checkpoints. I applied this diff for testing to modify the FileIO to be used:
```diff
+import org.apache.iceberg.inmemory.InMemoryFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -52,6 +53,7 @@ public abstract class CatalogTestBase extends TestBaseWithCatalog {
       ImmutableMap.builder()
           .putAll(SparkCatalogConfig.REST.properties())
           .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
+          .put(CatalogProperties.FILE_IO_IMPL, InMemoryFileIO.class.getName())
           .build()
     }
   };
```
and the checkpoint is definitely using the table's FileIO, because the tests would then fail with:
```
No in-memory file found for location: file:/var/folders/q3/m55sggg13cz2dpssg7l_q9300000gp/T/junit-395966452732806867/writer-checkpoint-folder/writer-checkpoint/sources/0/offsets/0
org.apache.iceberg.exceptions.NotFoundException: No in-memory file found for location: file:/var/folders/q3/m55sggg13cz2dpssg7l_q9300000gp/T/junit-395966452732806867/writer-checkpoint-folder/writer-checkpoint/sources/0/offsets/0
	at app//org.apache.iceberg.inmemory.InMemoryFileIO.newInputFile(InMemoryFileIO.java:48)
	at app//org.apache.iceberg.spark.source.SparkMicroBatchStream$InitialOffsetStore.initialOffset(SparkMicroBatchStream.java:558)
	at app//org.apache.iceberg.spark.source.SparkMicroBatchStream.<init>(SparkMicroBatchStream.java:116)
	at app//org.apache.iceberg.spark.source.SparkScan.toMicroBatchStream(SparkScan.java:170)
```
@nastra this is our Spark configuration:
```java
builder.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "rest")
    .config("spark.sql.catalog.iceberg.uri", icebergUri)
    .config("spark.sql.catalog.iceberg.warehouse", icebergWarehouseName);
builder.config("spark.sql.adaptive.enabled", "true").config("spark.sql.adaptive.skewJoin.enabled", "true");
builder.config("spark.sql.timestampType", "TIMESTAMP_NTZ");
builder.config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config(
        "spark.sql.catalog.iceberg.client.region",
        s3ConfigurationProperties
            .getGeneralConfigurationProperties()
            .getRegion())
    .config("spark.sql.catalog.iceberg.s3.endpoint", s3ConfigurationProperties.getSparkUrl())
    .config("spark.sql.catalog.iceberg.s3.path-style-access", "true");
builder.config("spark.hadoop.fs.s3a.access.key", s3ConfigurationProperties.getAccessKey());
builder.config("spark.hadoop.fs.s3a.secret.key", s3ConfigurationProperties.getSecretKey());
builder.config("spark.hadoop.fs.s3a.endpoint", s3ConfigurationProperties.getSparkUrl());
builder.config("spark.hadoop.fs.s3a.path.style.access", "true");
builder.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
```
We tried moving the checkpoint to different paths, and I believe also to a different bucket, but experienced the same error.
@koombal does your checkpoint location use s3:// or s3a://?
@nastra s3a. The checkpoint directory is created successfully beforehand:
```java
log.info("Ensuring checkpoint directory {}", ckptDir);
FileSystem fs =
    FileSystem.get(new URI(ckptDir), spark.sparkContext().hadoopConfiguration());
Path dir = new Path(ckptDir);
if (!fs.exists(dir)) {
  log.info("Checkpoint directory '{}' does not exist – creating it now", ckptDir);
  if (!fs.mkdirs(dir)) {
    ....
  }
}
```