
[Question] py4j.protocol.Py4JJavaError: An error occurred while calling o38.applySchemaToPythonRDD

Open aimtsou opened this issue 2 years ago • 9 comments

Bug

Getting an IncompatibleClassChangeError with the most recent Spark and delta-spark versions in all of my unit tests. I run my tests with: python -m pytest tests/unit

The error:

E                   py4j.protocol.Py4JJavaError: An error occurred while calling o38.applySchemaToPythonRDD.
E                   : java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class

Describe the problem

I have a Docker image running Python 3.8 and my Python application, which runs fine with delta-spark 1.0.0 and Spark 3.1.2. I am trying to upgrade the image to Python 3.9 with delta-spark 2.2.0 and Spark 3.3.1. Unfortunately, while all of my application's tests pass in the first image, in the upgraded one I get the IncompatibleClassChangeError.

The changes between the two Docker images are:

  • Python version
  • delta-spark package
  • Spark version

Steps to reproduce

Unfortunately I cannot provide all the code for my unit tests, but I do have at least one version combination of Spark and delta-spark that works.

Observed results

answer = 'xro3487', gateway_client = <py4j.clientserver.JavaClient object at 0xffff64a47880>, target_id = 'o38', name = 'applySchemaToPythonRDD'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.
    
        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.
    
        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
>                   raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o38.applySchemaToPythonRDD.
E                   : java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
E                       at java.base/java.lang.ClassLoader.defineClass1(Native Method)
E                       at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
E                       at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
E                       at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
E                       at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
E                       at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
E                       at java.base/java.security.AccessController.doPrivileged(Native Method)
E                       at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
E                       at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
E                       at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
E                       at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:64)
E                       at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:57)
E                       at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
E                       at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
E                       at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
E                       at scala.collection.immutable.List.foldLeft(List.scala:91)
E                       at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
E                       at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
E                       at scala.collection.immutable.List.foreach(List.scala:431)
E                       at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
E                       at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
E                       at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
E                       at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
E                       at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
E                       at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
E                       at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
E                       at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
E                       at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
E                       at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
E                       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
E                       at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
E                       at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
E                       at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
E                       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
E                       at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
E                       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
E                       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
E                       at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
E                       at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
E                       at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
E                       at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
E                       at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
E                       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
E                       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
E                       at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:570)
E                       at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:756)
E                       at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:741)
E                       at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
E                       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E                       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
E                       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E                       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E                       at py4j.Gateway.invoke(Gateway.java:282)
E                       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E                       at py4j.commands.CallCommand.execute(CallCommand.java:79)
E                       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
E                       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
E                       at java.base/java.lang.Thread.run(Thread.java:829)

/spark-3.3.1-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326: Py4JJavaError

Further Details

tests/unit/.../../abc.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.9/site-packages/../../test_case_runner.py:24: in run_test
    input_df, expected_df = self.test_case_builder.build_testcase_params(inputs, expected)
/usr/local/lib/python3.9/site-packages/../../test_case_parameter_builder.py:25: in build_testcase_params
    input_dfs = [self._convert_to_data_frame(input) for input in inputs]
/usr/local/lib/python3.9/site-packages/../../test_case_parameter_builder.py:25: in <listcomp>
    input_dfs = [self._convert_to_data_frame(input) for input in inputs]
/usr/local/lib/python3.9/site-packages/../../test_case_parameter_builder.py:38: in _convert_to_data_frame
    return self.spark_test_handler.rows_schema_to_df(test_data.data, test_data.schema)
/usr/local/lib/python3.9/site-packages/../spark_test_handler.py:55: in rows_schema_to_df
    data_frame = self.create_df(rows, schema)
/usr/local/lib/python3.9/site-packages/../spark_test_handler.py:51: in create_df
    return self.spark.createDataFrame(self.spark.sparkContext.parallelize(rows, 1), schema)
/spark-3.3.1-bin-hadoop3/python/pyspark/sql/session.py:894: in createDataFrame
    return self._create_dataframe(
/spark-3.3.1-bin-hadoop3/python/pyspark/sql/session.py:939: in _create_dataframe
    jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), struct.json())
/spark-3.3.1-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321: in __call__
    return_value = get_return_value(
/spark-3.3.1-bin-hadoop3/python/pyspark/sql/utils.py:190: in deco
    return f(*a, **kw)

The package whose path is omitted in the site-packages entries above is an internal package.

Expected results

The unit tests should pass, as the code has not changed. Running the same code in the two Docker images succeeds in the old image but fails in the new one.

Environment information

  • Python 3.9
  • Delta Spark 2.2.0
  • Spark version: 3.3.1
  • Pyspark 3.3.1

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [X] No. I cannot contribute a bug fix at this time.

aimtsou avatar Feb 24 '23 08:02 aimtsou

Good morning,

Apparently, the problem is that the delta-core library is missing. I need to have it inside the Spark jars folder and have JAVA_HOME set. I also want to try putting the jar inside the PySpark jars folder to see if it works that way.

However, I see no reference to this in the Python section of the documentation: https://docs.delta.io/latest/quick-start.html

Maybe there is room for improvement?

aimtsou avatar Mar 01 '23 10:03 aimtsou

We don't recommend using the python command directly, as you would need to set up a lot of things manually. This is the section for PySpark: https://docs.delta.io/latest/quick-start.html#pyspark-shell
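
For reference, the setup from that quick-start section looks roughly like the sketch below. It assumes delta-spark is pip-installed; configure_spark_with_delta_pip pulls in the delta-core jar matching the installed delta-spark version, so nothing has to be copied into spark/jars by hand:

    import pyspark
    from delta import configure_spark_with_delta_pip

    builder = (
        pyspark.sql.SparkSession.builder.appName("MyApp")
        # Enable Delta's SQL extensions and catalog implementation
        .config("spark.sql.extensions",
                "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )
    # Adds the matching io.delta:delta-core artifact via spark.jars.packages
    spark = configure_spark_with_delta_pip(builder).getOrCreate()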

zsxwing avatar Apr 13 '23 21:04 zsxwing

@zsxwing: I understand that, but there is a problem.

I have a Python project which uses Spark and Delta tables. I want to run it in a Docker image, so I pick the Docker image from the Apache foundation which contains Spark. When I try to run the tests for my project, they fail because the delta-core library does not exist in the Spark jars folder. Consequently, as you can see, I am not running python directly; pyspark is invoked from Python when my tests run, and for these tests I need both pyspark and delta-core.

Moreover, the error handling should be better: nothing in the stack traces hints at a missing library, which makes debugging the problem very hard.

In my opinion, there are two ways forward here:

  • Improve the documentation for the case where you do not run pyspark directly and need to supply the required libraries yourself.
  • Improve the error handling for missing libraries in the code, so the errors become more obvious.

So I see room for improvement; alternatively, you could suggest another way of making the tests work.
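
One way that could make such tests work without copying jars into spark/jars is a session-scoped pytest fixture that builds the SparkSession itself. A minimal sketch, assuming delta-spark is pip-installed; the file name conftest.py and the fixture name are illustrative:

    # conftest.py (hypothetical) -- session-scoped Spark fixture for unit tests
    import pytest
    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip

    @pytest.fixture(scope="session")
    def spark():
        builder = (
            SparkSession.builder.master("local[1]")
            .appName("unit-tests")
            .config("spark.sql.extensions",
                    "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        )
        # configure_spark_with_delta_pip injects the matching delta-core jar
        # through spark.jars.packages, so no jar is copied into spark/jars
        session = configure_spark_with_delta_pip(builder).getOrCreate()
        yield session
        session.stop()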

aimtsou avatar Apr 14 '23 05:04 aimtsou

Thanks for the feedback. Yep, there is definitely room for improvement. I just read your description again and there is something I don't quite understand.

java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class

This error occurs because the Delta Lake version (<=1.0.x) and the Spark version (>=3.2.x) are not compatible. Is it possible that you have an old delta-spark version somewhere, and that putting a new version of delta-core into Spark's jars folder happened to fix it?
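
A quick way to check for a stale copy is to print the versions the test environment actually resolves; a minimal sketch using only the standard library, assuming both packages are pip-installed:

    # Sanity check: do the installed pyspark and delta-spark versions form
    # a compatible pair (e.g. Spark 3.3.x with delta-spark 2.2.x)?
    from importlib.metadata import version

    print("pyspark     :", version("pyspark"))
    print("delta-spark :", version("delta-spark"))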

zsxwing avatar Apr 14 '23 22:04 zsxwing

Good morning @zsxwing,

I checked my Docker image and there is no other version of Delta anywhere; the only version is the one I download. If I remove the jar file from the Spark jars folder, then I get these errors. I can retry to be sure, or I can share with you how I build the image.

aimtsou avatar Apr 17 '23 06:04 aimtsou

If I remove the jar file from the spark jars folder then I get these errors.

What's your command to start Spark in this case? I'm looking for the --packages parameter.

zsxwing avatar Apr 20 '23 18:04 zsxwing

Hi @zsxwing,

I do not run any pyspark command; I wait for it to be invoked from Python when the unit tests are run. All I do is run the tests that need pyspark from my code repository with:

python -m pytest tests/unit
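
When Spark is started implicitly like this, one option is to inject the package coordinates through the PYSPARK_SUBMIT_ARGS environment variable before the first SparkSession is created; a sketch under the assumption of Scala 2.12 and delta-spark 2.2.0 (pyspark reads this variable when launching the JVM gateway, and the trailing "pyspark-shell" token is required):

    # Must run before any SparkSession/SparkContext is created,
    # e.g. at the top of conftest.py.
    import os

    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        "--packages io.delta:delta-core_2.12:2.2.0 pyspark-shell"
    )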

aimtsou avatar May 02 '23 07:05 aimtsou

Hi, I'm experiencing a different issue: in my case the Spark application container is restarting. When I process an event, it runs fine for the first event, but when we process a second event the container restarts and throws the exception below. Please help if anybody has an idea about this.

    rdd = spark_inst.readCSV(sc_tuple[1], process_cur, schema_sent=schema).rdd.repartition(num_partition)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 214, in rdd
      File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
      File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
      File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o424.javaToPython.
    : java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
    This stopped SparkContext was created at:
    ...
    The currently active SparkContext was created at:
    org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    py4j.Gateway.invoke(Gateway.java:238)
    py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    java.base/java.lang.Thread.run(Unknown Source)

manikumarn559 avatar Oct 16 '23 07:10 manikumarn559

Hi. Could you figure out a solution to this issue?

imenas avatar May 01 '24 01:05 imenas