
Spark Iceberg streaming - checkpoint leverages S3FileIO signer path instead of Hadoop's S3AFileSystem

Open koombal opened this issue 3 weeks ago • 17 comments

Apache Iceberg version

1.8.1

Query engine

Spark

Please describe the bug 🐞

Spark 3.5.6 Iceberg 1.8.1

We are building a Medallion-like architecture on Iceberg, streaming from an Iceberg bronze table to an Iceberg silver table.

This is how we write to the destination table:

        return bronzeStream
                .writeStream()
                .outputMode("append")
                .option("checkpointLocation", ckptDir)
                .queryName("bronze_to_silver_" + silverTableName)
                .trigger(Trigger.ProcessingTime(processingTime))
                .foreachBatch((batch, batchId) -> {
                    if (batch.isEmpty()) {
                        log.info(
                                "Bronze->Silver stream {}: batch {} is empty, skipping write.",
                                silverTableName,
                                batchId);
                        return;
                    }
                    batch.writeTo(StreamUtils.getFullTableName(silverTableName)).append();
                })
                .start();
    }

We are getting the following error in Spark (note the S3V4RestSignerClient.sign frame):

    Query 20b3208b-35a2-4feb-bce8-ac76abe3df9c terminated with error:
    org.apache.iceberg.exceptions.ForbiddenException: Forbidden: Table not found or action can_get_metadata forbidden for Anonymous
        at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:212)
        at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:188)
        at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:224)
        at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:308)
        at org.apache.iceberg.rest.BaseHTTPClient.post(BaseHTTPClient.java:100)
        at org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient.sign(S3V4RestSignerClient.java:351)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.lambda$signRequest$4(SigningStage.java:154)
        at software.amazon.awssdk.core.internal.util.MetricUtils.measureDuration(MetricUtils.java:63)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.signRequest(SigningStage.java:153)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.execute(SigningStage.java:72)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.execute(SigningStage.java:50)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)

In the catalog we can see that a sign request was sent on behalf of the checkpoint directory. We would expect the checkpoint directory to use its Hadoop credentials and not go through S3FileIO with an "S3V4RestSignerClient.sign" flow (or an STS flow, for that matter).

Willingness to contribute

  • [ ] I can contribute a fix for this bug independently
  • [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • [ ] I cannot contribute a fix for this bug at this time

koombal avatar Dec 04 '25 15:12 koombal

@koombal did you enable s3.remote-signing-enabled by any chance? Against which REST catalog implementation is this running? It's also possible that the server set the s3.remote-signing-enabled flag by sending it back in the initial /config response to the REST client.

nastra avatar Dec 04 '25 15:12 nastra

Well, I tried explicitly setting it to false on the client side (Spark), but to the best of my understanding it is returned and determined by the REST catalog ("Lakekeeper" in this instance). As I understand it, I can work with STS or with remote signing. But for some reason, the checkpoint is being sent to the catalog for a sign request as well.
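
For reference, this is roughly how we tried to disable it on the Spark side (a sketch, assuming the catalog is registered as `iceberg`; the server's /config response can still override this, so it did not change the behavior for us):

    // Client-side attempt only (sketch): the catalog's /config response can
    // still re-enable remote signing on the REST client.
    builder.config("spark.sql.catalog.iceberg.s3.remote-signing-enabled", "false");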

By the way, our other Spark streaming job, which reads from Kafka and writes to Iceberg, has no such issue.

koombal avatar Dec 04 '25 15:12 koombal

It's important to note that the same app is running on Azure with ADLSFileIO without any issues. The issue only occurs on AWS with S3FileIO.

koombal avatar Dec 07 '25 06:12 koombal

@koombal remote signing exists only for S3. For Azure there are vended credentials. For S3 you might want to use vended credentials as well (https://docs.lakekeeper.io/docs/0.10.x/storage/?h=remote#s3)
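
A sketch of what requesting vended credentials could look like in your Spark catalog config (this is an assumption based on the access-delegation header from the Iceberg REST spec; see the linked Lakekeeper docs for the authoritative setup, and whether the server honors it depends on the warehouse configuration):

    // Sketch: ask the REST catalog to vend credentials instead of remote signing.
    // The "header." prefix adds a header to every request the REST client makes.
    builder.config(
            "spark.sql.catalog.iceberg.header.X-Iceberg-Access-Delegation", "vended-credentials");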

nastra avatar Dec 08 '25 07:12 nastra

@nastra it's the same error with vended credentials in S3; no issue in Azure. Could it be related to S3FileIO?

koombal avatar Dec 08 '25 10:12 koombal

@c-thiel is this something you could please take a look at?

nastra avatar Dec 08 '25 10:12 nastra

The strange thing is that Spark seems to use the table's IO for the checkpoint as well.

In Lakekeeper we see a very clear log that states:

Received a table specific sign request for table xxx with a location s3://xxx/warehouse/019ae372-7eed-75c3-bca2-b35bcd38def0 that does not match the request URI https://s3.us-west-2.amazonaws.com/xxx/dlh2/spark-checkpoints/iceberg-streaming/bronze_to_silver/my_table/sources/0/offsets/0

According to @koombal the same happens with STS: Spark tries to use the vended credentials returned from a LoadTableResponse for the checkpoint IO as well, which is again forbidden because the checkpoint is not in the table's location.

@nastra do you know how a table-specific FileIO could leak out and be used to store a checkpoint? I tried browsing the Java code but nothing caught my eye.

c-thiel avatar Dec 09 '25 09:12 c-thiel

I will just correct my earlier comment: on Azure we didn't try catalog authentication; we were going directly against Azure Blob Storage.

koombal avatar Dec 09 '25 09:12 koombal

@c-thiel the Table is broadcast in https://github.com/apache/iceberg/blob/24ca356fb4ecd48d593949fd25c852c21bc87d53/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L105, the Table's IO instance is set in https://github.com/apache/iceberg/blob/e667f64f5bddbacb1a641ac8ea67fc21a76e434d/core/src/main/java/org/apache/iceberg/SerializableTable.java#L86, and that IO is then used in https://github.com/apache/iceberg/blob/24ca356fb4ecd48d593949fd25c852c21bc87d53/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L558-L569
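
Put differently, a simplified sketch of that flow (not the actual source, just the shape of it):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.InputFile;

    // Simplified sketch: the initial offset under the user's checkpointLocation is
    // resolved through the broadcast table's FileIO. With a REST catalog that means
    // S3FileIO plus remote signing / vended credentials, not Hadoop's S3AFileSystem,
    // which is why Lakekeeper sees a sign request for the checkpoint path.
    static boolean initialOffsetExists(Table table, String checkpointLocation) {
      FileIO io = table.io(); // table-scoped IO carried by the SerializableTable
      // e.g. the ".../sources/0/offsets/0" path seen in the Lakekeeper log above
      InputFile offsetFile = io.newInputFile(checkpointLocation + "/sources/0/offsets/0");
      return offsetFile.exists();
    }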

nastra avatar Dec 09 '25 10:12 nastra

@nastra thanks for the links! Would you say it is desired behavior that the Spark checkpoint uses the table's FileIO? I feel like the checkpoint IO should be independent of the table IO. I am unsure what a solution should look like.

c-thiel avatar Dec 09 '25 10:12 c-thiel

Yes, I think it's correct that the checkpoint uses the table's FileIO; otherwise I don't see what other options we have or what other FileIO it would use. It sounds like it's possibly a permission issue on the Lakekeeper side?

nastra avatar Dec 09 '25 10:12 nastra

Lakekeeper does not give credentials for this user-specified location outside of the table scope, but neither does any other catalog implementation that I am aware of. I guess the question is whether handing out credentials for a specific query engine's checkpoint location should be the responsibility of the server. I don't think it should.

Testing with a Spark Python session I don't have this problem. Instead I get the error:

org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"

If I configure s3a via Hadoop in the Spark session globally, it works. Here is a small example: https://github.com/lakekeeper/lakekeeper/pull/1537

@nastra any idea why the Scala path uses the table's FileIO while the Python one doesn't? @koombal can you provide more information about your environment?

c-thiel avatar Dec 09 '25 17:12 c-thiel

@c-thiel @nastra we are running:

  1. Spark 3.5_2.12 (Java)
  2. Lakekeeper catalog
  3. K8s
  4. Hadoop configured with s3a; the checkpoints are created successfully through Hadoop
  5. Iceberg 1.8.1 --> now migrating to 1.10

koombal avatar Dec 09 '25 20:12 koombal

@koombal can you please share your entire catalog configuration, and also what ckptDir points to in .option("checkpointLocation", ckptDir)?

> @nastra any idea why the Scala path uses the table's FileIO while the Python one doesn't?

I don't know; I've only been testing this through our TestStructuredStreamingRead tests, and all of those use a temp location for checkpoints. I applied this diff for testing to modify the FileIO being used:

+import org.apache.iceberg.inmemory.InMemoryFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.extension.ExtendWith;

@@ -52,6 +53,7 @@ public abstract class CatalogTestBase extends TestBaseWithCatalog {
         ImmutableMap.builder()
             .putAll(SparkCatalogConfig.REST.properties())
             .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
+            .put(CatalogProperties.FILE_IO_IMPL, InMemoryFileIO.class.getName())
             .build()
       }
     };

and the checkpoint is definitely using the table's FileIO, because the tests would then fail with:

No in-memory file found for location: file:/var/folders/q3/m55sggg13cz2dpssg7l_q9300000gp/T/junit-395966452732806867/writer-checkpoint-folder/writer-checkpoint/sources/0/offsets/0
org.apache.iceberg.exceptions.NotFoundException: No in-memory file found for location: file:/var/folders/q3/m55sggg13cz2dpssg7l_q9300000gp/T/junit-395966452732806867/writer-checkpoint-folder/writer-checkpoint/sources/0/offsets/0
	at app//org.apache.iceberg.inmemory.InMemoryFileIO.newInputFile(InMemoryFileIO.java:48)
	at app//org.apache.iceberg.spark.source.SparkMicroBatchStream$InitialOffsetStore.initialOffset(SparkMicroBatchStream.java:558)
	at app//org.apache.iceberg.spark.source.SparkMicroBatchStream.<init>(SparkMicroBatchStream.java:116)
	at app//org.apache.iceberg.spark.source.SparkScan.toMicroBatchStream(SparkScan.java:170)

nastra avatar Dec 10 '25 08:12 nastra

@nastra

builder.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.iceberg.type", "rest")
        .config("spark.sql.catalog.iceberg.uri", icebergUri)
        .config("spark.sql.catalog.iceberg.warehouse", icebergWarehouseName);
builder.config("spark.sql.adaptive.enabled", "true").config("spark.sql.adaptive.skewJoin.enabled", "true");

builder.config("spark.sql.timestampType", "TIMESTAMP_NTZ");

builder.config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .config(
                "spark.sql.catalog.iceberg.client.region",
                s3ConfigurationProperties
                        .getGeneralConfigurationProperties()
                        .getRegion())
        .config("spark.sql.catalog.iceberg.s3.endpoint", s3ConfigurationProperties.getSparkUrl())
        .config("spark.sql.catalog.iceberg.s3.path-style-access", "true");




        builder.config("spark.hadoop.fs.s3a.access.key", s3ConfigurationProperties.getAccessKey());
        builder.config("spark.hadoop.fs.s3a.secret.key", s3ConfigurationProperties.getSecretKey());
    builder.config("spark.hadoop.fs.s3a.endpoint", s3ConfigurationProperties.getSparkUrl());
    builder.config("spark.hadoop.fs.s3a.path.style.access", "true");
    builder.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

We tried moving the checkpoint to different paths and, I believe, also to a different bucket, but experienced the same error.

koombal avatar Dec 10 '25 13:12 koombal

@koombal does your checkpoint location use s3:// or s3a://?

nastra avatar Dec 10 '25 13:12 nastra

@nastra s3a. The checkpoint directory is created successfully beforehand:

    log.info("Ensuring checkpoint directory {}", ckptDir);

    FileSystem fs =
            FileSystem.get(new URI(ckptDir), spark.sparkContext().hadoopConfiguration());

    Path dir = new Path(ckptDir);
    if (!fs.exists(dir)) {
        log.info("Checkpoint directory '{}' does not exist – creating it now", ckptDir);
        if (!fs.mkdirs(dir)) {
           ....
        }
    }

koombal avatar Dec 10 '25 13:12 koombal