
Initializing a Spark session with "configure_spark_with_delta_pip" gets stuck

BShraman opened this issue 2 years ago

Bug

I am testing the new features of Delta Lake 2.0 and am facing the issue below when trying to initialize a Spark session with "configure_spark_with_delta_pip". Any advice on what might be missing would be greatly appreciated. Spark: 3.2. Slack thread: https://delta-users.slack.com/archives/CJ70UCSHM/p1658850509055239.

Steps to reproduce

import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Observed results

Expected results

Further details

Environment information

  • Delta Lake version: 2.0
  • Spark version: 3.2
  • Scala version: 2.12
  • Python Version : 3.8

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [ ] No. I cannot contribute a bug fix at this time.

BShraman avatar Jul 28 '22 18:07 BShraman

Thanks @BShraman - will try to repro this locally. Quick question: for .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar"), have you by any chance tested this locally and do you still get the same error? Per the Slack thread, I wanted to reiterate that this is happening with delta-core_2.12-2.0.0.jar but not with delta-core_2.12-1.2.0.jar. Thanks!

dennyglee avatar Jul 28 '22 18:07 dennyglee

Hi @dennyglee, if I run this locally with .config("spark.jars.packages","io.delta:delta-core_2.12:2.0.0"), it pulls the dependency from the Maven repository and everything runs without any issue. Fetching dependencies directly from Maven is blocked in our organization. As an alternate option, I am trying to pass the Delta JAR explicitly after downloading it locally, and this is where I am facing the issue.

BShraman avatar Jul 28 '22 18:07 BShraman

Based on some quick testing, I believe you may have delta-spark==1.2.0 or lower installed in your environment. Could you try upgrading it to delta-spark==2.0.0 and see if that helps? Note, I'm still investigating another issue related to this.
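
A quick way to confirm which version is actually pip-installed in the environment (a minimal sketch using only the Python standard library; "delta-spark" is the distribution name on PyPI):

from importlib.metadata import version, PackageNotFoundError

try:
    # Reports the version of the pip-installed delta-spark distribution
    print("delta-spark:", version("delta-spark"))
except PackageNotFoundError:
    print("delta-spark is not pip-installed in this environment")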

Details

Running the following commands directly from the local python shell.

Local environment

Local installs use sdkman for Spark, Java, and Scala version management, and a virtual environment for Python version management.

  • Spark 3.2.0
  • Python 3.9.10
  • Scala 2.12.8

With delta-spark==1.2.0 installed

i.e. with pip install delta-spark==1.2.0

Running with Delta 1.2

import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

Result: Works as expected

Running with Delta 2.0

import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

Result: Fails with the following error (which is similar to your error)

io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-34819e71-fc19-4aac-8ebd-fab64b77ba27;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.0.0 in central
	found io.delta#delta-storage;2.0.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in spark-list
:: resolution report :: resolve 118ms :: artifacts dl 4ms
	:: modules in use:
	io.delta#delta-core_2.12;2.0.0 from central in [default]
	io.delta#delta-storage;2.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from spark-list in [default]
	:: evicted modules:
	io.delta#delta-core_2.12;1.2.0 by [io.delta#delta-core_2.12;2.0.0] in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   5   |   0   |   0   |   1   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-34819e71-fc19-4aac-8ebd-fab64b77ba27
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/3ms)
22/08/07 18:06:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/07 18:06:40 WARN SparkSession: Cannot use io.delta.sql.DeltaSparkSessionExtension to configure session extensions.

With delta-spark==2.0.0 installed

i.e. with pip install delta-spark==2.0.0

import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

Result: Works as expected

dennyglee avatar Aug 08 '22 01:08 dennyglee

@BShraman - I'm happy to help you figure this one out.

As an alternate option, I am trying to pass the Delta JAR explicitly after downloading it locally, and this is where I am facing the issue.

Yeah, I don't think you need to pass the Delta JAR in if you already have Delta installed locally.

Here's your code snippet with some comments:

import pyspark
# this next line shows that you already have Delta installed.  If you didn't have Delta installed, this line would error out.
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # I don't think you need this next line because you already have Delta installed
    .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Does this work?

import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Let me know if this sorts out the issue for you. I am happy to keep helping till you figure this out.

MrPowers avatar Aug 10 '22 01:08 MrPowers

This is the draft PR I created to experiment with this, BTW. I was getting the same warnings Denny got when I mixed and matched my locally installed Delta version with a JAR file of a different version (e.g. when I had Delta 1.2 installed locally and attached the Delta 2.0 JAR file).

MrPowers avatar Aug 10 '22 01:08 MrPowers

Could we also ship delta-spark in a self-contained fashion, similar to pyspark? Having an extra dependency on Maven is not as seamless in many PySpark-dependent orgs.

santosh-d3vpl3x avatar Aug 23 '22 08:08 santosh-d3vpl3x

@santosh-d3vpl3x - It's a great idea, and that's what's been done. delta-spark is on PyPI and doesn't have a Maven dependency. You can just run pip install delta-spark.

MrPowers avatar Aug 23 '22 12:08 MrPowers

@MrPowers That's good to know. https://docs.delta.io/latest/quick-start.html#python along with https://github.com/delta-io/delta/blob/v2.1.0rc1/python/delta/pip_utils.py#L26-L28 gave me the impression that delta-spark is just sugar coating over the existing flow.

delta-spark is on PyPI and doesn't have a Maven dependency.

On a second thought, do you have some resources handy for this?

santosh-d3vpl3x avatar Aug 23 '22 12:08 santosh-d3vpl3x

@santosh-d3vpl3x - Yea, we should make those installation instructions more clear. Here's a blog post I wrote on installing PySpark & Delta Lake with conda. I will try to make more content that'll make it easier to get up-and-running with Delta Lake. Thanks for the comments / feedback.

MrPowers avatar Aug 23 '22 12:08 MrPowers

@MrPowers I spent some time going through the code for delta-spark to understand what it is doing at the moment.

  1. It is indeed a thin wrapper and a convenience tool, not a self-contained package like pyspark. configure_spark_with_delta_pip adds a Maven coordinate to the Spark session builder (a rough sketch of this is included at the end of this comment). If you have a clean Ivy/Maven directory and are disconnected from the internet, the instructions there don't work.
  2. Also, to check whether the configure_spark_with_delta_pip call is needed at all, excluding it:
import pyspark
from delta import *

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

Results in java.lang.ClassNotFoundException: io.delta.sql.DeltaSparkSessionExtension
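
For context, this is roughly what configure_spark_with_delta_pip does under the hood (a simplified sketch based on the pip_utils.py link above, not the actual source; the hard-coded Scala version is an assumption for illustration):

from importlib.metadata import version

def configure_spark_with_delta_pip_sketch(builder, extra_packages=None):
    # Look up the version of the pip-installed delta-spark package ...
    delta_version = version("delta-spark")
    scala_version = "2.12"  # assumed here; the real helper determines this itself
    # ... and turn it into a Maven coordinate, plus any extra packages.
    artifacts = ["io.delta:delta-core_%s:%s" % (scala_version, delta_version)] + (extra_packages or [])
    # Spark resolves these coordinates via Ivy at session startup, which is why a clean
    # Ivy/Maven cache combined with no internet access makes the session hang or fail.
    return builder.config("spark.jars.packages", ",".join(artifacts))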

santosh-d3vpl3x avatar Aug 26 '22 09:08 santosh-d3vpl3x

@MrPowers

Here is my pip list output (screenshot):

Spark configuration:

Variant 1: passing the JAR via spark.jars

import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Variant 2: passing the package via extra_packages

import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

Both give me the same error, shown below:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.2.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/xxxxx/.ivy2/cache
The jars for the packages stored in: /home/xxxxx/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9f5c2d62-314f-4857-a764-d72116906ec7;1.0
	confs: [default]

BShraman avatar Aug 30 '22 17:08 BShraman

Some users can't access JAR files via Maven. It would be cool if we could upload the JAR files to PyPI and give users who don't have access to Maven an option to fetch the relevant JAR from PyPI.
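
In the meantime, for environments with no Maven or Ivy access at all, one possible workaround looks roughly like this (an untested sketch, not an official recommendation; the local paths are hypothetical, and note from the Ivy output above that delta-core 2.0.0 also pulls in delta-storage):

import pyspark
from delta import *  # still requires the delta-spark Python package to be pip-installed

# JARs downloaded ahead of time to a path Spark can read; these paths are placeholders.
local_jars = ",".join([
    "/opt/jars/delta-core_2.12-2.0.0.jar",
    "/opt/jars/delta-storage-2.0.0.jar",
])

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.master", "local[*]")
    .config("spark.jars", local_jars)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Note: configure_spark_with_delta_pip is deliberately skipped here, since it would add
# Maven coordinates to spark.jars.packages and trigger an Ivy resolution.
spark = builder.getOrCreate()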

MrPowers avatar Sep 08 '22 19:09 MrPowers

I use the following conf, which solved the problem and works perfectly:

# Create a Spark context with a custom config
from pyspark import SparkConf, SparkContext  # imports added for completeness
import pyspark

conf = SparkConf()
# conf.set('spark.sql.debug.maxToStringFields', 100)
conf.set('spark.default.parallelism', 700)
conf.set('spark.sql.shuffle.partitions', 700)
conf.set('spark.driver.memory', '30g')
conf.set('spark.driver.cores', 8)
conf.set('spark.executor.cores', 8)
conf.set('spark.executor.memory', '30g')
conf.set("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
conf.set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
conf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

sc = SparkContext.getOrCreate(conf)

# Import the library and configure the SparkSession
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]
builder = (
    pyspark.sql.SparkSession.builder.appName("Myapp")
    .config('spark.master','local[*]')
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

My Spark env:

  • delta-spark 2.0.0
  • findspark 2.0.1
  • pyspark 3.2.3
  • sparksql-magic 0.0.3

FelixQLe avatar Feb 24 '23 05:02 FelixQLe