Filter not applied when extracting duplicate_count failed_rows
Hi,
I created a simple use case to illustrate a situation that can have a big performance impact when working with large datasets.
When executing a duplicate_count check with a dataset filter, the filter is not applied to the main table when it is joined with the duplicated values to extract the failed_rows.
Here is the snippet for the example:
#!/usr/bin/env python
from pyspark.sql import SparkSession
from soda.scan import Scan

# Initialize the Spark session
spark = SparkSession.builder \
    .appName("SodaScanTest") \
    .getOrCreate()

# Create a Spark DataFrame with temporary data
data = [
    (1, "Alice", 29),
    (2, "Bob", 25),      # Force a duplicate
    (3, "Charlie", 25),  # Force a duplicate
    (4, "Diana", None),
]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
df.show()

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("people")

# Configure the Soda scan
scan = Scan()
scan.set_scan_definition_name("soda_scan_test")
scan.set_data_source_name("spark_df")
scan.add_spark_session(spark)
scan.set_verbose(True)

# Add a duplicate_count check with a dataset filter
scan.add_sodacl_yaml_str("""
filter people [young]:
  where: age <= 25

checks for people [young]:
  - duplicate_count(age) = 0
""")

# Execute the scan
scan.execute()

# Print the scan results
if scan.has_check_fails():
    print(scan.get_logs_text())
    print("Scan failed!")
else:
    print("Scan succeeded!")

# Stop the Spark session
spark.stop()
Running it, we can see part of the output:
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---+-------+----+
| id| name| age|
+---+-------+----+
| 1| Alice| 29|
| 2| Bob| 25|
| 3|Charlie| 25|
| 4| Diana|NULL|
+---+-------+----+
INFO | Soda Core 3.5.1
DEBUG | Scan execution starts
DEBUG | Query 1.spark_df.people[young].age.duplicate_count:
        WITH frequencies AS (
            SELECT COUNT(*) AS frequency
            FROM people
            WHERE age IS NOT NULL
              AND age <= 25
            GROUP BY age)
        SELECT COUNT(*)
        FROM frequencies
        WHERE frequency > 1
DEBUG | Query 2.spark_df.people[young].age.failed_rows[duplicate_count]:
        WITH frequencies AS (
            SELECT age
            FROM people
            WHERE age IS NOT NULL
              AND age <= 25
            GROUP BY age
            HAVING COUNT(*) > 1)
        SELECT main.*
        FROM people main
        JOIN frequencies ON main.age = frequencies.age
        LIMIT 100
INFO | Using DefaultSampler
INFO | Scan summary:
DEBUG | 2/2 queries OK
DEBUG | 1.spark_df.people[young].age.duplicate_count [OK] 0:00:01.238728
DEBUG | 2.spark_df.people[young].age.failed_rows[duplicate_count] [OK] 0:00:03.708356
INFO | 1/1 check FAILED:
INFO | people [young] in spark_df
INFO | duplicate_count(age) = 0 [sodacl_string.yml] [FAILED]
INFO | check_value: 1
INFO | Oops! 1 failures. 0 warnings. 0 errors. 0 pass.
Scan failed!
Is there any limitation that prevents generating the following query instead, applying the filter to the joined table as well? Since frequencies only contains ages that already satisfy the filter, adding the predicate to the main table does not change the result set; it only allows the engine to prune the scan:
DEBUG | Query 2.spark_df.people[young].age.failed_rows[duplicate_count]:
        WITH frequencies AS (
            SELECT age
            FROM people
            WHERE age IS NOT NULL
              AND age <= 25
            GROUP BY age
            HAVING COUNT(*) > 1)
        SELECT main.*
        FROM people main
        JOIN frequencies ON main.age = frequencies.age
        WHERE main.age <= 25
        LIMIT 100
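In the meantime, a possible workaround is to materialize the filter as its own temp view and run the check against it without a dataset filter, so every generated query, including the failed-rows extraction, only scans filtered rows. This is just a sketch against the repro above; the people_young view name is made up for illustration:

# Workaround sketch: pre-filter the data into a dedicated temp view so the
# failed-rows query cannot touch unfiltered rows. "people_young" is a
# hypothetical name chosen for this example.
df.where("age <= 25").createOrReplaceTempView("people_young")

scan.add_sodacl_yaml_str("""
checks for people_young:
  - duplicate_count(age) = 0
""")

The drawback is that the filter moves from SodaCL into Spark code, and the results are reported against people_young instead of people [young].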
CLOUD-9182