pandera
lazy validation on dask dataframes should collect all errors
Describe the bug
When doing lazy validation on dask dataframes, currently only the errors from the first partition that raises a SchemaErrors exception are reported.
- [X] I have checked that this issue has not already been reported.
- [X] I have confirmed this bug exists on the latest version of pandera.
- [X] (optional) I have confirmed this bug exists on the master branch of pandera.
Code Sample, a copy-pastable example
import pandera as pa
import dask.dataframe as dd
import pandas as pd
int_schema = pa.DataFrameSchema({"col": pa.Column(int, checks=pa.Check.gt(0))})
str_schema = pa.DataFrameSchema({"col": pa.Column(str)})
df = pd.DataFrame({"col": [-x for x in range(10)]})
ddf = dd.from_pandas(df, npartitions=5)
ddf = int_schema.validate(ddf, lazy=True)
val_df = ddf.compute()
print(val_df)
stdout:
pandera.errors.SchemaErrors: A total of 1 schema errors were found.
Error Counts
------------
- schema_component_check: 1
Schema Error Summary
--------------------
                                      failure_cases  n_failure_cases
schema_context column check
Column         col    greater_than(0)      [-8, -9]                2
Expected behavior
The semantics of lazy=True on dask dataframes should be that errors are collected across all partitions, so that every failure case is reported in the SchemaErrors object.
The solution should implement a dask validation function passed into map_partitions here:
https://github.com/pandera-dev/pandera/blob/master/pandera/schemas.py#L483-L501
so that the schema errors from all partitions are aggregated into a single SchemaErrors exception.
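For illustration, a rough sketch of this aggregation outside of pandera could look like the following (the helper names are hypothetical, and a real fix would re-raise the collected failure cases as a single SchemaErrors instead of the placeholder error):

import dask
import pandas as pd
import pandera as pa

def _partition_failure_cases(partition, schema):
    """Validate one partition lazily and return its failure cases (empty if valid)."""
    try:
        schema.validate(partition, lazy=True)
    except pa.errors.SchemaErrors as err:
        return err.failure_cases
    return pd.DataFrame()

def validate_collecting_all_errors(schema, ddf):
    """Validate every partition before reporting, aggregating all failure cases."""
    # to_delayed() sidesteps dask's meta inference on the failure-case frames
    reports = [
        dask.delayed(_partition_failure_cases)(part, schema) for part in ddf.to_delayed()
    ]
    failure_cases = pd.concat(dask.compute(*reports), ignore_index=True)
    if len(failure_cases):
        # placeholder: pandera itself would wrap these in a single SchemaErrors exception
        raise ValueError(
            f"{len(failure_cases)} schema errors found across all partitions:\n{failure_cases}"
        )
    return ddf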
Hi. Great work on the package! We have run into a similar issue recently.
Currently, we use something like the following as a workaround:
def validate_partition(partition, schema):
    """
    Runs lazy validation on a single dask partition and returns its failure cases.

    Args:
        partition (pd.DataFrame): one partition of the dask dataframe
        schema (pa.DataFrameSchema): schema to validate the partition against
    """
    report = pd.DataFrame()
    try:
        schema.validate(partition, lazy=True)
    except pa.errors.SchemaErrors as err:
        report = err.failure_cases
    return report


ddf = dd.from_pandas(df, npartitions=num_cores)
# pass the schema through map_partitions so each partition is validated independently
validated_ddf = ddf.map_partitions(validate_partition, schema=schema)
report = validated_ddf.compute()
There are some issues with this approach for schema validation that runs checks across two columns, or for unique checks on a column, because each partition is validated independently.
Are there potential workarounds that you are aware of or can suggest beyond what we have above?
Hi @Naveen-Annam-CMA, so if I understand correctly, validated_ddf is actually the failure cases collected across all partitions, not the validated data, correct?
Yes, that's correct, as our focus is on collecting all the errors whilst utilising the power of dask. But there is a problem with this approach, as mentioned previously: validating partitions independently means you end up with duplicate errors, incomplete dataframe-level validation, and unique checks that reflect only the partition rather than the whole column. We had to separate out the checks that can be fully independent, i.e. row-level checks.
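A rough sketch of such a split (illustrative schemas; depending on the pandera version, uniqueness can be expressed with unique=True or a custom Check):

import pandas as pd
import pandera as pa
import dask.dataframe as dd

# row-level checks only look at individual values, so they are safe to run per partition
row_level_schema = pa.DataFrameSchema({"col": pa.Column(int, checks=pa.Check.gt(0))})

# uniqueness (and cross-column checks) need the whole column, so they run after compute()
full_frame_schema = pa.DataFrameSchema({"col": pa.Column(int, unique=True)})

ddf = dd.from_pandas(pd.DataFrame({"col": range(1, 11)}), npartitions=5)

# per-partition, row-level validation stays lazy in dask
validated_ddf = ddf.map_partitions(lambda part: row_level_schema.validate(part))

# dataframe-wide checks on the materialized result
full_frame_schema.validate(validated_ddf.compute(), lazy=True)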