[BUG] CROSS join fails for SparkExecutionEngine
Minimal Code To Reproduce
from pyspark.sql.types import StructType, StructField, IntegerType
from fugue import FugueWorkflow

df1_s = spark.createDataFrame([[1, 2]], schema=StructType([StructField("a", IntegerType()), StructField("b", IntegerType())]))
df1_s = spark.createDataFrame([[1, 2]], schema=StructType([StructField("c", IntegerType()), StructField("d", IntegerType())]))
dag = FugueWorkflow()
df1 = dag.df(df1_s)
df2 = dag.df(df1_s)
df2 = df1.join(df2, how='cross')
dag.run(engine='spark')
Describe the bug
When trying to perform a cross join using Spark DataFrames, I encounter the following error:
_2 _State.RUNNING -> _State.FAILED can't specify on for cross join
Expected behavior
When using the SparkExecutionEngine, I would expect a cross join to work the same way as it does with local dataframes:
dag = FugueWorkflow()
df1 = dag.df([[1,2]],"a:int,b:int")
df2 = dag.df([[3,4]],"c:int,d:int")
df3 = df1.join(df2, how='cross')
dag.run()
Environment (please complete the following information):
- Backend: spark
- Backend version: 3.2.2
- Python version: 3.9.5
- OS: linux
The problem is that your code has typos: you defined df1_s twice, and df2 is also created from df1_s. This works:
df1_s = spark.createDataFrame([[1, 2]], schema=StructType([StructField("a", IntegerType()), StructField("b", IntegerType())]))
df2_s = spark.createDataFrame([[1, 2]], schema=StructType([StructField("c", IntegerType()), StructField("d", IntegerType())]))
dag = FugueWorkflow()
df1 = dag.df(df1_s)
df2 = dag.df(df2_s)
df3 = df1.join(df2, how='cross')
dag.run(engine='spark')
But the error message is indeed confusing. Because of the typos, the two dataframes share all their columns, so Fugue automatically detected those common columns as join keys, and a cross join does not allow join keys. That chain of events isn't clear to users, and we need to fix this.
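To illustrate, here is a minimal sketch of the same failure without the Spark boilerplate (the engine choice follows the report above; the inline comments are my reading of the mechanism):

from fugue import FugueWorkflow

dag = FugueWorkflow()
df1 = dag.df([[1, 2]], "a:int,b:int")
df2 = dag.df([[1, 2]], "a:int,b:int")  # identical schema: a and b are common columns
df3 = df1.join(df2, how='cross')       # common columns become inferred join keys
dag.run(engine='spark')                # fails: "can't specify on for cross join"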
Also, with Fugue you really don't need to create Spark DataFrames with the tedious schema expressions; you can just use pandas DataFrames:
dag = FugueWorkflow()
df1 = dag.df(pd.DataFrame(...))
df2 = dag.df(pd.DataFrame(...))
...
dag.run(spark)
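For instance, a concrete version of that sketch might look like this (the sample data is made up purely for illustration, and an active SparkSession named spark is assumed):

import pandas as pd
from fugue import FugueWorkflow

dag = FugueWorkflow()
df1 = dag.df(pd.DataFrame({"a": [1], "b": [2]}))  # no StructType/StructField needed
df2 = dag.df(pd.DataFrame({"c": [3], "d": [4]}))
df3 = df1.join(df2, how='cross')
dag.run(spark)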
This will also work on Spark:
dag = FugueWorkflow()
df1 = dag.df([[1,2]],"a:int,b:int")
df2 = dag.df([[3,4]],"c:int,d:int")
df3 = df1.join(df2, how='cross')
dag.run(spark)