Filter not applied when extracting duplicate_count failed_rows

Open · migueldoblado opened this issue 8 months ago · 1 comment

Hi,

I created a simple use case to illustrate a situation that can have a big impact on performance when working with large datasets.

When a duplicate_count check is executed with a dataset filter, the filter is applied when computing the duplicated values, but it is not applied to the dataset when it is joined with those values to extract the failed_rows.

The following snippet reproduces the example:

#!/usr/bin/env python

from pyspark.sql import SparkSession
from soda.scan import Scan

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SodaScanTest") \
    .getOrCreate()

# Create a Spark DataFrame with temporary data
data = [
    (1, "Alice", 29),
    (2, "Bob", 25),  # Force a duplicate
    (3, "Charlie", 25), # Force a duplicate
    (4, "Diana", None)  
]
columns = ["id", "name", "age"]

df = spark.createDataFrame(data, columns)
df.show()

# Register the DataFrame as a temporary table
df.createOrReplaceTempView("people")

# Configure Soda Scan
scan = Scan()
scan.set_scan_definition_name("soda_scan_test")
scan.set_data_source_name("spark_df")
scan.add_spark_session(spark)
scan.set_verbose(True)

# Add a duplicate_count check, scoped by a dataset filter
scan.add_sodacl_yaml_str("""
filter people [young]:
  where: age <= 25

checks for people [young]:
  - duplicate_count(age) = 0
""")

# Execute the scan
scan.execute()

# Print the scan results
if scan.has_check_fails():
    print(scan.get_logs_text())
    print("Scan failed!")
else:
    print("Scan succeeded!")

# Stop the Spark session
spark.stop()

This is the relevant part of the output:

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---+-------+----+                                                              
| id|   name| age|
+---+-------+----+
|  1|  Alice|  29|
|  2|    Bob|  25|
|  3|Charlie|  25|
|  4|  Diana|NULL|
+---+-------+----+

INFO   | Soda Core 3.5.1
DEBUG  | Scan execution starts
DEBUG  | Query 1.spark_df.people[young].age.duplicate_count:

          WITH frequencies AS (
              SELECT COUNT(*) AS frequency
              FROM people
              WHERE age IS NOT NULL 
AND age <= 25
              GROUP BY age)
          SELECT COUNT(*)
          FROM frequencies
          WHERE frequency > 1
DEBUG  | Query 2.spark_df.people[young].age.failed_rows[duplicate_count]:

          WITH frequencies AS (
              SELECT age
              FROM people
              WHERE age IS NOT NULL 
AND age <= 25
              GROUP BY age
              HAVING COUNT(*) > 1)
          SELECT main.*
          FROM people main
          JOIN frequencies ON main.age = frequencies.age

LIMIT 100
DEBUG  | Query 2.spark_df.people[young].age.failed_rows[duplicate_count]:

          WITH frequencies AS (
              SELECT age
              FROM people
              WHERE age IS NOT NULL 
AND age <= 25
              GROUP BY age
              HAVING COUNT(*) > 1)
          SELECT main.*
          FROM people main
          JOIN frequencies ON main.age = frequencies.age

LIMIT 100
INFO   | Using DefaultSampler
INFO   | Scan summary:
DEBUG  | 2/2 queries OK
DEBUG  |   1.spark_df.people[young].age.duplicate_count [OK] 0:00:01.238728
DEBUG  |   2.spark_df.people[young].age.failed_rows[duplicate_count] [OK] 0:00:03.708356
INFO   | 1/1 check FAILED: 
INFO   |     people [young] in spark_df
INFO   |       duplicate_count(age) = 0 [sodacl_string.yml] [FAILED]
INFO   |         check_value: 1
INFO   | Oops! 1 failures. 0 warnings. 0 errors. 0 pass.
Scan failed!

Is there any limitation that prevents Soda from generating the following query instead, so that the dataset filter is also applied to the joined table?

DEBUG  | Query 2.spark_df.people[young].age.failed_rows[duplicate_count]:
          WITH frequencies AS (
              SELECT age
              FROM people
              WHERE age IS NOT NULL 
AND age <= 25
              GROUP BY age
              HAVING COUNT(*) > 1)
          SELECT main.*
          FROM people main
          JOIN frequencies ON main.age = frequencies.age
          WHERE main.age <= 25
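
As a quick sanity check that adding the filter to the outer query does not change the failed rows for this example, here is a minimal sketch that runs both variants against the same data. It uses Python's built-in sqlite3 module as a stand-in for Spark SQL, which is an assumption made only to keep the snippet self-contained. The outer filter is written as main.age so the column reference is unambiguous after the join, and an ORDER BY is added purely to make the comparison deterministic; it is not part of the query Soda generates. This only compares results on the sample data and says nothing about performance.

```python
import sqlite3

# Same rows as the Spark DataFrame in the example above.
rows = [(1, "Alice", 29), (2, "Bob", 25), (3, "Charlie", 25), (4, "Diana", None)]

# SQLite is used here only as a stand-in for Spark SQL (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER, name TEXT, age INTEGER)")
conn.executemany("INSERT INTO people VALUES (?, ?, ?)", rows)

# CTE shared by both variants: duplicated ages within the filtered dataset.
FREQUENCIES_CTE = """
WITH frequencies AS (
    SELECT age
    FROM people
    WHERE age IS NOT NULL AND age <= 25
    GROUP BY age
    HAVING COUNT(*) > 1)
"""

# Shape of the query from the log: the joined table is not filtered.
current_query = FREQUENCIES_CTE + """
SELECT main.*
FROM people main
JOIN frequencies ON main.age = frequencies.age
ORDER BY main.id
LIMIT 100
"""

# Proposed shape: the dataset filter is repeated on the joined table.
proposed_query = FREQUENCIES_CTE + """
SELECT main.*
FROM people main
JOIN frequencies ON main.age = frequencies.age
WHERE main.age <= 25
ORDER BY main.id
LIMIT 100
"""

current_rows = conn.execute(current_query).fetchall()
proposed_rows = conn.execute(proposed_query).fetchall()
conn.close()

print(current_rows)
print(proposed_rows)
```

On this sample both queries return the rows for Bob and Charlie, so the extra WHERE clause would only reduce the amount of data scanned on the joined side.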

migueldoblado, Mar 24 '25 21:03

CLOUD-9182

tools-soda, Mar 24 '25 21:03