
DELTA source resolution is brittle when serving Spark SQL through Hive Thrift server

Open chajath opened this issue 1 year ago • 2 comments

Bug

Describe the problem

We run SQL statements that create tables and similar objects with the ... USING DELTA clause. The first run works, but on a subsequent run, or after an error in a previous run, the DELTA name fails to resolve. If I spell out ... USING org.apache.spark.sql.delta.sources.DeltaDataSource in full, the statement still works.
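The workaround described above can be applied mechanically on the client side before a statement is sent to the server. The following is a minimal sketch, not part of the original setup: the helper name `qualify_delta_provider` is hypothetical, and the regex is deliberately naive, so it would also rewrite a `USING DELTA` that happens to appear inside a string literal or a comment.

```python
import re

# Fully qualified provider class quoted in this report.
DELTA_PROVIDER = "org.apache.spark.sql.delta.sources.DeltaDataSource"

# Matches "USING DELTA" as whole words, case-insensitively.
_USING_DELTA = re.compile(r"\bUSING\s+DELTA\b", re.IGNORECASE)


def qualify_delta_provider(sql: str) -> str:
    """Replace the DELTA short name with the fully qualified class name.

    Hypothetical helper: it does not parse SQL, it only substitutes text.
    """
    return _USING_DELTA.sub(f"USING {DELTA_PROVIDER}", sql)
```

Statements without a `USING DELTA` clause pass through unchanged, so the helper could wrap every statement in a script. It only hides the symptom; the short name still fails to resolve on the server.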

Steps to reproduce

I run Spark SQL thrift server programmatically from PySpark:

# with delta-spark==2.0.0 and pyspark==3.2.2

from pyspark import SparkConf
from pyspark.sql import SparkSession

sc = SparkConf()
# Install JVM packages.
sc.set("spark.jars.packages", ",".join([
    "org.apache.hadoop:hadoop-aws:3.2.2",
    "io.delta:delta-core_2.12:2.0.0",
]))
# Enable DeltaLake.
sc.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
sc.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Set AWS S3 related options.
sc.set("spark.hadoop.fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
sc.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.set("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
# We have hive standalone metastore 3.0.0 running:
sc.set("hive.metastore.uris", "thrift://localhost:9083/datalake") \
        .set("spark.sql.warehouse.dir", "s3://data-hive/")

ss = SparkSession.builder.master('local[*]')\
    .enableHiveSupport() \
    .appName("delta_test").config(conf=sc).getOrCreate()
    
# Launch thrift server:
from py4j.java_gateway import java_import
java_import(ss._jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$")
hfs2_clazz=ss._jvm.java.lang.Class.forName("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$")
runner=hfs2_clazz.getDeclaredField("MODULE$").get(None).startWithContext(ss._jsparkSession.sqlContext())


from time import sleep


while runner.getServiceState().name()=="STARTED":
    sleep(5)

This runs the Thrift server on port 10000. Now, in a separate process, we fire the query (through SQLAlchemy's Hive dialect support):

CREATE TABLE IF NOT EXISTS datalake.tbl1 (
    a STRING,
    b BIGINT,
    c STRING
)
USING org.apache.spark.sql.delta.sources.DeltaDataSource
PARTITIONED BY (a)

Observed results

The SQL script runs fine as is. However, if I change it to ... USING DELTA ... as suggested by the documentation, then after the first successful run I see an error:

java.lang.ClassNotFoundException: 
Failed to find data source: DELTA. Please find packages at
http://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
	at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.org$apache$spark$sql$catalyst$analysis$ResolveSessionCatalog$$isV2Provider(ResolveSessionCatalog.scala:641)

...	

Caused by: java.lang.ClassNotFoundException: DELTA.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
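The `DELTA.DefaultSource` in the trace comes from the order in which Spark resolves a provider name: it first looks for a registered short name, and only if none matches does it try the name as a class, then as a package containing a `DefaultSource` class. The following toy model in plain Python illustrates that order. It is a simplified sketch, not Spark's code: the function name and both parameters are made up for illustration, and the two collections stand in for whatever the class loader in use can see.

```python
from typing import Dict, Optional, Set


def lookup_data_source(
    provider: str,
    registered_short_names: Dict[str, str],
    loadable_classes: Set[str],
) -> Optional[str]:
    """Toy model of the lookup order; returns the resolved class name or None.

    registered_short_names: short name -> class, standing in for the
        registrations discovered through the current class loader.
    loadable_classes: class names the current class loader can load.
    """
    # 1. Short names are matched case-insensitively.
    for short_name, class_name in registered_short_names.items():
        if short_name.lower() == provider.lower():
            return class_name
    # 2. Otherwise treat the provider as a class name, then as a package
    #    containing a class called DefaultSource.
    for candidate in (provider, provider + ".DefaultSource"):
        if candidate in loadable_classes:
            return candidate
    return None
```

In this model, `DELTA` resolves only while the `delta` short-name registration is visible, whereas the fully qualified class name resolves whenever the class itself is loadable. That matches the symptom reported here, but the sketch does not explain why the registration would stop being visible after the first run.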

Expected results

I expect USING DELTA to keep working for as long as the server is running.

Further details

Environment information

  • Delta Lake version: 2.0.0
  • Spark version: 3.2.2
  • Scala version: 2.12
  • Python version: 3.10

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [ ] No. I cannot contribute a bug fix at this time.

chajath, Aug 17 '22 17:08

This looks like a Spark thrift server issue about --packages support similar to #1332. Maybe create an issue on https://issues.apache.org/jira/browse/SPARK and ask the Spark community to check the class loader used by Spark thrift server?

Could you try to put delta-storage-2.0.0.jar and delta-core_2.12-2.0.0.jar into the jars directory under spark and see if this fixes the issue?
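Before retrying, it may help to confirm that both jars actually landed in the directory. A minimal sketch, assuming only the two file names mentioned above; the helper name `missing_jars` is hypothetical and the directory is passed in by the caller, since its location depends on how Spark was installed.

```python
from pathlib import Path
from typing import Iterable, List

# Jar file names taken from the suggestion above.
EXPECTED_JARS = ("delta-storage-2.0.0.jar", "delta-core_2.12-2.0.0.jar")


def missing_jars(jars_dir: str, expected: Iterable[str] = EXPECTED_JARS) -> List[str]:
    """Return the expected jar names that are not regular files in jars_dir."""
    directory = Path(jars_dir)
    return [name for name in expected if not (directory / name).is_file()]
```

An empty list means both jars are present. For a pip-installed PySpark the directory is typically the `jars` folder inside the installed `pyspark` package, but that is an assumption to verify locally.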

zsxwing, Aug 17 '22 23:08

We use the Thrift server and create tables the same way, with the jars baked into the Spark image and provided to Spark as @zsxwing suggested. No issues with Delta 1.1.

george-zubrienko, Aug 24 '22 13:08