DELTA source resolution is brittle when serving Spark SQL through Hive Thrift server
Bug
Describe the problem
We run SQL statements to create tables etc. with the ... USING DELTA directive. This works the first time, but on a subsequent run, or if there was an error in the previous run, the DELTA name fails to resolve. If I fully spell out ... USING org.apache.spark.sql.delta.sources.DeltaDataSource, the statement still works.
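As a stopgap on the client side, the workaround described above can be applied mechanically before a statement is sent to the server. The following is only a minimal sketch: the helper name qualify_delta_provider is hypothetical, and the regular expression is deliberately naive (it would also rewrite a matching phrase inside a string literal or comment).

```python
import re

# Fully qualified provider class that still resolves, per this report.
DELTA_PROVIDER = "org.apache.spark.sql.delta.sources.DeltaDataSource"

# Matches "USING DELTA" as whole words, case-insensitively.
_USING_DELTA = re.compile(r"\bUSING\s+DELTA\b", re.IGNORECASE)


def qualify_delta_provider(sql: str) -> str:
    """Replace the DELTA short name with the fully qualified class name.

    Hypothetical helper for illustration; it does not parse SQL.
    """
    return _USING_DELTA.sub(f"USING {DELTA_PROVIDER}", sql)


ddl = "CREATE TABLE IF NOT EXISTS datalake.tbl1 (a STRING) USING DELTA PARTITIONED BY (a)"
print(qualify_delta_provider(ddl))
```

This only hides the symptom; the short name should resolve without any rewriting.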
Steps to reproduce
I run Spark SQL thrift server programmatically from PySpark:
# with delta-spark==2.0.0 and pyspark==3.2.2
from pyspark import SparkConf
from pyspark.sql import SparkSession
sc = SparkConf()
# Install JVM packages.
sc.set("spark.jars.packages", ",".join([
    "org.apache.hadoop:hadoop-aws:3.2.2",
    "io.delta:delta-core_2.12:2.0.0",
]))
# Enable DeltaLake.
sc.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
sc.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Set AWS S3 related options.
sc.set("spark.hadoop.fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
sc.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.set("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
# We have hive standalone metastore 3.0.0 running:
sc.set("hive.metastore.uris", "thrift://localhost:9083/datalake") \
    .set("spark.sql.warehouse.dir", "s3://data-hive/")
ss = SparkSession.builder.master('local[*]') \
    .enableHiveSupport() \
    .appName("delta_test").config(conf=sc).getOrCreate()
# Launch thrift server:
from py4j.java_gateway import java_import
java_import(ss._jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$")
hfs2_clazz=ss._jvm.java.lang.Class.forName("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$")
runner=hfs2_clazz.getDeclaredField("MODULE$").get(None).startWithContext(ss._jsparkSession.sqlContext())
from time import sleep
while runner.getServiceState().name() == "STARTED":
    sleep(5)
This starts the Thrift server on port 10000. Now, in a separate process, we run the query (through SQLAlchemy's Hive support):
CREATE TABLE IF NOT EXISTS datalake.tbl1 (
    a STRING,
    b BIGINT,
    c STRING
)
USING org.apache.spark.sql.delta.sources.DeltaDataSource
PARTITIONED BY (a)
Observed results
The SQL script runs fine as is. However, if I change it to ... USING DELTA ..., as suggested by the documentation, then after the first successful run I see an error:
java.lang.ClassNotFoundException:
Failed to find data source: DELTA. Please find packages at
http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.org$apache$spark$sql$catalyst$analysis$ResolveSessionCatalog$$isV2Provider(ResolveSessionCatalog.scala:641)
...
Caused by: java.lang.ClassNotFoundException: DELTA.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
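The "DELTA.DefaultSource" in the trace is consistent with how Spark's DataSource.lookupDataSource resolves a provider: it first looks for a registered short name, and only if none matches does it try to load the provider string itself and then the provider string with ".DefaultSource" appended. The sketch below is a simplified pure-Python model of that order, not Spark code. The function names and the two inputs (a dict standing in for the discovered short-name registrations, a set standing in for loadable classes) are illustrative assumptions, and whether the Thrift server's class loader really loses the registration is a hypothesis this report does not confirm.

```python
DELTA_CLASS = "org.apache.spark.sql.delta.sources.DeltaDataSource"


def candidate_names(provider, registered_short_names):
    """Return the class names to try for `provider`, in order.

    `registered_short_names` maps lower-case short names to class names and
    stands in for the short-name registrations visible to the class loader.
    """
    hit = registered_short_names.get(provider.lower())
    if hit is not None:
        return [hit]
    # Fallback: treat the provider string as a class name, then as a package.
    return [provider, provider + ".DefaultSource"]


def resolve(provider, registered_short_names, loadable_classes):
    """Return the first candidate that is loadable, else raise LookupError."""
    for name in candidate_names(provider, registered_short_names):
        if name in loadable_classes:
            return name
    raise LookupError(f"Failed to find data source: {provider}")


loadable = {DELTA_CLASS}

# Short name visible: DELTA resolves.
print(resolve("DELTA", {"delta": DELTA_CLASS}, loadable))

# Short name not visible: the fully qualified name still resolves ...
print(resolve(DELTA_CLASS, {}, loadable))

# ... but DELTA falls back to "DELTA" and "DELTA.DefaultSource" and fails.
print(candidate_names("DELTA", {}))
```

Under this model, a class loader that can load the Delta classes but does not see the short-name registration would produce exactly the reported behavior: the fully qualified name works while DELTA does not.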
Expected results
I expect USING DELTA to keep working for as long as the server is running.
Further details
Environment information
- Delta Lake version: 2.0.0
- Spark version: 3.2.2
- Scala version: 2.12
- Python version: 3.10
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [ ] No. I cannot contribute a bug fix at this time.
This looks like a Spark thrift server issue about --packages support similar to #1332. Maybe create an issue on https://issues.apache.org/jira/browse/SPARK and ask the Spark community to check the class loader used by Spark thrift server?
Could you try to put delta-storage-2.0.0.jar and delta-core_2.12-2.0.0.jar into the jars directory under spark and see if this fixes the issue?
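For anyone trying this suggestion from Python, a minimal sketch of the copy step follows. The helper name install_jars and the example paths are hypothetical; where the two jars come from (for example a local Ivy or Maven cache) and where your Spark home is depend on your installation. For a pip-installed pyspark, the jars directory usually sits inside the pyspark package directory.

```python
import shutil
from pathlib import Path


def install_jars(jar_paths, spark_home):
    """Copy the given jar files into <spark_home>/jars and return the targets.

    Hypothetical helper for illustration; it overwrites jars of the same name.
    """
    jars_dir = Path(spark_home) / "jars"
    if not jars_dir.is_dir():
        raise FileNotFoundError(f"no jars directory at {jars_dir}")
    copied = []
    for jar in map(Path, jar_paths):
        target = jars_dir / jar.name
        shutil.copy2(jar, target)  # copy file contents and metadata
        copied.append(target)
    return copied


# Example call; both locations are placeholders, not real paths:
# install_jars(
#     ["/path/to/delta-storage-2.0.0.jar", "/path/to/delta-core_2.12-2.0.0.jar"],
#     spark_home="/path/to/spark",
# )
```

The server has to be restarted afterwards so that the jars are on its classpath from startup.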
We use the thrift server and create tables the same way, with the jars baked into the Spark image and provided to Spark as @zsxwing suggested. No issues with Delta 1.1.