[Question] py4j.protocol.Py4JJavaError: An error occurred while calling o38.applySchemaToPythonRDD
Bug
I am getting an IncompatibleClassChangeError with the most recent Spark and delta-spark versions in all of my unit tests.
I run my tests with:
python -m pytest tests/unit
The error:
E py4j.protocol.Py4JJavaError: An error occurred while calling o38.applySchemaToPythonRDD.
E : java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
Describe the problem
I have a Docker image running Python 3.8, and my Python application runs fine there with delta-spark 1.0.0 and Spark 3.1.2. I am trying to upgrade the image to Python 3.9 with delta-spark 2.2.0 and Spark 3.3.1. Unfortunately, while all of my application's tests pass in the first image, in the upgraded one I get the incompatible class error.
The differences between the two Docker images are:
- Python version
- Delta spark package
- Spark
Steps to reproduce
Unfortunately I cannot share all the code for my unit tests, but I have at least one version combination of Spark and delta-spark that works.
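As a stand-in for the redacted test code, here is a minimal sketch of the kind of call that fails. The session configuration follows the delta-spark quickstart; the schema, rows, and app name are illustrative, not from the actual project:

```python
# Minimal sketch of the failing operation, assuming a Delta-enabled session.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = (
    SparkSession.builder.appName("unit-tests")  # illustrative app name
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

schema = StructType([StructField("id", StringType(), True)])
rows = [("a",), ("b",)]

# The DeltaAnalysis rule runs while the new DataFrame is analyzed; with a
# delta-core jar built against a different Spark version, class loading
# fails with the IncompatibleClassChangeError shown below.
df = spark.createDataFrame(spark.sparkContext.parallelize(rows, 1), schema)
df.show()
```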
Observed results
answer = 'xro3487', gateway_client = <py4j.clientserver.JavaClient object at 0xffff64a47880>, target_id = 'o38', name = 'applySchemaToPythonRDD'
def get_return_value(answer, gateway_client, target_id=None, name=None):
    """Converts an answer received from the Java gateway into a Python object.

    For example, string representation of integers are converted to Python
    integer, string representation of objects are converted to JavaObject
    instances, etc.

    :param answer: the string returned by the Java gateway
    :param gateway_client: the gateway client used to communicate with the Java
        Gateway. Only necessary if the answer is a reference (e.g., object,
        list, map)
    :param target_id: the name of the object from which the answer comes from
        (e.g., *object1* in `object1.hello()`). Optional.
    :param name: the name of the member from which the answer comes from
        (e.g., *hello* in `object1.hello()`). Optional.
    """
    if is_error(answer)[0]:
        if len(answer) > 1:
            type = answer[1]
            value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
            if answer[1] == REFERENCE_TYPE:
>               raise Py4JJavaError(
                    "An error occurred while calling {0}{1}{2}.\n".
                    format(target_id, ".", name), value)
E py4j.protocol.Py4JJavaError: An error occurred while calling o38.applySchemaToPythonRDD.
E : java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
E at java.base/java.lang.ClassLoader.defineClass1(Native Method)
E at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
E at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
E at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
E at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
E at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
E at java.base/java.security.AccessController.doPrivileged(Native Method)
E at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
E at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
E at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
E at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:64)
E at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:57)
E at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
E at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
E at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
E at scala.collection.immutable.List.foldLeft(List.scala:91)
E at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
E at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
E at scala.collection.immutable.List.foreach(List.scala:431)
E at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
E at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
E at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
E at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
E at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
E at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
E at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
E at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
E at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
E at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
E at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
E at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
E at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
E at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
E at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
E at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
E at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
E at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
E at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
E at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
E at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
E at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
E at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
E at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
E at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
E at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:570)
E at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:756)
E at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:741)
E at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
E at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.base/java.lang.reflect.Method.invoke(Method.java:566)
E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E at py4j.Gateway.invoke(Gateway.java:282)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
E at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
E at java.base/java.lang.Thread.run(Thread.java:829)
/spark-3.3.1-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326: Py4JJavaError
Further Details
tests/unit/.../../abc.py:118:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.9/site-packages/../../test_case_runner.py:24: in run_test
    input_df, expected_df = self.test_case_builder.build_testcase_params(inputs, expected)
/usr/local/lib/python3.9/site-packages/../../test_case_parameter_builder.py:25: in build_testcase_params
    input_dfs = [self._convert_to_data_frame(input) for input in inputs]
/usr/local/lib/python3.9/site-packages/../../test_case_parameter_builder.py:25: in <listcomp>
    input_dfs = [self._convert_to_data_frame(input) for input in inputs]
/usr/local/lib/python3.9/site-packages/../../test_case_parameter_builder.py:38: in _convert_to_data_frame
    return self.spark_test_handler.rows_schema_to_df(test_data.data, test_data.schema)
/usr/local/lib/python3.9/site-packages/../spark_test_handler.py:55: in rows_schema_to_df
    data_frame = self.create_df(rows, schema)
/usr/local/lib/python3.9/site-packages/../spark_test_handler.py:51: in create_df
    return self.spark.createDataFrame(self.spark.sparkContext.parallelize(rows, 1), schema)
/spark-3.3.1-bin-hadoop3/python/pyspark/sql/session.py:894: in createDataFrame
    return self._create_dataframe(
/spark-3.3.1-bin-hadoop3/python/pyspark/sql/session.py:939: in _create_dataframe
    jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), struct.json())
/spark-3.3.1-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321: in __call__
    return_value = get_return_value(
/spark-3.3.1-bin-hadoop3/python/pyspark/sql/utils.py:190: in deco
    return f(*a, **kw)
The package omitted from the site-packages paths above is an internal package.
Expected results
The unit tests should pass, as the code has not changed. Running the same code with the two different Docker images produces failures only in the newer image.
Environment information
- Python 3.9
- Delta Spark 2.2.0
- Spark version: 3.3.1
- Pyspark 3.3.1
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [X] No. I cannot contribute a bug fix at this time.
Good morning,
Apparently, the problem is that the delta-core library is missing. I need to have it inside the spark/jars folder and have JAVA_HOME set. I also want to try putting the jar inside the pyspark jars folder to see if it works that way.
However, I see no reference to this in the Python section of the documentation: https://docs.delta.io/latest/quick-start.html
Maybe there is room for improvement?
We don't recommend using the `python` command directly, as you would need to set up a lot of things manually. This is the section for PySpark: https://docs.delta.io/latest/quick-start.html#pyspark-shell
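For reference, the linked section amounts to a setup like the following sketch. `configure_spark_with_delta_pip` sets `spark.jars.packages` so Spark resolves the delta-core jar matching the installed delta-spark version at startup, with no manual copying into spark/jars:

```python
# Sketch of the documented PySpark setup from the Delta quickstart.
import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Resolves io.delta:delta-core for the installed delta-spark version.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```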
@zsxwing: I can understand that, but there is a problem.
I have a Python project which uses Spark and Delta tables. I want to run it in a Docker image, so I pick the Docker image from the Apache foundation, which contains Spark. When I try to run the tests for my project, they fail because the delta-core library does not exist in the Spark jars folder. So, as you can see, I am not running python directly; my tests run through python, and for those tests I need pyspark and delta-core.
Moreover, the error handling should be better: nothing in the stack traces hints at a missing library, which makes debugging the problem very hard.
In my opinion, there are two ways forward:
- Improve the documentation for the case where you do not run pyspark directly, so it lists the needed libraries.
- Improve the error handling for missing libraries in the code, so the errors become more obvious.
So I see room for improvement; alternatively, you could suggest another way of making my tests work.
Thanks for the feedback. Yep, there is definitely room for improvement. I just read your description again and there is something I don't quite understand.
java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
This error occurs because the Delta Lake version (<= 1.0.x) and the Spark version (>= 3.2.x) are not compatible. Is it possible that you had an old delta-spark version somewhere, and putting a new version of delta-core into Spark's jars folder happened to fix it?
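One way to surface this kind of mismatch early is to log the installed version pair before building the session. A sketch, using the pip package names; the compatible pairs are listed in the Delta release notes:

```python
# Sketch: print the installed version pair up front, so an incompatible
# combination (e.g. delta-spark <= 1.0.x with Spark >= 3.2.x) is visible
# immediately rather than failing deep inside query analysis.
from importlib.metadata import version

print("pyspark    :", version("pyspark"))
print("delta-spark:", version("delta-spark"))
```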
Good morning @zsxwing,
I checked my Docker image and there is no other version of Delta anywhere; the only version is the one I download. If I remove the jar file from the spark/jars folder, then I get these errors. I can retry to be sure, or I can share with you how I build the image.
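A sketch of such a check, listing every Delta jar visible in the image; the SPARK_HOME fallback is an assumption taken from the traceback paths above:

```python
# Sketch: list all Delta jars on the image, both in Spark's jars folder and
# in pip-installed PySpark's bundled jars directory, to rule out duplicates.
import glob
import os
import site

spark_home = os.environ.get("SPARK_HOME", "/spark-3.3.1-bin-hadoop3")
patterns = [os.path.join(spark_home, "jars", "delta-*.jar")]
for site_dir in site.getsitepackages():
    patterns.append(os.path.join(site_dir, "pyspark", "jars", "delta-*.jar"))

for pattern in patterns:
    for jar in sorted(glob.glob(pattern)):
        print(jar)
```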
If I remove the jar file from the spark jars folder then I get these errors.
What's your command to start Spark in this case? I'm looking for the --packages
parameter.
Hi @zsxwing,
I do not run any pyspark command; pyspark is invoked from Python when the unit tests are called. All I do is run the tests that need pyspark from my code repository with:
python -m pytest tests/unit
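For this kind of setup, one common pattern (a sketch, not the poster's actual harness) is a session-scoped pytest fixture that builds a Delta-enabled session via `configure_spark_with_delta_pip`, so the matching delta-core jar is resolved without touching spark/jars:

```python
# conftest.py -- sketch of a session-scoped fixture for Delta-enabled tests.
import pytest
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip


@pytest.fixture(scope="session")
def spark():
    builder = (
        SparkSession.builder.master("local[1]")
        .appName("unit-tests")
        .config("spark.sql.extensions",
                "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )
    # Resolves the delta-core jar matching the installed delta-spark version.
    session = configure_spark_with_delta_pip(builder).getOrCreate()
    yield session
    session.stop()
```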
Hi, I'm experiencing a somewhat different issue: in my case the Spark application container keeps restarting. When I process an event, it runs fine for the first event, but when we process a second event it restarts and throws the exception below. Please help if anybody has an idea about this.
rdd = spark_inst.readCSV(sc_tuple[1], process_cur, schema_sent=schema).rdd.repartition(num_partition)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 214, in rdd
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o424.javaToPython.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
The currently active SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.
Hi. Did you ever figure out a solution to this issue?