
BatchNormal anomaly detector throws "java.lang.IncompatibleClassChangeError: Implementing class"

Open Blackbaud-PatrickMcDonald opened this issue 3 years ago • 4 comments

Environment: Databricks, Spark 3.0.1, Scala 2.12.10, Java 1.8.0_282, deequ 1.2.2-spark-3.0, pydeequ 1.0.1

I'm attempting to use the BatchNormal anomaly detection strategy, and it throws the following exception:

Py4JJavaError: An error occurred while calling o405.run.
: java.lang.IncompatibleClassChangeError: Implementing class
	at com.amazon.deequ.anomalydetection.BatchNormalStrategy.detect(BatchNormalStrategy.scala:75)
	at com.amazon.deequ.anomalydetection.AnomalyDetector.detectAnomaliesInHistory(AnomalyDetector.scala:98)
	at com.amazon.deequ.anomalydetection.AnomalyDetector.isNewPointAnomalous(AnomalyDetector.scala:60)
	at com.amazon.deequ.checks.Check$.isNewestPointNonAnomalous(Check.scala:1138)
	at com.amazon.deequ.checks.Check.$anonfun$isNewestPointNonAnomalous$1(Check.scala:433)
	at scala.runtime.java8.JFunction1$mcZD$sp.apply(JFunction1$mcZD$sp.java:23)
	at com.amazon.deequ.constraints.AnalysisBasedConstraint.runAssertion(AnalysisBasedConstraint.scala:108)
	at com.amazon.deequ.constraints.AnalysisBasedConstraint.pickValueAndAssert(AnalysisBasedConstraint.scala:74)
	at com.amazon.deequ.constraints.AnalysisBasedConstraint.$anonfun$evaluate$2(AnalysisBasedConstraint.scala:60)
	at scala.Option.map(Option.scala:230)
	at com.amazon.deequ.constraints.AnalysisBasedConstraint.evaluate(AnalysisBasedConstraint.scala:60)
	at com.amazon.deequ.constraints.ConstraintDecorator.evaluate(Constraint.scala:56)
	at com.amazon.deequ.checks.Check.$anonfun$evaluate$1(Check.scala:1038)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at com.amazon.deequ.checks.Check.evaluate(Check.scala:1038)
	at com.amazon.deequ.VerificationSuite.$anonfun$evaluate$1(VerificationSuite.scala:269)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at com.amazon.deequ.VerificationSuite.evaluate(VerificationSuite.scala:269)
	at com.amazon.deequ.VerificationSuite.doVerificationRun(VerificationSuite.scala:132)
	at com.amazon.deequ.VerificationRunBuilder.run(VerificationRunBuilder.scala:173)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

To reproduce:

from pyspark.sql import SparkSession, Row
from pydeequ.repository import *
from pydeequ.verification import *
from pydeequ.anomaly_detection import *
from pydeequ.analyzers import *


spark = (SparkSession
    .builder
    .getOrCreate())
sc = spark.sparkContext

metricsRepositoryPath = "dbfs:/path/to/my/metrics.json"
metricsRepository = FileSystemMetricsRepository(spark, metricsRepositoryPath)

todaysDataset = sc.parallelize([
        Row(a=80, b=5),
        Row(a=3,  b=30),
        Row(a=10, b=5)]).toDF()

currTime = ResultKey.current_milli_time()
todaysKey = ResultKey(spark, currTime)

currResult = VerificationSuite(spark).onData(todaysDataset) \
    .useRepository(metricsRepository) \
    .saveOrAppendResult(todaysKey) \
    .addAnomalyCheck(BatchNormalStrategy(), Sum("a")) \
    .run()

Is this a compatibility issue with the version of Breeze being used?
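
One quick way to check is to ask the running JVM where it actually loaded Breeze from. A minimal sketch, run in the same Spark session as the failing check (the class name comes from the stack traces in this thread; treat this as a hypothetical diagnostic, not part of any deequ API):

# Print which jar breeze.stats.package$ (the class BatchNormalStrategy.detect
# touches) was loaded from, to see which Breeze version is on the driver.
jvm = spark.sparkContext._jvm
breeze_cls = jvm.java.lang.Class.forName("breeze.stats.package$")
print(breeze_cls.getProtectionDomain().getCodeSource().getLocation().toString())

If the location points at a Breeze 1.x jar while the deequ build was compiled against 0.13.x, that binary mismatch would explain the IncompatibleClassChangeError.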

Hi, I'm seeing a similar issue with this anomaly check as well on EMR 6.4.0 (Spark 3.1.2) running deequ 2.0.0. This is the exception I get:

java.lang.NoClassDefFoundError: breeze/stats/package$
  at com.amazon.deequ.anomalydetection.BatchNormalStrategy.detect(BatchNormalStrategy.scala:75)
  at com.amazon.deequ.anomalydetection.AnomalyDetector.detectAnomaliesInHistory(AnomalyDetector.scala:98)
  at com.amazon.deequ.anomalydetection.AnomalyDetector.isNewPointAnomalous(AnomalyDetector.scala:60)
  at com.amazon.deequ.checks.Check$.isNewestPointNonAnomalous(Check.scala:1138)
  at com.amazon.deequ.checks.Check.$anonfun$isNewestPointNonAnomalous$1(Check.scala:433)
  at scala.runtime.java8.JFunction1$mcZD$sp.apply(JFunction1$mcZD$sp.java:23)
  at com.amazon.deequ.constraints.AnalysisBasedConstraint.runAssertion(AnalysisBasedConstraint.scala:108)
  at com.amazon.deequ.constraints.AnalysisBasedConstraint.pickValueAndAssert(AnalysisBasedConstraint.scala:74)
  at com.amazon.deequ.constraints.AnalysisBasedConstraint.$anonfun$evaluate$2(AnalysisBasedConstraint.scala:60)
The code that triggers it:

import com.amazon.deequ.VerificationRunBuilderWithRepository
import com.amazon.deequ.analyzers.Compliance
import com.amazon.deequ.anomalydetection.BatchNormalStrategy

def batchNormalStrategyTest() = {
    val strategy = BatchNormalStrategy(lowerDeviationFactor = Some(3.0), upperDeviationFactor = Some(3.0))
    datasetVerificationRunner(df, { v: VerificationRunBuilderWithRepository =>
        v.addAnomalyCheck(strategy, Compliance("column_1", "cardinality(column_1) > 0"))
    })
}

batchNormalStrategyTest()

The datasetVerificationRunner function simply builds a VerificationRunBuilderWithRepository and applies the higher-order function to it to add an anomaly check. (It is structured this way because I want to quickly test out different strategies on a number of instances of the same dataset.)

Basically something like

val verificationResult = addAnomalyCheck(
    VerificationSuite()
      .onData(df.where(s"date='${date}'"))
      .useRepository(repo)
      .saveOrAppendResult(newKey)
  ).run()

val resultDf = checkResultsAsDataFrame(spark, verificationResult)

Is this a known issue with this class or am I using it incorrectly in this case?

sakshaat · Nov 05 '21 21:11

For anyone having trouble running it on Databricks: it appears to be a conflict between Breeze dependency versions. DBR 7.3 and later ship Breeze 1.x, which conflicts with the Breeze 0.13.2 that deequ depends on.
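
A minimal sketch of one mitigation, assuming a deequ artifact built against your cluster's Spark line is available (the coordinates below are illustrative, and the last comment in this thread suggests version pinning alone may not be enough on Databricks, where libraries are usually attached to the cluster rather than resolved at session start):

# Illustrative only: resolve a deequ build that targets the cluster's Spark
# (and therefore Breeze) line. Adjust the coordinates to your Spark version.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.4-spark-3.3")
    .getOrCreate())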

mfridrikhson-tp · Sep 20 '22 11:09

Is it possible to verify whether the issue goes away with the latest mainline?

pip install git+https://github.com/awslabs/python-deequ.git@7ec9f6f72839779f370a4753c0db26d6cf052203
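
If you try that build, a quick way to confirm which pydeequ distribution is actually active in the environment (a generic standard-library check, nothing pydeequ-specific):

# importlib.metadata is in the standard library (Python 3.8+); this reports
# the version string of whatever "pydeequ" the environment resolved.
from importlib.metadata import version
print(version("pydeequ"))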

chenliu0831 · Sep 21 '22 01:09

I still have the same error on Databricks DBR 11.3 with Spark 3.3.0 and on DBR 12.2 with Spark 3.3.2. I'm using pydeequ 1.2 and deequ 2.0.4.

natanaeldgsantos · Dec 26 '23 13:12