
lazy validation on dask dataframes should collect all errors


Describe the bug

When doing lazy validation on dask dataframes, currently only the errors from the first partition to raise a SchemaErrors exception are reported.

  • [X] I have checked that this issue has not already been reported.
  • [X] I have confirmed this bug exists on the latest version of pandera.
  • [X] (optional) I have confirmed this bug exists on the master branch of pandera.

Code Sample, a copy-pastable example

import pandera as pa
import dask.dataframe as dd
import pandas as pd


int_schema = pa.DataFrameSchema({"col": pa.Column(int, checks=pa.Check.gt(0))})
str_schema = pa.DataFrameSchema({"col": pa.Column(str)})

df = pd.DataFrame({"col": [-x for x in range(10)]})  # 0, -1, ..., -9: every value fails gt(0)
ddf = dd.from_pandas(df, npartitions=5)

ddf = int_schema.validate(ddf, lazy=True)
val_df = ddf.compute()  # raises SchemaErrors, but only with one partition's failures
print(val_df)

stdout:

pandera.errors.SchemaErrors: A total of 1 schema errors were found.

Error Counts
------------
- schema_component_check: 1

Schema Error Summary
--------------------
                                      failure_cases  n_failure_cases
schema_context column check
Column         col    greater_than(0)      [-8, -9]                2

Expected behavior

The semantics of lazy=True on dask dataframes should be that errors are collected across all partitions, so that every failure case is reported in the SchemaErrors object.

The solution should pass a dask validation function into map_partitions here: https://github.com/pandera-dev/pandera/blob/master/pandera/schemas.py#L483-L501

so that the schema errors from all partitions are aggregated into a single SchemaErrors exception.
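
A rough sketch of the direction (illustrative only: validate_dask is not part of pandera's API, and the real fix would construct the aggregated SchemaErrors inside pandera rather than raising a generic error):

import dask.dataframe as dd
import pandas as pd
import pandera as pa


def validate_dask(schema: pa.DataFrameSchema, ddf: dd.DataFrame) -> dd.DataFrame:
    """Sketch: collect failure cases from every partition before raising."""

    def collect_failure_cases(partition: pd.DataFrame) -> pd.DataFrame:
        try:
            schema.validate(partition, lazy=True)
            return pd.DataFrame()  # this partition is clean
        except pa.errors.SchemaErrors as err:
            return err.failure_cases

    # In practice an explicit ``meta`` should be passed here so dask does not
    # have to infer the output columns from dummy data.
    failure_cases = ddf.map_partitions(collect_failure_cases).compute()

    if not failure_cases.empty:
        # The real fix would build a single SchemaErrors from the combined
        # failure cases; its constructor is internal to pandera, so this
        # sketch just surfaces the aggregated table.
        raise ValueError(
            f"A total of {len(failure_cases)} schema errors were found:\n"
            f"{failure_cases}"
        )
    return ddf

Applied to the example above, validate_dask(int_schema, ddf) should surface all ten failing values instead of only the two from a single partition.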

cosmicBboy avatar Dec 23 '21 15:12 cosmicBboy

Hi. Great work on the package! We have run into a similar issue recently.

Currently, we do something similar to below as a workaround

def validate_partition(partition, schema):
    """
    Runs lazy validation on a single dask partition and returns its failure cases.

    Args:
        partition (pd.DataFrame): one partition of the dask dataframe
        schema (pa.DataFrameSchema): pandera schema to validate the partition against
    """

    report = pd.DataFrame()
    try:
        schema.validate(partition, lazy=True)
    except pa.errors.SchemaErrors as err:
        report = err.failure_cases
    return report

ddf = dd.from_pandas(df, npartitions=num_cores)
# map_partitions passes each partition as the first positional argument and
# forwards the schema as an extra argument.
validated_ddf = ddf.map_partitions(validate_partition, schema)

report = validated_ddf.compute()
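
One caveat (this concerns dask's behavior, not pandera's): without an explicit meta, map_partitions infers the output columns by running the function on dummy data, so the inferred columns may not match the real per-partition reports. Passing meta explicitly, and forcing every partition to return the same columns, avoids surprises. The column list below mirrors SchemaErrors.failure_cases and may differ across pandera versions:

# Assumed columns of SchemaErrors.failure_cases (may vary by pandera version).
FAILURE_CASE_COLUMNS = [
    "schema_context", "column", "check", "check_number", "failure_case", "index",
]
empty_report = pd.DataFrame(columns=FAILURE_CASE_COLUMNS)


def validate_partition_consistent(partition, schema):
    # Hypothetical wrapper: force every partition's report to the same columns
    # so the computed result matches the meta passed below.
    return validate_partition(partition, schema).reindex(columns=FAILURE_CASE_COLUMNS)


validated_ddf = ddf.map_partitions(
    validate_partition_consistent, schema, meta=empty_report
)  # then .compute() as above to get the combined report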

There are some issues with this approach when the schema includes dataframe-level checks that span multiple columns, or uniqueness checks on a column, because each partition is validated independently.

Are there potential workarounds that you are aware of or can suggest beyond what we have above?

Naveen-Annam-CMA avatar Sep 28 '23 17:09 Naveen-Annam-CMA

Hi @Naveen-Annam-CMA, so if I understand correctly, validated_ddf is actually the failure cases collected across all partitions, not the validated data, correct?

cosmicBboy avatar Sep 28 '23 18:09 cosmicBboy

> Hi @Naveen-Annam-CMA, so if I understand correctly, validated_ddf is actually the failure cases collected across all partitions, not the validated data, correct?

Yes, that's correct; our focus is on collecting all the errors while still using dask's parallelism. But there is a problem with this approach, as mentioned previously: because each partition is validated independently, you end up with duplicate errors, incomplete dataframe-level validation, and unique checks that reflect only the partition rather than the whole column. We had to separate out the checks that can run fully independently, i.e. the row-level checks.
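
Roughly, the split looks like the sketch below (the schemas are illustrative, and unique=True assumes a reasonably recent pandera version): the row-level schema runs per partition as before, while the column-wide checks run on data that has been collected first.

# Illustrative split: row-level checks are safe per partition, while
# uniqueness and cross-column checks need to see the whole column.
row_level_schema = pa.DataFrameSchema(
    {"col": pa.Column(int, checks=pa.Check.gt(0))}
)
global_schema = pa.DataFrameSchema(
    {"col": pa.Column(int, unique=True)}
)

# Row-level checks: independent per partition (pass meta= as noted above).
row_report = ddf.map_partitions(validate_partition, row_level_schema).compute()

# Column-wide checks: validate collected data so uniqueness sees all rows at once.
global_report = validate_partition(ddf.compute(), global_schema)

report = pd.concat([row_report, global_report], ignore_index=True)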

Naveen-Annam-CMA avatar Oct 02 '23 15:10 Naveen-Annam-CMA