[BUG] spark 3.5.3 is not supported. API incompatibility
spark 3.5.1 is supported, but not since 3.5.2
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.catalog.SessionCatalog.invalidateCachedTable(Lorg/apache/spark/sql/catalyst/QualifiedTableName;)V
at org.apache.spark.sql.hudi.command.DropHoodieTableCommand.run(DropHoodieTableCommand.scala:52)
at org.apache.spark.sql.hudi.catalog.HoodieCatalog.dropTable(HoodieCatalog.scala:159)
at org.apache.spark.sql.execution.datasources.v2.DropTableExec.run(DropTableExec.scala:38)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
Hi @melin
Could you please let me know which Hudi version you have used for testing?
Hi @melin
Could you please let me know which Hudi version you have used for testing?
hudi 0.15.0, This pr changes the api: https://issues.apache.org/jira/browse/SPARK-49211
As per the Hudi Git repository, the supported Spark version is 3.5.1, however I am also able to run the Hudi examples using Spark 3.5.2, which is not officially supported.
<spark3.version>3.5.1</spark3.version>
Reference:
- https://github.com/apache/hudi/blob/release-0.15.0-rc3/pom.xml#L141
@melin Thanks for raising this. This may need a fix. Can you please tell us more about what operation you are doing? I see its failing in DropHoodieTableCommand.
Did you tried to run same code with spark version 3.5.1 ?
@melin Thanks for raising this. This may need a fix. Can you please tell us more about what operation you are doing? I see its failing in DropHoodieTableCommand.
Did you tried to run same code with spark version 3.5.1 ?
The drop table sql fails to be executed. The same code is correct when spark 3.5.1 is executed.
Hi @melin
I successfully ran the sample code using Spark 3.5.3, and it worked without any issues when writing and reading the Hudi table data.
It is possible to share the sample reproducible code, so that i can test it.
Try sql: drop table if exists bigdata.hudi_users_kafka
Hi @melin
I have re-tested with Spark 3.5.3 and Hudi 0.15, and I did not encounter any issues. I am sharing a sample pom.xml file and Scala code for your reference. Please test and let me know the results.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>Spark_Hudi_100</artifactId>
<version>1.0.0</version>
<groupId>com.ranga</groupId>
<properties>
<java.version>8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>3.5.3</spark.version>
<spark.major.version>3.5</spark.major.version>
<scala.version>2.12.18</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<hudi.version>0.15.0</hudi.version>
<maven-shade-plugin.version>3.5.0</maven-shade-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark${spark.major.version}-bundle_${scala.binary.version}</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr-bundle</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>false</minimizeJar>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<includes>
<!-- Include here the dependencies you want to be packed in your fat jar -->
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.MF</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Test12099.scala
package com.ranga
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
object Test12099 extends App {
val name = this.getClass.getSimpleName.replace("$", "")
val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")
val spark = SparkSession.builder.appName(name).config(sparkConf)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.getOrCreate()
val tableName = name
spark.sql(
f"""
|CREATE TABLE IF NOT EXISTS ${tableName} (
| `id` VARCHAR(20),
| `name` VARCHAR(10),
| `age` INT,
| `ts` Long
|) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
| LOCATION '/tmp/warehouse/t_test'
""".stripMargin)
val input_schema = StructType(Seq(
StructField("id", LongType),
StructField("name", StringType),
StructField("age", IntegerType),
StructField("ts", LongType),
))
val input_data = Seq(
Row(1L, "hello", 42, 1695159649087L),
Row(2L, "world", 13, 1695091554788L),
Row(3L, "spark", 7, 1695115999911L),
Row(1L, "hello", 43, 1695159649087L),
)
val basePath = f"file:///tmp/$tableName"
val hoodieConf = scala.collection.mutable.Map[String, String]()
hoodieConf.put("hoodie.datasource.write.recordkey.field", "id,age")
hoodieConf.put("hoodie.table.precombine.field", "ts")
hoodieConf.put("hoodie.table.name", tableName)
val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)
input_df.write.format("hudi").
options(hoodieConf).
mode("append").
save(basePath)
spark.read.format("hudi").load(basePath).show(false)
println("Displaying the tables")
spark.sql("SHOW tables").show(truncate = false)
println("Drop the table")
spark.sql(f"DROP TABLE ${tableName}")
println("Displaying the tables")
spark.sql("SHOW tables").show(truncate = false)
spark.stop()
}
Output:
Displaying the tables
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 60.211 ms
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 31.001125 ms
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 18.569542 ms
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|default |test12099|false |
+---------+---------+-----------+
Drop the table
Displaying the tables
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+
The DropHoodieTableCommand class calls invalidateCachedTable with the QualifiedTableName object passed as the parameter. The FullQualifiedTableName class was added in spark 3.5.3. The invalidateCachedTable method supports only FullQualifiedTableName and TableIdentifier.
The spark version in the hudi project is 3.5.3. idea displays an error calling invalidateCachedTable
Hi @melin
Thanks for sharing detailed analysis. Could you please try my example code and see you can reproduce the issue. If we can reproduce the issue we can raise HUDI jira.
Your code will work. idea breakpoint debugging, DropHoodieTableCommand run method is not entered, I guess the hive catalog is not used, the execution is different.
@melin Thanks for raising this. I confirmed that Drop table is broken with spark 3.5 release.
JIRA created for tracking - https://issues.apache.org/jira/browse/HUDI-8404
We already have a fix on this - https://github.com/apache/hudi/pull/12129/files . Thanks @Zouxxyy
Closing out this, as i confirmed that it is working as epected now. Thanks.