deequ
deequ copied to clipboard
BatchNormal anomaly detector throws "java.lang.IncompatibleClassChangeError: Implementing class"
Environment: Databricks Spark 3.0.1 Scala 2.12.10 Java 1.8.0_282 deequ:1.2.2-spark-3.0 pydeequ 1.0.1
I'm attempting to use the BatchNormal anomaly detector strategy and it is throwing the following exception:
Py4JJavaError: An error occurred while calling o405.run.
: java.lang.IncompatibleClassChangeError: Implementing class
at com.amazon.deequ.anomalydetection.BatchNormalStrategy.detect(BatchNormalStrategy.scala:75)
at com.amazon.deequ.anomalydetection.AnomalyDetector.detectAnomaliesInHistory(AnomalyDetector.scala:98)
at com.amazon.deequ.anomalydetection.AnomalyDetector.isNewPointAnomalous(AnomalyDetector.scala:60)
at com.amazon.deequ.checks.Check$.isNewestPointNonAnomalous(Check.scala:1138)
at com.amazon.deequ.checks.Check.$anonfun$isNewestPointNonAnomalous$1(Check.scala:433)
at scala.runtime.java8.JFunction1$mcZD$sp.apply(JFunction1$mcZD$sp.java:23)
at com.amazon.deequ.constraints.AnalysisBasedConstraint.runAssertion(AnalysisBasedConstraint.scala:108)
at com.amazon.deequ.constraints.AnalysisBasedConstraint.pickValueAndAssert(AnalysisBasedConstraint.scala:74)
at com.amazon.deequ.constraints.AnalysisBasedConstraint.$anonfun$evaluate$2(AnalysisBasedConstraint.scala:60)
at scala.Option.map(Option.scala:230)
at com.amazon.deequ.constraints.AnalysisBasedConstraint.evaluate(AnalysisBasedConstraint.scala:60)
at com.amazon.deequ.constraints.ConstraintDecorator.evaluate(Constraint.scala:56)
at com.amazon.deequ.checks.Check.$anonfun$evaluate$1(Check.scala:1038)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at com.amazon.deequ.checks.Check.evaluate(Check.scala:1038)
at com.amazon.deequ.VerificationSuite.$anonfun$evaluate$1(VerificationSuite.scala:269)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at com.amazon.deequ.VerificationSuite.evaluate(VerificationSuite.scala:269)
at com.amazon.deequ.VerificationSuite.doVerificationRun(VerificationSuite.scala:132)
at com.amazon.deequ.VerificationRunBuilder.run(VerificationRunBuilder.scala:173)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
To reproduce:
from pyspark.sql import SparkSession, Row
from pydeequ.repository import *
from pydeequ.verification import *
from pydeequ.anomaly_detection import *
from pydeequ.analyzers import *
spark = (SparkSession
.builder
.getOrCreate())
sc = spark.sparkContext
metricsRepositoryPath = "dbfs:/path/to/my/metrics.json"
metricsRepository = FileSystemMetricsRepository(spark, metricsRepositoryPath)
todaysDataset = sc.parallelize([
Row(a=80,b=5,),
Row(a=3, b=30,),
Row(a=10, b=5,)]).toDF()
currTime = ResultKey.current_milli_time()
todaysKey = ResultKey(spark, currTime)
currResult = VerificationSuite(spark).onData(todaysDataset) \
.useRepository(metricsRepository) \
.addAnomalyCheck(BatchNormalStrategy(), Sum("a")) \
.run()
Is this a compatibility issue with the version of Breeze being used?
Hi, I'm seeing a similar issue with this anomaly check as well on EMR 6.40 (spark 3.1.2) running deequ 2.0.0. This is the exception I get:
java.lang.NoClassDefFoundError: breeze/stats/package$
at com.amazon.deequ.anomalydetection.BatchNormalStrategy.detect(BatchNormalStrategy.scala:75)
at com.amazon.deequ.anomalydetection.AnomalyDetector.detectAnomaliesInHistory(AnomalyDetector.scala:98)
at com.amazon.deequ.anomalydetection.AnomalyDetector.isNewPointAnomalous(AnomalyDetector.scala:60)
at com.amazon.deequ.checks.Check$.isNewestPointNonAnomalous(Check.scala:1138)
at com.amazon.deequ.checks.Check.$anonfun$isNewestPointNonAnomalous$1(Check.scala:433)
at scala.runtime.java8.JFunction1$mcZD$sp.apply(JFunction1$mcZD$sp.java:23)
at com.amazon.deequ.constraints.AnalysisBasedConstraint.runAssertion(AnalysisBasedConstraint.scala:108)
at com.amazon.deequ.constraints.AnalysisBasedConstraint.pickValueAndAssert(AnalysisBasedConstraint.scala:74)
at com.amazon.deequ.constraints.AnalysisBasedConstraint.$anonfun$evaluate$2(AnalysisBasedConstraint.scala:60)
import com.amazon.deequ.anomalydetection.BatchNormalStrategy
def batchNormalStrategyTest() = {
val strategy = BatchNormalStrategy(lowerDeviationFactor=Some(3.0), upperDeviationFactor=Some(3.0))
datasetVerificationRunner(df, {v: VerificationRunBuilderWithRepository => v.addAnomalyCheck(strategy, Compliance("column_1", "cardinality(column_1) > 0"))})
}
batchNormalStrategyTest()
The datasetVerificationRunner
function is simply building a VerificationRunBuilderWithRepository
and applying the higher order function on it to add an AnomalyCheck (It is structured this way because my intention is to quickly test out different strategies on a number of different instances of the same dataset).
Basically something like
val verificationResult = addAnomalyCheck(VerificationSuite().onData(df.where(s"date='${date}'")).useRepository(repo).saveOrAppendResult(newKey)).run()
val resultDf = checkResultsAsDataFrame(spark, verificationResult)
Is this a known issue with this class or am I using it incorrectly in this case?
For anyone having trouble running it on Databricks - it seems to be a conflict between breeze
dependency versions - DBR 7.3 and later use 1.x which conflicts with Deequ's 0.13.2
Possible to verify if the issue go away with latest mainline?
pip install git+https://github.com/awslabs/python-deequ.git@7ec9f6f72839779f370a4753c0db26d6cf052203
I still have the same error too using databricks DBR 11.3 with spark 3.3.0 and DBR 12.2 with spark 3.3.2. im using pydeequ 1.2 deequ 2.0.4