
Implement error report aggregator

Open cosmicBboy opened this issue 3 years ago • 5 comments

Is your feature request related to a problem? Please describe.

Currently, pandera's SchemaError and SchemaErrors are geared towards the debugging workflow, which is useful when trying to figure out what's gone wrong with one particular dataframe in a pipeline. However, pandera provides no support for more general reporting of schema errors in the context of:

  • a unit or integration test suite.
  • a runtime that executes multiple independent data processing pipelines.

In these contexts, it would be useful to aggregate the results of multiple schema errors into a single human-readable report.

Describe the solution you'd like

  • A module in pandera that handles aggregation of multiple schema error objects into a single machine-readable data structure, e.g. JSON (a rough sketch follows below)
  • A module that takes the aggregated schema errors as input and writes an HTML report
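
To make this concrete, here's a minimal sketch of what the aggregation step could look like, assuming a hypothetical aggregate_error_reports helper (not part of pandera's API) that validates several dataframes lazily and collects their failure cases into one JSON document:

import json

import pandera as pa

def aggregate_error_reports(named_frames, schemas):
    # Hypothetical helper: validate each dataframe lazily and collect
    # its failure cases into a single JSON-serializable report.
    report = {}
    for name, df in named_frames.items():
        try:
            schemas[name].validate(df, lazy=True)
        except pa.errors.SchemaErrors as exc:
            # failure_cases is a dataframe; convert to plain records for JSON
            report[name] = exc.failure_cases.to_dict(orient="records")
    return json.dumps(report, default=str, indent=2)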

cosmicBboy avatar Feb 26 '21 02:02 cosmicBboy

Would be extremely useful indeed. I'm currently using PandasSchema because it gives me a report, instead of raising exceptions. Raising an exception kind of implies that it is a show stopper, but I just want to know how my data deviates from the schema.

contang0 avatar Jun 10 '21 09:06 contang0

hey @juliuscrn thanks for your input! Just FYI I won't be able to get to this task for some time, and it will require some design work to figure out the best UX for this feature.

I just want to know how my data deviates from the schema.

out of curiosity, what does your validation workflow look like? The core model of pandera is that schemas enable type-checking of dataframes, so schema.validate returns the validated dataframe, not a report of errors.

If you want to extract an error report of failure cases using pandera, a helper function like this would give you functionality similar to pandas-schema:

import pandera as pa

def report_errors(dataframe, schema):
    try:
        # lazy=True collects all failure cases before raising
        schema(dataframe, lazy=True)
        return None
    except pa.errors.SchemaErrors as exc:
        return exc.failure_cases  # a dataframe of failure cases

Note that this uses lazy validation to collect all the errors from all the column/index components before raising an exception. The failure_cases attribute contains a dataframe of metadata about which indexes/columns in the validated dataframe did not pass checks.

If this turns out to be a common use case it might be worth considering the possibility of exposing this as a function in a utils module, or even a report_errors method in DataFrameSchema.
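
For illustration, a quick usage sketch of the helper above (the exact columns in the failure-case frame may vary across pandera versions):

import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({"age": pa.Column(int, pa.Check.ge(0))})
df = pd.DataFrame({"age": [25, -3]})

failure_cases = report_errors(df, schema)
print(failure_cases)
# includes columns like schema_context, column, check, failure_case,
# and index, pointing at the offending rows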

cosmicBboy avatar Jun 10 '21 14:06 cosmicBboy

@cosmicBboy thanks! that function works like a charm, and I would say it is very much worth it to expose it as a method. The information it provides is already very useful.

Getting UX right is important indeed. One thing I'd like to have (and which is also not available in pandas-schema) is an id column: for example, in addition to 'index', the ability to designate one of the columns to act as an identifier.
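
As a rough sketch of that idea under current pandera, one could join the failure cases back to an identifier column via the index column of the failure-case frame (customer_id here is a hypothetical identifier column):

failure_cases = report_errors(df, schema)
# map each failure back to a business identifier via the row index;
# "customer_id" is a hypothetical column in df
failure_cases_with_id = failure_cases.merge(
    df[["customer_id"]], left_on="index", right_index=True, how="left"
)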

I'll try to think of what else could be useful from a user perspective.

My workflow is the following: I get data from other people, validate it against the schema, and send the validation report back to the people who sent me the data. I rarely care about data types; most of my validations check against lists of values.

By the way, is it currently possible to perform validations across multiple columns? I very much miss this feature in pandas-schema. I often find myself in situations where I need to say something about the validity of data given conditions derived from one or more other columns, and for that I currently resort to my own custom functions.
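
(For reference, cross-column conditions like this can be expressed in pandera with a dataframe-level check; a minimal sketch, using hypothetical start/end columns:)

import pandera as pa

schema = pa.DataFrameSchema(
    columns={
        "start": pa.Column(int),
        "end": pa.Column(int),
    },
    # a schema-level check receives the whole dataframe and
    # returns a boolean Series with one value per row
    checks=pa.Check(lambda df: df["end"] >= df["start"]),
)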

contang0 avatar Jun 11 '21 21:06 contang0

Would be extremely useful indeed. I'm currently using PandasSchema because it gives me a report, instead of raising exceptions. Raising an exception kind of implies that it is a show stopper, but I just want to know how my data deviates from the schema.

I agree, PandasSchema gives better error reporting, indicating the row and column where the error occurred. It would be of great value if we could iterate through the errors and format a report based on a user-designed error report template.

riosatbms avatar Aug 05 '21 18:08 riosatbms

@riosatbms this is currently possible with lazy validation, see here

The pa.errors.SchemaErrors object has a failure_cases attribute, which is itself a dataframe containing index and column columns that indicate where in the dataframe each error occurred; the failure_case column holds the specific failing value. You can use this dataframe however you want to format an error report.

That said, I think we could add a method to the schema API like report_errors https://github.com/pandera-dev/pandera/issues/425#issuecomment-858651910 so that you can do failure_cases = schema.report_failure_cases(df)... does that sound useful?
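
A minimal sketch of that kind of user-formatted report, assuming the failure-case columns described above:

def format_report(failure_cases):
    # render each failure case as one human-readable line;
    # adapt the template to the report consumer's needs
    lines = []
    for case in failure_cases.to_dict(orient="records"):
        lines.append(
            f"row {case['index']}, column {case['column']!r}: "
            f"value {case['failure_case']!r} failed check {case['check']!r}"
        )
    return "\n".join(lines)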

cosmicBboy avatar Aug 05 '21 18:08 cosmicBboy

for folks who stumbled on the same thing, this might help a bit.

personally speaking, reporting is a bit arbitrary, with a lot of different preferences depending on who the report consumer is, so it's better left to the report consumer to define.

Code example
import csv

import pandas as pd
import pandera as pa

file_path = "... original file path csv here"
output_folder = "... output folder here"

column_specifications = {
    # ... pandera.Column definitions
}

# Define the validation schema
schema = pa.DataFrameSchema(column_specifications)


def check_csv_file(original_file_path, pandera_schema):
    df = pd.read_csv(original_file_path)
    try:
        return pandera_schema.validate(df, lazy=True)
    except pa.errors.SchemaErrors as e:
        get_error_report_from_schema_error(
            df=df,
            schema_errors=e,
            csv_file_path=original_file_path,
            report_output_path=output_folder
        )



def get_error_report_from_schema_error(
    df: pd.DataFrame,
    schema_errors: pa.errors.SchemaErrors,
    csv_file_path: str,
    report_output_path: str,
):
    print("Expected values for the following columns do not match the schema:")
    # one SchemaError per failed schema component
    schema_errors_list = schema_errors.schema_errors
    # generate a file per column
    for column_error in schema_errors_list:
        column_name = column_error.schema.name
        print(f"Column name {column_name}:")
        print(column_error.failure_cases)

        filename = f"{column_name}_error.csv"

        failure_value_list = column_error.failure_cases.failure_case.unique().tolist()
        # recover row positions in the original dataframe by matching on the
        # failing values (note: this can over-match rows that share a value)
        row_indexes = df.index[df[column_name].isin(failure_value_list)].tolist()
        filter_csv(
            output_file=report_output_path + "/" + filename,
            original_file=csv_file_path,
            row_indexes=row_indexes,
        )

        print(f"Created CSV file: {filename}")
    # Generate the error summary text file
    # list out unique invalid values
    # how many entries have invalid values
    summary_filename = report_output_path + '/summary.txt'
    with open(summary_filename, 'w') as file:
        for column_error in schema_errors_list:
            column_name = column_error.schema.name
            unique_errors = column_error.failure_cases.failure_case.unique().tolist()
            num_errors = len(column_error.failure_cases)
            error_summary = f"{num_errors} rows have errors in column '{column_name}'. "
            error_summary += f"Wrong values: {unique_errors}\n"
            file.write(error_summary)
    print(f"Created error summary file: {summary_filename}")

def filter_csv(original_file: str, output_file: str, row_indexes: list):
    # filter the csv file down to the rows at the error indexes
    with open(original_file, 'r', newline='') as in_f, \
            open(output_file, 'w', newline='') as out_f:
        reader = csv.reader(in_f)
        # not ideal here since csv reader won't be able to get the
        # "real" original data
        writer = csv.writer(out_f, quoting=csv.QUOTE_ALL)

        header = next(reader)
        writer.writerow(header)

        for i, row in enumerate(reader):  # data rows are 0-based, like df.index
            if i in row_indexes:
                writer.writerow(row)


# run the check only after all functions are defined
check_csv_file(original_file_path=file_path, pandera_schema=schema)

snowkrebs avatar May 25 '23 18:05 snowkrebs