
[BUG] Spark 3.5.3 is not supported: API incompatibility

Open melin opened this issue 1 year ago • 11 comments

Spark 3.5.1 is supported, but versions from 3.5.2 onward are not.

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.catalog.SessionCatalog.invalidateCachedTable(Lorg/apache/spark/sql/catalyst/QualifiedTableName;)V
	at org.apache.spark.sql.hudi.command.DropHoodieTableCommand.run(DropHoodieTableCommand.scala:52)
	at org.apache.spark.sql.hudi.catalog.HoodieCatalog.dropTable(HoodieCatalog.scala:159)
	at org.apache.spark.sql.execution.datasources.v2.DropTableExec.run(DropTableExec.scala:38)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)

melin avatar Oct 15 '24 02:10 melin

Hi @melin

Could you please let me know which Hudi version you have used for testing?

rangareddy avatar Oct 15 '24 05:10 rangareddy

Hudi 0.15.0. This Spark change modified the API: https://issues.apache.org/jira/browse/SPARK-49211
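
For reference, the missing signature can be checked at runtime. Below is a minimal diagnostic sketch (a hypothetical helper, not code from Hudi or this thread); the method name and parameter type are taken verbatim from the stack trace above:

import org.apache.spark.sql.catalyst.catalog.SessionCatalog

object CheckInvalidateCachedTable extends App {
  // The stack trace shows Hudi 0.15.0 was compiled against:
  // invalidateCachedTable(Lorg/apache/spark/sql/catalyst/QualifiedTableName;)V
  val hasOldSignature = classOf[SessionCatalog].getMethods.exists { m =>
    m.getName == "invalidateCachedTable" &&
      m.getParameterTypes.map(_.getName).sameElements(
        Array("org.apache.spark.sql.catalyst.QualifiedTableName"))
  }
  println(s"invalidateCachedTable(QualifiedTableName) present: $hasOldSignature")
}

Run against a Spark 3.5.1 classpath this should print true, and false against 3.5.2+, if the analysis above is right.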

melin avatar Oct 15 '24 09:10 melin

As per the Hudi Git repository, the supported Spark version is 3.5.1; however, I was also able to run the Hudi examples using Spark 3.5.2, which is not officially supported.

<spark3.version>3.5.1</spark3.version>

spark_3_5_2_hudi_0_15.log

Reference:

  • https://github.com/apache/hudi/blob/release-0.15.0-rc3/pom.xml#L141

rangareddy avatar Oct 15 '24 12:10 rangareddy

@melin Thanks for raising this. This may need a fix. Can you please tell us more about what operation you are doing? I see it's failing in DropHoodieTableCommand.

Did you try running the same code with Spark 3.5.1?

ad1happy2go avatar Oct 15 '24 12:10 ad1happy2go

The DROP TABLE SQL fails to execute. The same code works correctly on Spark 3.5.1.

melin avatar Oct 15 '24 13:10 melin

Hi @melin

I successfully ran the sample code using Spark 3.5.3, and it worked without any issues when writing and reading the Hudi table data.

Could you share a reproducible code sample so that I can test it?

spark_3_5_3_hudi_0_15.log

rangareddy avatar Oct 15 '24 15:10 rangareddy

Try this SQL: drop table if exists bigdata.hudi_users_kafka

melin avatar Oct 16 '24 00:10 melin

Hi @melin

I have re-tested with Spark 3.5.3 and Hudi 0.15, and I did not encounter any issues. I am sharing a sample pom.xml file and Scala code for your reference. Please test and let me know the results.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>Spark_Hudi_100</artifactId>
    <version>1.0.0</version>
    <groupId>com.ranga</groupId>

    <properties>
        <java.version>8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>3.5.3</spark.version>
        <spark.major.version>3.5</spark.major.version>
        <scala.version>2.12.18</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hudi.version>0.15.0</hudi.version>
        <maven-shade-plugin.version>3.5.0</maven-shade-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark${spark.major.version}-bundle_${scala.binary.version}</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-hadoop-mr-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>module-info.class</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>META-INF/*.MF</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Test12099.scala

package com.ranga

import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object Test12099 extends App {
  val name = this.getClass.getSimpleName.replace("$", "")
  val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

  val spark = SparkSession.builder.appName(name).config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

  val tableName = name

  // Create a Hudi table through Spark SQL so it is registered in the session catalog.
  spark.sql(
    f"""
      |CREATE TABLE IF NOT EXISTS ${tableName} (
      |  `id` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` Long
      |) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
      | LOCATION '/tmp/warehouse/t_test'
    """.stripMargin)

  val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("ts", LongType),
  ))

  val input_data = Seq(
    Row(1L, "hello", 42,  1695159649087L),
    Row(2L, "world", 13, 1695091554788L),
    Row(3L, "spark", 7, 1695115999911L),
    Row(1L, "hello", 43,  1695159649087L),
  )

  val basePath = f"file:///tmp/$tableName"
  // Hudi write options; this DataFrame write targets basePath, separate from the SQL table's location.
  val hoodieConf = scala.collection.mutable.Map[String, String]()
  hoodieConf.put("hoodie.datasource.write.recordkey.field", "id,age")
  hoodieConf.put("hoodie.table.precombine.field", "ts")
  hoodieConf.put("hoodie.table.name", tableName)

  val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)
  input_df.write.format("hudi").
    options(hoodieConf).
    mode("append").
    save(basePath)

  spark.read.format("hudi").load(basePath).show(false)
  println("Displaying the tables")
  spark.sql("SHOW tables").show(truncate = false)
  println("Drop the table")
  spark.sql(f"DROP TABLE ${tableName}")
  println("Displaying the tables")
  spark.sql("SHOW tables").show(truncate = false)
  spark.stop()
}

Output:

Displaying the tables
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 60.211 ms
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 31.001125 ms
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 18.569542 ms
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|default  |test12099|false      |
+---------+---------+-----------+

Drop the table
Displaying the tables
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+

rangareddy avatar Oct 16 '24 10:10 rangareddy

DropHoodieTableCommand calls invalidateCachedTable with a QualifiedTableName object as the parameter. The FullQualifiedTableName class was added in Spark 3.5.3, and invalidateCachedTable now accepts only FullQualifiedTableName and TableIdentifier.

With the Spark version in the Hudi project set to 3.5.3, IDEA displays an error at the invalidateCachedTable call.
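
For illustration only, a version-agnostic invalidation could be sketched as below. This is not the eventual Hudi fix; it assumes that refreshTable, which takes a TableIdentifier on both sides of the SPARK-49211 change, covers the cache-invalidation need:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical helper: invalidate cached table metadata without touching the
// QualifiedTableName overload whose signature changed.
def invalidateTableCompat(spark: SparkSession, db: String, table: String): Unit =
  spark.sessionState.catalog.refreshTable(TableIdentifier(table, Some(db)))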

melin avatar Oct 16 '24 12:10 melin

Hi @melin

Thanks for sharing the detailed analysis. Could you please try my example code and see if you can reproduce the issue? If we can reproduce it, we can raise a HUDI JIRA.

rangareddy avatar Oct 16 '24 13:10 rangareddy

Your code will work. With IDEA breakpoint debugging, the DropHoodieTableCommand.run method is never entered; I guess the Hive catalog is not used, so the execution path is different.
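
If that is the difference, a repro would need the session catalog backed by Hive plus the Hudi catalog. A sketch under those assumptions (a Hive metastore is reachable and the table bigdata.hudi_users_kafka exists):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HudiDropTableRepro")
  .master("local[2]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
  .enableHiveSupport() // the earlier example did not enable Hive support
  .getOrCreate()

// Expected on Spark 3.5.2+ with Hudi 0.15.0: HoodieCatalog.dropTable ->
// DropHoodieTableCommand -> NoSuchMethodError, matching the original report.
spark.sql("DROP TABLE IF EXISTS bigdata.hudi_users_kafka")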

melin avatar Oct 17 '24 03:10 melin

@melin Thanks for raising this. I confirmed that DROP TABLE is broken with the Spark 3.5.3 release.

JIRA created for tracking - https://issues.apache.org/jira/browse/HUDI-8404

ad1happy2go avatar Oct 22 '24 05:10 ad1happy2go

We already have a fix for this: https://github.com/apache/hudi/pull/12129/files. Thanks @Zouxxyy

ad1happy2go avatar Oct 23 '24 04:10 ad1happy2go

Closing this out, as I confirmed that it is working as expected now. Thanks.

ad1happy2go avatar Oct 23 '24 07:10 ad1happy2go