Initializing Spark session with “configure_spark_with_delta_pip” gets stuck
Bug
I am testing new features for Delta Lake 2.0 and am facing the issue below while trying to initialize a Spark session with “configure_spark_with_delta_pip”. Any advice on what might be missing would be greatly appreciated. Spark: 3.2. Slack thread: https://delta-users.slack.com/archives/CJ70UCSHM/p1658850509055239
Steps to reproduce
import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Observed results
Expected results
Further details
Environment information
- Delta Lake version: 2.0
- Spark version: 3.2
- Scala version: 2.12
- Python version: 3.8
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [ ] No. I cannot contribute a bug fix at this time.
Thanks @BShraman - will try to repro this locally. Quick question - for .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar"), by any chance have you tested this locally and still get the same error? Per the Slack thread, wanted to reiterate that this is happening with delta-core_2.12-2.0.0.jar but not delta-core_2.12-1.2.0.jar. Thanks!
Hi @dennyglee, if I run this locally with .config("spark.jars.packages","io.delta:delta-core_2.12:2.0.0") it pulls the dependency from the Maven repository and everything runs without any issue. Fetching dependencies directly from Maven is blocked in our organization. Alternate option: I am trying to pass the Delta JAR explicitly after downloading it locally, and this is where I am facing the issue.
Based on some quick testing, I believe you may have delta-spark==1.2.0 or lower installed in your environment. Could you try upgrading it to delta-spark==2.0.0 and see if that helps? Note, I'm still investigating another issue related to this.
Details
Running the following commands directly from the local Python shell.
Local environment
Local installs using sdkman for Spark, Java, and Scala version management, and a virtual environment for Python version management.
- Spark 3.2.0
- Python 3.9.10
- Scala 2.12.8
With delta-spark==1.2.0 installed
i.e. with pip install delta-spark==1.2.0
Running with Delta 1.2
import pyspark
from delta import *
my_packages = ["io.delta:delta-core_2.12:2.0.0"]
builder = (
pyspark.sql.SparkSession.builder.appName("MyApp")
.config('spark.master','local[*]')
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
Result: Works as expected
Running with Delta 2.0
import pyspark
from delta import *
my_packages = ["io.delta:delta-core_2.12:2.0.0"]
builder = (
pyspark.sql.SparkSession.builder.appName("MyApp")
.config('spark.master','local[*]')
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
Result: Fails with the following error (which is similar to your error)
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-34819e71-fc19-4aac-8ebd-fab64b77ba27;1.0
confs: [default]
found io.delta#delta-core_2.12;2.0.0 in central
found io.delta#delta-storage;2.0.0 in central
found org.antlr#antlr4-runtime;4.8 in central
found org.codehaus.jackson#jackson-core-asl;1.9.13 in spark-list
:: resolution report :: resolve 118ms :: artifacts dl 4ms
:: modules in use:
io.delta#delta-core_2.12;2.0.0 from central in [default]
io.delta#delta-storage;2.0.0 from central in [default]
org.antlr#antlr4-runtime;4.8 from central in [default]
org.codehaus.jackson#jackson-core-asl;1.9.13 from spark-list in [default]
:: evicted modules:
io.delta#delta-core_2.12;1.2.0 by [io.delta#delta-core_2.12;2.0.0] in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 5 | 0 | 0 | 1 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-34819e71-fc19-4aac-8ebd-fab64b77ba27
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/3ms)
22/08/07 18:06:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/07 18:06:40 WARN SparkSession: Cannot use io.delta.sql.DeltaSparkSessionExtension to configure session extensions.
With delta-spark==2.0.0 installed
i.e. with pip install delta-spark==2.0.0
import pyspark
from delta import *
my_packages = ["io.delta:delta-core_2.12:2.0.0"]
builder = (
pyspark.sql.SparkSession.builder.appName("MyApp")
.config('spark.master','local[*]')
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
Result: Works as expected
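A quick way to double-check which delta-spark version is actually installed on the Python path (a small suggestion on my part; importlib.metadata is in the standard library on Python 3.8+):

import importlib.metadata

# Print the installed delta-spark version; it should match the
# delta-core JAR version you intend to attach.
print(importlib.metadata.version("delta-spark"))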
@BShraman - I'm happy to help you figure this one out.
Alternate option : i am trying with passing delta jar explicitly after downloading locally and this is where i am facing an issue.
Yea, I don't think you need to pass the Delta JAR in if you've already downloaded it locally.
Here's your code snippet with some comments:
import pyspark
# this next line shows that you already have Delta installed. If you didn't have Delta installed, this line would error out.
from delta import *
builder = (
pyspark.sql.SparkSession.builder.appName("MyApp")
.config('spark.master','local[*]')
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar") # I don't think you need this because you already have Delta installed
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Does this work?
import pyspark
from delta import *
builder = (
pyspark.sql.SparkSession.builder.appName("MyApp")
.config('spark.master','local[*]')
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Let me know if this sorts out the issue for you. I am happy to keep helping till you figure this out.
This is the draft PR I created to experiment with this BTW. I was getting the same warning errors Denny got when I mixed & matched my locally installed Delta version with a JAR file that had a different version (e.g. when I had Delta 1.2 installed locally and attached the Delta 2.0 JAR file).
Could we also ship delta-spark in a self-contained fashion similar to pyspark? Having an extra dependency on Maven is not as seamless in many pyspark-dependent orgs.
@santosh-d3vpl3x - It's a great idea and that's what's been done. delta-spark is on PyPI and doesn't have a Maven dependency. You can just run pip install delta-spark.
@MrPowers That's good to know. https://docs.delta.io/latest/quick-start.html#python along with https://github.com/delta-io/delta/blob/v2.1.0rc1/python/delta/pip_utils.py#L26-L28 gave me the impression that delta-spark is just sugar coating over the existing flow.
delta-spark is on PyPI and doesn't have a Maven dependency.
On second thought, do you have some resources handy for this?
@santosh-d3vpl3x - Yea, we should make those installation instructions more clear. Here's a blog post I wrote on installing PySpark & Delta Lake with conda. I will try to make more content that'll make it easier to get up-and-running with Delta Lake. Thanks for the comments / feedback.
@MrPowers I spent some time going through the code for delta-spark to understand what it is doing at the moment.
1. It is indeed a thin wrapper and a convenience tool, but not a self-contained package like pyspark. configure_spark_with_delta_pip adds the Maven coordinate to the Spark session builder, so if you have a clean ivy/maven dir and are disconnected from the internet, the instructions here don't work.
2. Also, to check whether we even need the configure_spark_with_delta_pip call, I tried excluding configure_spark_with_delta_pip:
import pyspark
from delta import *

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
Results in java.lang.ClassNotFoundException: io.delta.sql.DeltaSparkSessionExtension
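For context, this matches what the pip_utils.py linked above appears to do. Below is a rough, simplified sketch, assuming only that the helper sets spark.jars.packages from the Maven coordinate of the installed delta-spark distribution (the function name, hard-coded artifact, and variable names are illustrative, not the real implementation). This is also why a clean ivy/maven cache plus no internet access breaks session startup.

# Simplified illustration only; see delta/pip_utils.py for the real helper.
def configure_spark_with_delta_pip_sketch(builder, extra_packages=None):
    # The installed delta-spark wheel corresponds to one Maven artifact,
    # e.g. "io.delta:delta-core_2.12:2.0.0" (hard-coded here for illustration).
    delta_artifact = "io.delta:delta-core_2.12:2.0.0"
    packages = [delta_artifact] + (extra_packages or [])
    # Setting spark.jars.packages makes Spark resolve the JARs via ivy/Maven
    # at session startup, which fails offline or when Maven access is blocked.
    return builder.config("spark.jars.packages", ",".join(packages))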
@MrPowers
Here is my pip list:
Spark configuration:
#####################
import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
############
import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
Both are giving me the same error as below:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.2.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/xxxxx/.ivy2/cache
The jars for the packages stored in: /home/xxxxx/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9f5c2d62-314f-4857-a764-d72116906ec7;1.0
    confs: [default]
Some users can't access JAR files via Maven. It would be cool if we could upload the JAR files to PyPI and give an option to fetch the relevant JAR from PyPI for users that don't have access to Maven.
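In the meantime, one possible workaround for environments without Maven access (just a sketch, not an official recommendation; the /opt/jars paths are hypothetical, and the JAR versions must match the installed delta-spark version) is to download the delta-core and delta-storage JARs out of band, point spark.jars at them, and skip configure_spark_with_delta_pip entirely:

import pyspark
from delta import *  # the pip-installed delta-spark package still provides the Python bindings

# Hypothetical local paths; Delta 2.0 needs both delta-core and its delta-storage dependency.
local_jars = "/opt/jars/delta-core_2.12-2.0.0.jar,/opt/jars/delta-storage-2.0.0.jar"

spark = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.master", "local[*]")
    .config("spark.jars", local_jars)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)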
I used the following conf, which solved the problem and works perfectly:

import pyspark
from pyspark import SparkConf, SparkContext

# Create a Spark context with custom config
conf = SparkConf()
# conf.set('spark.sql.debug.maxToStringFields', 100)
conf.set('spark.default.parallelism', 700)
conf.set('spark.sql.shuffle.partitions', 700)
conf.set('spark.driver.memory', '30g')
conf.set('spark.driver.cores', 8)
conf.set('spark.executor.cores', 8)
conf.set('spark.executor.memory', '30g')
conf.set("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
conf.set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
conf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

sc = SparkContext.getOrCreate(conf)
#import library
from delta import *
#we need to config sparksession
my_packages = ["io.delta:delta-core_2.12:2.0.0"]
builder = (pyspark.sql.SparkSession.builder.appName("Myapp")
.config('spark.master','local[*]'))
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
My Spark env: delta-spark 2.0.0, findspark 2.0.1, pyspark 3.2.3, sparksql-magic 0.0.3
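As a quick sanity check once the session is up (a minimal example following the standard Delta quickstart pattern; /tmp/delta-table is just an example path), writing and reading back a tiny Delta table confirms the extension and catalog are wired up:

# Write a tiny Delta table and read it back to verify the Delta classes are on the classpath.
spark.range(0, 5).write.format("delta").mode("overwrite").save("/tmp/delta-table")
spark.read.format("delta").load("/tmp/delta-table").show()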