
type_2_scd_generic_upsert does not handle NULL values properly

Open dgcaron opened this issue 2 years ago • 7 comments

The current way the type_2_scd_generic_upsert function checks for changes in a row is to compare each column individually, and this does not yield the expected result when columns contain NULL values. A possible improvement that addresses both performance and NULL handling is to add a hash column calculated from the contents of the table's columns (excluding the scd2 system columns).

some background: https://datacadamia.com/dit/owb/scd2_hash

Besides some code around it, the most interesting change would be a UDF to calculate the hash:

from pyspark.sql.functions import struct, udf
from pyspark.sql.types import StringType

differential_hash_udf = udf(
    lambda row: calculate_differential_hash(row, attr_col_names), StringType()
)
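
The calculate_differential_hash helper is not shown in the issue; a minimal sketch of what it could look like (the NULL token, separator, and choice of MD5 are assumptions, not part of the proposal):

import hashlib

def calculate_differential_hash(row, attr_col_names):
    # Hypothetical implementation: map NULL to a fixed token so that NULL and
    # empty string hash differently, join the attribute values with an
    # unambiguous separator, and return an MD5 digest as a hex string.
    parts = ["<NULL>" if row[attr] is None else str(row[attr]) for attr in attr_col_names]
    return hashlib.md5("||".join(parts).encode("utf-8")).hexdigest()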

Addition of the hash column to the updates DataFrame:

updates_df = updates_df.withColumn(
    differential_hash_col_name, differential_hash_udf(struct(*attr_col_names))
)

Stage changes based on the hash column instead of a column-by-column comparison:

staged_part_1 = (
    updates_df.alias("updates")
    .join(delta_table.toDF().alias("base"), primary_key)
    .where(
        # only rows whose hash differs from the current version need a new record
        f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> updates.{differential_hash_col_name}"
    )
    .selectExpr("NULL as mergeKey", "updates.*")
)

Merge using the hash column:

res = (
    delta_table.alias("base")
    .merge(
        source=staged_updates.alias("staged_updates"),
        condition=pyspark.sql.functions.expr(f"base.{primary_key} = mergeKey"),
    )
    .whenMatchedUpdate(
        # close the current version when the hashes differ
        condition=f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> staged_updates.{differential_hash_col_name}",
        set={
            is_current_col_name: "false",
            end_time_col_name: f"staged_updates.{effective_time_col_name}",
        },
    )
    .whenNotMatchedInsert(values=res_thing)
    .execute()
)

This is a breaking (not backwards compatible) change to what is currently in the function, so you could consider making this a separate function?

dgcaron avatar Aug 08 '23 14:08 dgcaron

This is the other issue mentioned in #120.

dgcaron avatar Aug 08 '23 14:08 dgcaron

@dgcaron thank you for flagging this. I am not so familiar with the topic; could you explain the potential performance boost and how this will fix the NULL value issues?

robertkossendey avatar Sep 29 '23 09:09 robertkossendey

I believe the issue arises here: https://github.com/MrPowers/mack/blob/main/mack/init.py#L103

staged_updates_attrs = list(
    map(lambda attr: f"staged_updates.{attr} <> base.{attr}", attr_col_names)
)

If the base.{attr} value is initially NULL on the first write, the SCD2 isn't built properly on subsequent writes. This has to do with the way SQL and PySpark handle NULL values in comparisons: a <> comparison against NULL evaluates to NULL rather than true, so the changed row never makes it into the staged updates (see the sketch below).
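
To make that concrete, here is a minimal standalone sketch (table and column names are made up for illustration) showing that a standard <> comparison against a NULL column filters the row out:

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("pkey", IntegerType()),
    StructField("attr1", StringType()),
])
base = spark.createDataFrame([(1, None)], schema)            # first write stored NULL
updates = spark.createDataFrame([(1, "new value")], schema)  # second write has a value

changed = (
    updates.alias("staged_updates")
    .join(base.alias("base"), "pkey")
    .where("staged_updates.attr1 <> base.attr1")
)

# 'new value' <> NULL evaluates to NULL (not true), so the row is filtered out
# and no new SCD2 version would be staged for it.
print(changed.count())  # 0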

The performance boost should come from the fact that the change detection compares a precalculated hash (the hash is also persisted in the SCD2 table) on both sides instead of all columns of interest. You don't have to evaluate each column on both sides to check for changes this way.

I'll add a test case that shows the issue in the upcoming days.

dgcaron avatar Sep 29 '23 11:09 dgcaron

@dgcaron understood. Precalculating a hash column and comparing only that is way less expensive than comparing each column, and it would also solve the Spark NULL problem, although the NULL issue might be solvable through eqNullSafe (not sure though).

robertkossendey avatar Sep 29 '23 12:09 robertkossendey

Yes, I guess eqNullSafe should solve the NULL issue too, but I am not sure how to implement that properly with an expression string. It would allow for a non-breaking change though; if that is the preference, then I can take a look some time soon.
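
For what it's worth, a possible non-breaking sketch (an idea, not a committed fix): Spark SQL has a null-safe equality operator <=>, the SQL counterpart of eqNullSafe, which can be used directly inside the existing expression strings:

# Hypothetical null-safe variant of the change-detection condition,
# keeping the expression-string approach instead of switching to Column objects.
attr_col_names = ["attr1", "attr2"]  # example columns, for illustration only

staged_updates_attrs = list(
    map(lambda attr: f"NOT (staged_updates.{attr} <=> base.{attr})", attr_col_names)
)

# assuming the per-column conditions are OR-ed together, as in the current comparison
stage_only_condition = " OR ".join(staged_updates_attrs)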

dgcaron avatar Sep 29 '23 13:09 dgcaron

@dgcaron Maybe we could do that as a first step. We could still implement a scd2 with hash later :)

robertkossendey avatar Sep 29 '23 14:09 robertkossendey

@dgcaron - just a friendly ping on this one. Would love to get this fix added! Thank you!

MrPowers avatar Oct 16 '23 13:10 MrPowers