
Multi-column rules

Open pwolff42 opened this issue 8 months ago • 6 comments

Is there current functionality, or plans on the roadmap, to support multi-column logic in a rule? This could be a powerful feature, and one that few other frameworks offer. GreatExpectations has a few examples, but they are rather limited:

https://greatexpectations.io/expectations/expect_column_pair_values_a_to_be_greater_than_b/

https://greatexpectations.io/expectations/expect_column_pair_values_to_be_in_set/

A very powerful extension of the second example would be if the check accepted an arbitrary number of columns; then a hierarchical structure of sorts could be established:

if column A is in [X], column B must be in [Y, Z]; if column A is in [X] and column B is in [Y], column C must be in [...]
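
To make the intent concrete, the hierarchy above is just a per-row boolean condition. A rough pandas sketch, with placeholder data and allowed sets (the real sets are left unspecified above):

import pandas as pd

df = pd.DataFrame({"A": ["X", "X", "Q"], "B": ["Y", "W", "Y"], "C": ["C1", "C2", "C3"]})

# Hypothetical hierarchy: when A is in {X}, B must be in {Y, Z};
# when additionally B is in {Y}, C must be in some allowed set (placeholder: {C1}).
valid = (
    (~df["A"].isin(["X"]) | df["B"].isin(["Y", "Z"]))
    & (~(df["A"].isin(["X"]) & df["B"].isin(["Y"])) | df["C"].isin(["C1"]))
)
print(valid)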

Is this the kind of rule that could be specified using satisfies, or is that only for a single column? https://canimus.github.io/cuallee/module/check/#cuallee.Check.satisfies

pwolff42 · Apr 24 '25 15:04

from cuallee import Check

def hierarchical_rule(df):
    return ((df["A"].isin(["X"])) & (df["B"].isin(["Y", "Z"]))) | \
           ((df["A"].isin(["X"])) & (df["B"].isin(["Y"])) & (df["C"].isin(["..."])))

check = Check()
check.satisfies(hierarchical_rule, columns=["A", "B", "C"], coverage=1.0)

levyvix · Apr 25 '25 04:04

Hi @levyvix ,

The above does not work: neither columns nor coverage are valid arguments for satisfies. satisfies accepts the argument column, which takes a single string.

satisfies also accepts a SQL-like predicate argument; perhaps an example of that would be helpful. I'm still not sure there is a way to specify multiple columns for this rule.
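
For reference, a minimal sketch of that single-column call shape, assuming a pandas DataFrame (the column name and predicate string are only illustrative):

import pandas as pd
from cuallee import Check

df = pd.DataFrame({"quantity": [1, -1, 2]})

# satisfies(column, predicate): a single column name plus a SQL-like predicate string.
check = Check().satisfies("quantity", "quantity > 0")
print(check.validate(df))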

Perhaps @canimus you have an example?

pwolff42 · Apr 30 '25 18:04

Hi @pwolff42, here is a good example of the is_custom rule, which allows a composite rule definition:



import inspect
import pyspark.sql.functions as F
import pyspark.sql.types as T
from cuallee import Check
from pyspark.sql import DataFrame, SparkSession
from toolz import curry

spark = SparkSession.builder.getOrCreate()
data = [("A", 1), ("B", -1), ("B", 0), ("C", 2)]
schema = T.StructType([T.StructField("id", T.StringType(), True), T.StructField("quantity", T.IntegerType(), True)])
orders = spark.createDataFrame(data, schema=schema)
orders.show()

check = Check(name="orders_checks")
check = check.add_rule("is_unique", "id", 1)
check = check.add_rule("is_greater_than", "quantity", 0, 0.5)


# Define and add a custom check
@curry
def mean_above_threshold(df: DataFrame, column_name: str, threshold: float) -> DataFrame:
    mean_value = df.select(F.mean(column_name).alias("mean")).collect()[0]["mean"]
    is_above_threshold = mean_value > threshold
    return df.withColumn("mean_above_threshold", F.lit(is_above_threshold))

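# Note: as this example relies on, the callable passed to is_custom receives the
# dataframe and returns it with an added boolean column; cuallee then scores that
# column against the rule's coverage (the 1 passed to add_rule below).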
col_name = "quantity"
check = check.add_rule(
    "is_custom", col_name, mean_above_threshold(column_name=col_name, threshold=0), 1,
    options={"name": "mean_above_threshold", "custom_value": f"{col_name}>0"},
)


# Define a custom check function for data type validation
@curry
def is_correct_dtype(df: DataFrame, column_name: str, expected_dtype: T.DataType) -> DataFrame:
    actual_dtype = [field.dataType for field in df.schema.fields if field.name == column_name][0]
    is_dtype_correct = actual_dtype == expected_dtype
    return df.withColumn(f"{column_name}_is_dtype_correct", F.lit(is_dtype_correct))

check = check.add_rule(
    "is_custom", "id", is_correct_dtype(column_name="id", expected_dtype=T.StringType()), 1,
    options={"name": "is_correct_dtype", "custom_value": "string"},
)
check = check.add_rule(
    "is_custom", "quantity", is_correct_dtype(column_name="quantity", expected_dtype=T.IntegerType()), 1,
    options={"name": "is_correct_dtype", "custom_value": "integer"},
)

# Run the checks
output = check.validate(orders)
output.show()

# Verbose alternative to `f(x)?`
#func = check.rules[-1].value
#print(f"{func.__name__}{inspect.signature(func)}")

canimus · Apr 30 '25 21:04

Hi @canimus, thanks for the quick response. Am I correct in assuming, then, that these rules (satisfies, is_custom) are not dataframe-agnostic?

pwolff42 · May 01 '25 15:05

Hi @pwolff42, I am confident that satisfies is covered 100% across all dataframes. However, I don't think is_custom is implemented across all of them; I recall from the closed issues and conversations that it is relatively new and was added to pandas recently, but I am afraid it may not cover all APIs yet. Fancy a PR?
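
For reference, if the pandas implementation mirrors the PySpark example above, an is_custom rule there might look roughly like this (a sketch only; the data and function name are made up, and backend support is exactly what is in question here):

import pandas as pd
from cuallee import Check

orders_pd = pd.DataFrame({"id": ["A", "B", "B", "C"], "quantity": [1, -1, 0, 2]})

# Hypothetical custom predicate: adds a boolean column marking positive quantities.
def quantity_positive(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(quantity_positive=df["quantity"] > 0)

check = Check(name="orders_checks_pandas")
check = check.add_rule("is_custom", "quantity", quantity_positive, 0.5, options={"name": "quantity_positive"})
print(check.validate(orders_pd))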

canimus · May 01 '25 15:05

@pwolff42 You can use satisfies to compare multiple columns with each other.

The argument column is misleading in this case: it doesn't actually limit the predicate to that column. As far as I can tell it's used solely for reporting purposes.

Here is an example showing a simple comparison and a more complex one.

Data:

import polars as pl

df_users = pl.DataFrame({
        "user_id": [1, 2, 3, 4, 4],
        "age": [25, 150, 22, 45, 30],
        "email": ["[email protected]", "[email protected]", "invalid-email", "[email protected]", "[email protected]"],
        "score": [85.5, 92.0, 78.3, 88.7, 95.2],
        "is_active": [True, True, False, True, False],
        "category": ["A", "B", "B", "A", "C"],
    })

Check:

from cuallee import Check, CheckLevel

check = Check(CheckLevel.WARNING, "User Data Validation")

result = (
    check
    .satisfies("arbitrary_value", "age > score")
    .satisfies("this_can_be_anything", 
                """
                (is_active AND category IN ('A', 'B')) 
                OR (NOT is_active AND category IN ('C'))
                """,
                options={"name": "My business Rule"})
    .validate(df_users)
)

print(result)

dmschauer · Jul 07 '25 21:07
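
Coming back to the original question, the hierarchical membership rule could in principle be expressed the same way, as a single predicate. This is a sketch only: the data and allowed sets are placeholders (the question left them unspecified), and it assumes the SQL dialect behind satisfies accepts nested NOT/AND/OR/IN of this form:

import polars as pl
from cuallee import Check, CheckLevel

df_hier = pl.DataFrame({
    "A": ["X", "X", "Q"],
    "B": ["Y", "Z", "W"],
    "C": ["C1", "C2", "C1"],
})

hierarchy_result = (
    Check(CheckLevel.WARNING, "Hierarchy Validation")
    .satisfies(
        "A",
        """
        (NOT A IN ('X') OR B IN ('Y', 'Z'))
        AND (NOT (A IN ('X') AND B IN ('Y')) OR C IN ('C1', 'C2'))
        """,
        options={"name": "hierarchical_membership"},
    )
    .validate(df_hier)
)
print(hierarchy_result)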