
Error with modules of pydeequ

Open norhther opened this issue 2 years ago • 2 comments

I'm trying to create an anaconda environment to run pydeequ. Basically, I'm following these steps:

conda install openjdk
conda install pyspark==3.0.0
conda install -c conda-forge pyspark

(I think pinning pyspark 3.0.0 is needed, because I tried installing it with 3.3 and it didn't work.)
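One thing worth checking with the steps above: the second command pins pyspark 3.0.0, but the unpinned conda-forge install that follows may pull in a newer release, while the code still sets `SPARK_VERSION` to "3.0.0". The type-annotated `deco(*a: Any, **kw: Any)` frame from `pyspark/sql/utils.py` in the traceback hints that the kernel is running a pyspark newer than 3.0. Here is a minimal sketch that compares the major.minor of the installed pyspark with `SPARK_VERSION`; `major_minor` and `check_spark_version` are hypothetical helpers, not part of pydeequ.

```python
import os


def major_minor(version: str) -> str:
    """Return the 'major.minor' prefix of a version string, e.g. '3.3.1' -> '3.3'."""
    return ".".join(version.split(".")[:2])


def check_spark_version(installed: str, declared: str) -> bool:
    """True if the installed pyspark and the declared SPARK_VERSION agree on major.minor."""
    return major_minor(installed) == major_minor(declared)


if __name__ == "__main__":
    try:
        import pyspark
        installed = pyspark.__version__
    except ImportError:
        installed = None
    # SPARK_VERSION is the variable the issue's snippet sets before importing pydeequ.
    declared = os.environ.get("SPARK_VERSION")
    if installed is None or declared is None:
        print(f"pyspark: {installed}, SPARK_VERSION: {declared}")
    elif check_spark_version(installed, declared):
        print(f"OK: pyspark {installed} matches SPARK_VERSION={declared}")
    else:
        print(f"Mismatch: pyspark {installed} vs SPARK_VERSION={declared}")
```

Run it in the same kernel as the notebook, after the `os.environ` lines, so it sees the same interpreter and environment.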

I also installed sagemaker_pyspark. Here is some sample code:

import os
import sys

# Environment configuration (set before pydeequ is imported)
os.environ["SPARK_VERSION"] = "3.0.0"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
os.environ["PYSPARK_PYTHON"] = sys.executable

import pydeequ

import sagemaker_pyspark
from pyspark.sql import SparkSession, Row
from pydeequ.analyzers import *

classpath = ":".join(sagemaker_pyspark.classpath_jars()) # aws-specific jars

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())


df = spark.read.option("header","true").csv('landing/persistent/chocolate_part_1.csv')
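Deequ publishes one artifact per Spark line, with the target Spark version as a suffix on the Maven version (for example `com.amazon.deequ:deequ:1.2.2-spark-3.0`). A quick sanity check once the session above exists is to compare that suffix with `spark.version`. This is a sketch under that naming assumption; `deequ_spark_line` and `runtime_matches_deequ` are hypothetical helpers.

```python
import re
from typing import Optional

# Assumed coordinate shape: group:artifact:<deequ version>-spark-<major.minor>
_SPARK_SUFFIX = re.compile(r"-spark-(\d+\.\d+)$")


def deequ_spark_line(maven_coord: str) -> Optional[str]:
    """Extract the Spark major.minor a Deequ coordinate targets, or None if absent."""
    match = _SPARK_SUFFIX.search(maven_coord)
    return match.group(1) if match else None


def runtime_matches_deequ(spark_version: str, maven_coord: str) -> bool:
    """True if the running Spark's major.minor equals the line encoded in the coordinate."""
    target = deequ_spark_line(maven_coord)
    runtime = ".".join(spark_version.split(".")[:2])
    return target is not None and target == runtime


# Usage in the notebook (assumes the `spark` session and `pydeequ` import from above):
#   print(spark.version, pydeequ.deequ_maven_coord)
#   print(runtime_matches_deequ(spark.version, pydeequ.deequ_maven_coord))
```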

This works fine, and so does the AnalysisRunner module. However, when I try something else, like:

from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [9], in <cell line: 3>()
      1 from pydeequ.profiles import *
----> 3 result = ColumnProfilerRunner(spark) \
      4     .onData(df) \
      5     .run()
      7 for col, profile in result.profiles.items():
      8     print(profile)

File ~/opt/anaconda3/envs/ADSDB/lib/python3.9/site-packages/pydeequ/profiles.py:121, in ColumnProfilerRunBuilder.run(self)
    115 def run(self):
    116     """
    117     A method that runs a profile check on the data to obtain a ColumnProfiles class
    118 
    119     :return: A ColumnProfiles result
    120     """
--> 121     run = self._ColumnProfilerRunBuilder.run()
    122     return ColumnProfilesBuilder(self._spark_session)._columnProfilesFromColumnRunBuilderRun(run)

File ~/opt/anaconda3/envs/ADSDB/lib/python3.9/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File ~/opt/anaconda3/envs/ADSDB/lib/python3.9/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/opt/anaconda3/envs/ADSDB/lib/python3.9/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o80.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'
	at org.apache.spark.sql.DeequFunctions$.withAggregateFunction(DeequFunctions.scala:31)
	at org.apache.spark.sql.DeequFunctions$.stateful_approx_count_distinct(DeequFunctions.scala:60)
	at com.amazon.deequ.analyzers.ApproxCountDistinct.aggregationFunctions(ApproxCountDistinct.scala:52)
	at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$3(AnalysisRunner.scala:319)
	at scala.collection.immutable.List.flatMap(List.scala:366)
	at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:319)
	at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
	at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
	at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
	at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
	at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
	at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
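A `java.lang.NoSuchMethodError` is raised by the JVM when compiled code calls a method that is not present on the class loaded at runtime. Here it is thrown from Deequ's `DeequFunctions` while calling into Spark's `AggregateFunction`, which is consistent with a Deequ jar built for a different Spark version than the one running. The sketch below pulls the missing signature out of the error text; `missing_method` is a hypothetical helper, and the usage in the comment assumes the `spark` and `df` objects from the snippet above.

```python
import re
from typing import Optional

# Accepts the quoted form seen in this traceback as well as an unquoted signature
# that runs to the end of the line.
_NO_SUCH_METHOD = re.compile(r"java\.lang\.NoSuchMethodError: '?([^'\r\n]+)")


def missing_method(error_text: str) -> Optional[str]:
    """Return the missing method's signature, or None if there is no NoSuchMethodError."""
    match = _NO_SUCH_METHOD.search(error_text)
    return match.group(1) if match else None


# Usage sketch:
#   from py4j.protocol import Py4JJavaError
#   try:
#       ColumnProfilerRunner(spark).onData(df).run()
#   except Py4JJavaError as e:
#       sig = missing_method(str(e))
#       if sig:
#           print("Possible Spark/Deequ version mismatch; missing:", sig)
#       raise
```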

Is there anything I'm doing wrong here?

norhther, Sep 17 '22 14:09

@norhther could you try the latest commit in PyDeequ? It hasn't been released yet, but you can install it via:

pip install git+https://github.com/awslabs/python-deequ.git@7ec9f6f72839779f370a4753c0db26d6cf052203
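Before and after installing from a git commit, it can help to confirm which package versions the notebook kernel actually resolves, since a conda environment and a Jupyter kernel can point at different interpreters. A minimal sketch using only the standard library's `importlib.metadata` (Python 3.8+); `installed_versions` is a hypothetical helper.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if it is not installed."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    # Run this inside the same kernel that executes the pydeequ code.
    for name, ver in installed_versions(["pydeequ", "pyspark"]).items():
        print(f"{name}: {ver or 'not installed'}")
```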

Most notably, it lets you use much more modern dependencies, thanks to https://github.com/awslabs/python-deequ/pull/100

chenliu0831, Sep 23 '22 03:09

@chenliu0831 I got the same error with this commit. Is there anything else I could try?

norhther, Sep 23 '22 10:09