[QUESTION] How to use a CoTransformer on data frames with shared non-key columns
I have a function that aims to implement an SCD2 merge on two dataframes.

In my example, I am attempting to merge two dataframes using a single column as the key. The transformation should modify rows with a matching key and insert all rows from the second dataframe.

When I execute this code, the `zip` method by default performs an inner join on the columns that are duplicated across the dataframes. This drops rows with missing values in the shared columns, so the input dataframes passed to `scd2_merge` are not what I expect.
```python
import pandas as pd
from fugue import DataFrames, FugueWorkflow

df1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, None, 3], "c": [1, 1, 1]})
df2 = pd.DataFrame({"a": [2, 3, 4], "b": [2, 3, None]})


# schema: a:int,b:float,c:int
def scd2_merge(dfs: DataFrames) -> pd.DataFrame:
    """Performs an SCD2-type merge.

    Any rows in df1 that have a matching key value in df2 will have
    their current flag 'c' set to 0, before the rows in df2 are inserted.
    """
    ix = "a"
    df1 = dfs[0].as_pandas()
    df2 = dfs[1].as_pandas()
    df1.loc[df1[ix].isin(df2[ix]), "c"] = 0  # expire rows whose key reappears
    df2["c"] = 1  # incoming rows become the current version
    return pd.concat([df1, df2])


with FugueWorkflow(engine="pandas") as dag:
    df1 = dag.df(df1, "a:int,b:float,c:int")
    df2 = dag.df(df2, "a:int,b:float")
    dag.zip(df1, df2).transform(scd2_merge).show()
```
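To make the dropped-rows effect concrete, here is a plain-pandas sketch of my own (an illustration of the join behavior, not Fugue internals): because `b` is shared between the dataframes, it becomes part of the join key, and `NaN` never equals `NaN`, so those rows vanish.

```python
import pandas as pd

df1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, None, 3], "c": [1, 1, 1]})
df2 = pd.DataFrame({"a": [2, 3, 4], "b": [2, 3, None]})

# Roughly analogous to zip's default inner join on the shared columns:
# only the (a=3, b=3) row survives; rows with NaN in "b" never match.
print(df1.merge(df2, on=["a", "b"], how="inner"))
```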
Is there a correct way to implement this type of transformation?
You can do

```python
df1.partition_by("a").zip(df2).transform(scd2_merge).show()
```

or

```python
dag.zip(df1, df2, partition={"by": "a"}).transform(scd2_merge).show()
```
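Both forms do the same thing: they tell `zip` to partition both dataframes by `a` alone and pair up partitions with matching key values, so the shared non-key column `b` no longer participates in the join and rows with missing values in it are no longer dropped.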
I also modified your code a little bit to follow good practices:
```python
import pandas as pd
from fugue import FugueWorkflow

_df1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, None, 3], "c": [1, 1, 1]})
_df2 = pd.DataFrame({"a": [2, 3, 4], "b": [2, 3, None]})


# schema: a:int,b:float,c:int
def scd2_merge(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """Performs an SCD2-type merge.

    Any rows in df1 that have a matching key value in df2 will have
    their current flag 'c' set to 0, before the rows in df2 are inserted.
    """
    ix = "a"
    # matched keys become False (0), unmatched keys stay True (1)
    df1 = df1.assign(c=~df1[ix].isin(df2[ix]))
    df2 = df2.assign(c=1)
    return pd.concat([df1, df2])


dag = FugueWorkflow()
df1 = dag.df(_df1)
df2 = dag.df(_df2)
df1.partition_by("a").zip(df2).transform(scd2_merge).show()
dag.run()
```
- Reduced the dependency on Fugue: the inputs of `scd2_merge` can all be native types
- Removed the `with` statement and the predefined engine on it. DAG definition doesn't require a context manager; `dag.run()` should be a separate step
- Removed unnecessary schemas when you call `dag.df`
- Used `assign` to avoid mutating the dataframe (pandas good practice)
- Renamed `df*` to `_df*` (Python good practice)
Hi @goodwanghan, thanks for the suggestions. I've been able to modify my code as you suggested and get it working with the `partition_by(...).zip(...)` syntax. For my particular use case, I needed to use `.zip(df2, how="full_outer")` to ensure that I had non-intersecting keys in the output.
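For reference, the pipeline line then becomes (my reconstruction; only the `how` argument differs from the code above):

```python
df1.partition_by("a").zip(df2, how="full_outer").transform(scd2_merge).show()
```

One thing I noticed: with a full outer zip, a key can be present in only one input, so the transformer may receive an empty dataframe on either side; the `isin`/`assign`/`concat` logic above happens to handle that case without changes.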
Also, I am intending to run this in production using the `SparkExecutionEngine`. Is there anything about this workflow I should be aware of that could affect performance?
Wonderful! When you test on Spark, don't construct a `SparkExecutionEngine` yourself; you should just pass in the Spark session:

```python
dag.run(spark_session)
```

`spark.sql.shuffle.partitions` should be set properly (this applies to Spark execution in general).
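A minimal sketch of what that setup might look like (the partition count of 200 is an arbitrary placeholder, not a recommendation; tune it to your cluster and data volume):

```python
from pyspark.sql import SparkSession

# assumption: 200 is only a placeholder; choose a value appropriate
# for your data size and cluster parallelism
spark_session = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

dag.run(spark_session)
```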
You can enable `fugue.spark.use_pandas_udf` to see if it is faster; I think for your case it may not have an effect.
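If you want to try it, a sketch of one way to pass the flag (assuming `run` accepts an engine config dict alongside the Spark session):

```python
# assumption: FugueWorkflow.run takes a conf dict as its second argument;
# the flag asks the Spark engine to execute transformers via pandas UDFs
dag.run(spark_session, {"fugue.spark.use_pandas_udf": True})
```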