
Use "<=>" operator to join spark dataframes

Open runrig opened this issue 2 years ago • 4 comments

If there are nulls in the join columns of the dataframes, those rows do not satisfy the join condition, so they are not counted as common rows and are instead counted as unique to each dataframe. If the "<=>" operator is used instead of "=" in these statements, null values will be joined:

join_condition = " AND ".join(["A." + name + "=B." + name for name in self._join_column_names])

This will make the Spark compare behave more like the pandas compare, where null values do join. Note that "<=>" only works in later versions of Spark, so maybe it should be an option, or just make the change and require at least the necessary version (Spark 1.6 or later, according to the first answer in https://stackoverflow.com/questions/41728762/including-null-values-in-an-apache-spark-join).
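For illustration, here is a minimal PySpark sketch of the difference (the DataFrames and column names are made up for this example; it is not datacompy's internal code):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

a = spark.createDataFrame([(1, "x"), (None, "y")], ["id", "val"])
b = spark.createDataFrame([(1, "x"), (None, "y")], ["id", "val"])

# Plain equality: NULL = NULL is not true, so the null-keyed rows never match
a.alias("A").join(b.alias("B"), F.expr("A.id = B.id")).count()    # 1

# Null-safe equality: NULL <=> NULL is true, so those rows join as common
a.alias("A").join(b.alias("B"), F.expr("A.id <=> B.id")).count()  # 2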

Originally posted by @runrig in https://github.com/capitalone/datacompy/discussions/94#discussioncomment-2694968

runrig avatar May 06 '22 14:05 runrig

Duplicate of #146

fdosani avatar May 06 '22 15:05 fdosani

I'm ok adding this in. I think maybe a flag that preserves the original functionality alongside this new functionality would work, since I think we should preserve the inherent difference between Spark and Pandas. Also, before tackling this, it might make sense to refactor the Spark version of the code to align with the Pandas version. I know newer Spark versions are moving toward Koalas, which is closer to Pandas.

Need to dive a bit more into this, but unification would be nice. Links back to a long-standing issue: #13
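Just to sketch what that flag could look like (the function and parameter names below are hypothetical, not datacompy's actual API), the join-condition builder could switch operators based on an opt-in parameter:

# Hypothetical sketch only; names are not part of datacompy's API
def build_join_condition(join_column_names, null_safe=False):
    # "<=>" is Spark's null-safe equality; "=" drops rows with NULL join keys
    op = "<=>" if null_safe else "="
    return " AND ".join(
        "A." + name + " " + op + " B." + name for name in join_column_names
    )

# build_join_condition(["id"])                  -> "A.id = B.id"
# build_join_condition(["id"], null_safe=True)  -> "A.id <=> B.id"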

fdosani avatar May 09 '22 16:05 fdosani

@runrig small favour to ask. Could you provide a small working example, just so I can recreate what you're seeing on my side and also use it for testing purposes?

fdosani avatar May 09 '22 19:05 fdosani

Example:

import datacompy

# assumes an active SparkSession named `spark`
cols = ["id", "name", "salary", "Address"]

df1 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "UK"],
  [3, "GHI", 3000, "JPN"],
  [4, "JKL", 4500, "CHN"],
  [None, "MNO", 42, "FOO"]
], cols)

df2 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "CAN"],
  [3, "GHI", 3500, "JPN"],
  [4, "JKL_M", 4800, "CHN"],
  [None, "MNO", 42, "FOO"]
], cols)

# spark compare:
compare = datacompy.SparkCompare(
    spark,
    df1,
    df2,
    join_columns=['id'],
    abs_tol=0,
    rel_tol=0
)
compare.report()

# contrast with pandas compare:
p_compare = datacompy.Compare(
    df1.toPandas(),
    df2.toPandas(),
    join_columns=['id']
)
print(p_compare.report())

runrig avatar May 09 '22 21:05 runrig

Sorry, I haven't checked in for a while, but were null-safe comparisons ever incorporated into a release version? Or are they part of other changes not yet merged? Thanks.

runrig avatar Feb 01 '23 19:02 runrig

@runrig it hasn't been merged yet. Sorry, it's just something I need to circle back around to. It's currently sitting on the Spark refactor branch. I haven't had much time to do performance testing on that specifically.

fdosani avatar Feb 01 '23 19:02 fdosani