Spark: Add support for Spark 4.1
Created https://github.com/apache/spark/pull/52423 on the Spark side to fix failing tests against Spark 4.1.0-preview1.
```
2025-09-22T16:49:13.6462602Z TestCreateActions > testAddColumnOnMigratedTableAtEnd() > catalogName = spark_catalog, implementation = org.apache.iceberg.spark.SparkSessionCatalog, config = {type=hive, default-namespace=default, parquet-enabled=true, cache-enabled=false}, type = hive FAILED
2025-09-22T16:49:13.6466422Z org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 431.0 failed 1 times, most recent failure: Lost task 0.0 in stage 431.0 (TID 3270) (localhost executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.types.DataType.transformRecursively(scala.PartialFunction)" because "type" is null
2025-09-22T16:49:13.6469913Z     at org.apache.spark.sql.vectorized.ColumnVector.<init>(ColumnVector.java:341)
2025-09-22T16:49:13.6471238Z     at org.apache.iceberg.spark.data.vectorized.ConstantColumnVector.<init>(ConstantColumnVector.java:41)
```
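For context, the root cause is visible in the trace: Spark 4.1's `ColumnVector` constructor now invokes `transformRecursively` on the `DataType` it receives, so a subclass that passes `null` fails immediately at construction time. A minimal, self-contained sketch of that pattern (using simplified stand-in classes, not Spark's real API):

```java
// Simplified stand-ins (NOT Spark's real classes) illustrating the failure:
// the superclass constructor now dereferences the type it is given, so a
// subclass passing null fails at construction time rather than later.
public class NullTypeNpeSketch {
  abstract static class FakeDataType {
    abstract FakeDataType normalize(); // stand-in for transformRecursively
  }

  abstract static class FakeColumnVector {
    protected final FakeDataType type;

    protected FakeColumnVector(FakeDataType type) {
      this.type = type.normalize(); // NullPointerException when type == null
    }
  }

  static class FakeConstantColumnVector extends FakeColumnVector {
    FakeConstantColumnVector(FakeDataType type) {
      super(type); // passing null reproduces the stack trace above
    }
  }

  public static void main(String[] args) {
    new FakeConstantColumnVector(null); // throws NullPointerException
  }
}
```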
Thanks for working on this, @manuzhang! I have a few changes I want to include on top of your PR. Do you think you can rebase this one (potentially ignoring what currently fails) so that we can get it in and continue working on the changes needed for 4.1?
@aokolnychyi Do you want to get it into the main branch or a feature branch? Do we need to discuss this on the dev list?
I would get it into the main branch in order to prep for 4.1. You may want to drop a note on the dev list so that folks are aware, but I don't think it requires a discuss/vote thread. What do you think, @manuzhang?
@aokolnychyi Rebased and sent out the note to the dev list.
I went through the changes and they look good to me.
A few remaining points:
- Remaining failing tests.
- What shall we do with Comet?
- I want to test more of the MERGE behavior changes in Spark to make sure they are correct.
Here, for instance, it seems suspicious that we now allow strings to be implicitly cast to int:
```java
assertThatThrownBy(
        () ->
            sql(
                "MERGE INTO %s t USING source s "
                    + "ON t.id == s.c1 "
                    + "WHEN MATCHED THEN "
                    + " UPDATE SET t.s.n1 = s.c3",
                commitTarget()))
    .isInstanceOf(AnalysisException.class)
    .hasMessageContaining("Cannot safely cast `s`.`n1` \"STRING\" to \"INT\".");
```
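For reference, a minimal probe sketch of that behavior outside the test suite (assumptions: a local session with an Iceberg catalog configured, and tables `t` and `source` shaped like in the test, with an int field `t.s.n1` and a string column `s.c3`). Under the default `spark.sql.storeAssignmentPolicy=ANSI`, Spark documents that implicit STRING-to-INT store assignment is rejected:

```java
import org.apache.spark.sql.SparkSession;

public class MergeCastProbe {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]")
            // ANSI is the default store assignment policy; set explicitly here.
            .config("spark.sql.storeAssignmentPolicy", "ANSI")
            .getOrCreate();

    // With an int field t.s.n1 and a string column s.c3, the ANSI policy is
    // documented to fail this with "Cannot safely cast ... STRING to INT".
    spark.sql(
        "MERGE INTO t USING source s "
            + "ON t.id == s.c1 "
            + "WHEN MATCHED THEN UPDATE SET t.s.n1 = s.c3");
  }
}
```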
@szehon-ho, can you check the tests that fail / had to be removed in Iceberg to ensure Spark behavior is correct? Just use the last commit to see actual changes.
@aokolnychyi
This test was removed by mistake and I've restored it.
```java
assertThatThrownBy(
        () ->
            sql(
                "MERGE INTO %s t USING source s "
                    + "ON t.id == s.c1 "
                    + "WHEN MATCHED THEN "
                    + " UPDATE SET t.s.n1 = s.c3",
                commitTarget()))
    .isInstanceOf(AnalysisException.class)
    .hasMessageContaining("Cannot safely cast `s`.`n1` \"STRING\" to \"INT\".");
```
As for Comet support, I'm using org.apache.datafusion:comet-spark-spark4.0_2.13 for now, just as we used org.apache.datafusion:comet-spark-spark3.5_2.13 for Spark 4.0 before it was supported in Comet.
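For clarity, here is roughly what that fallback looks like in a Gradle build (the `cometVersion` property name is hypothetical; the coordinates are the ones above):

```groovy
dependencies {
  // Until Comet publishes a Spark 4.1 artifact, fall back to the Spark 4.0
  // build, mirroring how the Spark 3.5 artifact was used for Spark 4.0.
  implementation "org.apache.datafusion:comet-spark-spark4.0_2.13:${cometVersion}"
}
```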
All tests pass now except the following.
```
TestDataSourceOptions > testExtraSnapshotMetadataWithSQL() > catalogName = testhadoop, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hadoop, cache-enabled=false} FAILED
    java.lang.AssertionError:
    Expected size: 2 but was: 1 in:
```
It's already fixed by https://github.com/apache/spark/pull/53196 and will be included in the next Spark 4.1.0 RC.
It is already in good shape; +1 on getting it into the main branch as-is.
Will it be part of Iceberg 1.10.1? (Since Spark 4.1.0 was released today.) Thanks!
No, this won't be part of 1.10.1, since 1.10.1 is a patch release.
Spark 4.1.0 jars are available on Maven Central; do you have a plan to move this forward?
@pan3793 Since Spark 4.1.0 has known issues and Spark 4.1.1 will be released soon, we can wait for the 4.1.1 RC and test again.