
[BUG] CROSS Join fails for SparkExecutionEngine

Open jstammers opened this issue 2 years ago • 2 comments

Minimal Code To Reproduce


from pyspark.sql.types import StructType, StructField, IntegerType
from fugue import FugueWorkflow

df1_s = spark.createDataFrame([[1,2]], schema=StructType([StructField("a", IntegerType()), StructField("b", IntegerType())]))
df1_s = spark.createDataFrame([[1,2]], schema=StructType([StructField("c", IntegerType()), StructField("d", IntegerType())]))

dag = FugueWorkflow()
df1 = dag.df(df1_s)
df2 = dag.df(df1_s)
df2 = df1.join(df2, how='cross')
dag.run(engine='spark')

Describe the bug

When trying to perform a cross join using Spark DataFrames, I encounter the following error:

_2 _State.RUNNING -> _State.FAILED  can't specify on for cross join

Expected behavior

When using the SparkExecutionEngine, I would expect a cross join to work the same way as it does with local DataFrames:

dag = FugueWorkflow()
df1 = dag.df([[1,2]],"a:int,b:int")
df2 = dag.df([[3,4]],"c:int,d:int")
df3 = df1.join(df2, how='cross')
dag.run()
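For reference, the result the reporter expects is the plain Cartesian product of the two rows. A minimal pandas sketch of that expected output (pandas stands in for the local backend here; this is not the Fugue code path):

```python
# A cross join is a Cartesian product: every row of df1 paired with
# every row of df2. With one row on each side, that is a single row
# carrying all four columns.
import pandas as pd

df1 = pd.DataFrame([[1, 2]], columns=["a", "b"])
df2 = pd.DataFrame([[3, 4]], columns=["c", "d"])
df3 = pd.merge(df1, df2, how="cross")
print(df3.values.tolist())  # [[1, 2, 3, 4]]
```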

Environment (please complete the following information):

  • Backend: spark
  • Backend version: 3.2.2
  • Python version: 3.9.5
  • OS: linux

jstammers avatar Sep 21 '22 12:09 jstammers

The problem is that your code has typos: you defined df1_s twice, so df2 is also built from df1_s.

This works:

from pyspark.sql.types import StructType, StructField, IntegerType
from fugue import FugueWorkflow

df1_s = spark.createDataFrame([[1,2]], schema=StructType([StructField("a", IntegerType()), StructField("b", IntegerType())]))
df2_s = spark.createDataFrame([[1,2]], schema=StructType([StructField("c", IntegerType()), StructField("d", IntegerType())]))

dag = FugueWorkflow()
df1 = dag.df(df1_s)
df2 = dag.df(df2_s)
df3 = df1.join(df2, how='cross')
dag.run(engine='spark')

But the error message is indeed confusing. It means that, because of the typos, the join automatically detected common columns between the two dataframes, but that is not clear to users. We need to fix this.
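For intuition, pandas behaves analogously: a cross join takes no join keys, and passing keys together with how='cross' is rejected outright. A minimal pandas sketch (not Fugue or Spark internals), just to illustrate why inferred join columns and a cross join don't mix:

```python
# A cross join has no join condition, so supplying keys is an error.
# pandas raises a MergeError in that case, which mirrors the Spark
# complaint "can't specify on for cross join".
import pandas as pd

df_a = pd.DataFrame({"a": [1], "b": [2]})
df_b = pd.DataFrame({"c": [3], "d": [4]})

# A proper cross join: every row of df_a paired with every row of df_b.
crossed = pd.merge(df_a, df_b, how="cross")
print(crossed.shape)  # (1, 4)

# Passing join keys alongside how='cross' is rejected.
try:
    pd.merge(df_a, df_a, how="cross", on="a")
except pd.errors.MergeError as e:
    print("rejected:", e)
```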

goodwanghan avatar Sep 22 '22 07:09 goodwanghan

Also, with Fugue you really don't need to create Spark DataFrames with the tedious schema expressions; you can just use pandas DataFrames:

import pandas as pd

dag = FugueWorkflow()
df1 = dag.df(pd.DataFrame(...))
df2 = dag.df(pd.DataFrame(...))
...

dag.run(spark)

This will also work on Spark:

dag = FugueWorkflow()
df1 = dag.df([[1,2]],"a:int,b:int")
df2 = dag.df([[3,4]],"c:int,d:int")
df3 = df1.join(df2, how='cross')
dag.run(spark)

goodwanghan avatar Sep 22 '22 07:09 goodwanghan