[SPARK-24497][SQL] Support recursive SQL

peter-toth opened this pull request 2 years ago • 16 comments

What changes were proposed in this pull request?

This PR adds recursive query feature to Spark SQL.

A recursive query is defined using the WITH RECURSIVE keywords and refers to the name of the common table expression within the query. The implementation complies with the SQL standard and follows rules similar to other relational databases:

  • A query is made up of an anchor term followed by a recursive term.
  • The anchor term doesn't contain a self-reference and is used to initialize the query.
  • The recursive term contains a self-reference and is used to expand the current set of rows with new ones.
  • The anchor and recursive terms must be combined by a UNION or UNION ALL operator.
  • New rows can only be derived from the newly added rows of the previous iteration (or from the initial set of rows of the anchor term). This limitation implies that recursive references can't be used in certain joins, aggregations or subqueries.

Please see cte-recursive.sql for some examples.
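As a quick illustration of the rules above, here is a minimal sketch, assuming this PR's WITH RECURSIVE support is in place (the app wrapper is only there to make the snippet self-contained):

```scala
import org.apache.spark.sql.SparkSession

object RecursiveCteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("recursive-cte-sketch").getOrCreate()

    // Anchor term: VALUES (1) seeds the result set and contains no self-reference.
    // Recursive term: SELECT n + 1 FROM t derives new rows only from the rows
    // produced by the previous iteration, until the WHERE condition stops producing rows.
    spark.sql(
      """WITH RECURSIVE t(n) AS (
        |    VALUES (1)
        |UNION ALL
        |    SELECT n + 1 FROM t WHERE n < 10
        |)
        |SELECT sum(n) FROM t""".stripMargin).show()  // expected result: 55

    spark.stop()
  }
}
```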

The implementation has the same limitation that SPARK-36447 / https://github.com/apache/spark/pull/33671 has:

With-CTEs mixed with SQL commands or DMLs will still go through the old inline code path because of our non-standard language specs and not-unified command/DML interfaces.

which means that recursive queries are not supported in SQL commands and DMLs. With https://github.com/apache/spark/pull/42036 this restriction is lifted, and a recursive CTE fails only when the CTE is force-inlined (spark.sql.legacy.inlineCTEInCommands=true or the command is a multi-insert statement).

Why are the changes needed?

Recursive queries are an ANSI SQL feature that is useful for processing hierarchical data.
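To make the hierarchical-data use case concrete, here is a hedged sketch (the table, column and CTE names are illustrative, and it assumes this PR's WITH RECURSIVE support): walking a manager/employee tree of arbitrary depth, which a fixed number of self-joins cannot express.

```scala
import org.apache.spark.sql.SparkSession

object OrgChartSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("org-chart-sketch").getOrCreate()
    import spark.implicits._

    // A tiny manager/employee hierarchy; manager_id is NULL for the root.
    Seq((1, "CEO", None: Option[Int]), (2, "VP", Some(1)), (3, "Engineer", Some(2)))
      .toDF("id", "name", "manager_id")
      .createOrReplaceTempView("employees")

    spark.sql(
      """WITH RECURSIVE chain(id, name, depth) AS (
        |    SELECT id, name, 0 FROM employees WHERE manager_id IS NULL
        |UNION ALL
        |    SELECT e.id, e.name, c.depth + 1
        |    FROM employees e JOIN chain c ON e.manager_id = c.id
        |)
        |SELECT * FROM chain ORDER BY depth""".stripMargin).show()

    spark.stop()
  }
}
```

Without recursion, each additional level of the tree would require another self-join, so trees of unknown depth cannot be traversed in a single statement.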

Does this PR introduce any user-facing change?

Yes, it adds the recursive query feature.

How was this patch tested?

Added new UTs and tests in cte-recursion.sql.

peter-toth avatar Apr 11 '23 17:04 peter-toth

This PR is WIP as it contains https://github.com/apache/spark/pull/40856. Once that PR is merged I will rebase and remove the WIP flag.

peter-toth avatar Apr 20 '23 08:04 peter-toth

https://github.com/apache/spark/pull/40856 got merged and I've rebased this PR. I'm removing the WIP flag and the PR is ready for review.

cc @cloud-fan, @wangyum, @maryannxue, @sigmod

peter-toth avatar Apr 27 '23 11:04 peter-toth

Thanks @peter-toth. I tested this patch locally, but it seems to throw a StackOverflowError. How to reproduce:

./dev/make-distribution.sh --tgz  -Phive -Phive-thriftserver
tar -zxf spark-3.5.0-SNAPSHOT-bin-3.3.5.tgz
cd spark-3.5.0-SNAPSHOT-bin-3.3.5
bin/spark-sql
spark-sql (default)> WITH RECURSIVE t(n) AS (
                   >     VALUES (1)
                   > UNION ALL
                   >     SELECT n+1 FROM t WHERE n < 100
                   > )
                   > SELECT sum(n) FROM t;
23/05/30 13:21:21 ERROR Executor: Exception in task 0.0 in stage 265.0 (TID 199)
java.lang.StackOverflowError
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

wangyum avatar May 30 '23 05:05 wangyum

Thanks for testing this PR @wangyum. Interestingly, I didn't encounter a stack overflow when the recursion level is < 100; the error starts to appear at around level 170 in my tests, and I guess this depends on your default stack size. Since each iteration of the recursion depends on the previous one, the RDD lineage of the tasks keeps growing, and deserializing those tasks can throw a stack overflow error at some point. Let me amend this PR with optional checkpointing so the RDD lineage can be truncated and deeper recursion can be handled...
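For reference, a generic sketch of the kind of lineage truncation described here, using Spark's existing Dataset.checkpoint() API; this only illustrates the general technique, not whatever mechanism this PR may add, and the checkpoint directory and iteration counts are arbitrary:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object LineageTruncationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lineage-truncation-sketch").getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  // assumed local path

    var df: DataFrame = spark.range(1).toDF("n")
    for (i <- 1 to 300) {
      // Each step builds on the previous one, so the plan/lineage grows linearly;
      // deserializing a task that drags the whole chain along can eventually
      // overflow the stack.
      df = df.selectExpr("n + 1 AS n")
      // Checkpointing materializes the current result and cuts the lineage.
      if (i % 50 == 0) df = df.checkpoint()
    }
    df.show()
    spark.stop()
  }
}
```

With the lineage reset every 50 iterations, the serialized task graph stays bounded instead of growing with the total number of iterations.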

peter-toth avatar May 30 '23 15:05 peter-toth

Hey folks, so glad to see this feature is being worked on. Do you have any estimate of when this could be released?

ksn06 avatar Jul 06 '23 07:07 ksn06

This feature very likely won't make it into the next release (Spark 3.5) as the branch cut is in 2 weeks. But I will try to add it to the one after next (Spark 4.0).

peter-toth avatar Jul 06 '23 08:07 peter-toth

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Oct 15 '23 00:10 github-actions[bot]

@peter-toth thank you so much for sticking with this over three major versions and three separate pull requests. Recursive queries would be really nice to have in Spark SQL.

milimetric avatar Oct 24 '23 20:10 milimetric

@peter-toth Hi, we are very much looking forward to recursive SQL. We hope you will be able to complete this pull request :)

KamilKandzia avatar Dec 10 '23 12:12 KamilKandzia

@milastdbx do you think you can take over this PR?

cc @cloud-fan, @mitkedb, @MaxGekk

peter-toth avatar Dec 21 '23 11:12 peter-toth

Yes, thank you.

Milan

milastdbx avatar Dec 25 '23 16:12 milastdbx

Can this PR be merged? I have also run into this use case.

waywtdcc avatar Mar 04 '24 02:03 waywtdcc

If you want hierarchical queries, you could try the following while this PR is not available:

https://pypi.org/project/pyspark-connectby/

firstim avatar Mar 06 '24 18:03 firstim

Any update on this PR?

SvenRelijveld1995 avatar Apr 29 '24 19:04 SvenRelijveld1995

@milastdbx are you still planning to take this up?

jeremyjh avatar May 09 '24 14:05 jeremyjh

@wangyum I see that you started the review last year and the issues you raised were addressed by Peter.

Then @milastdbx was tagged to take over the PR, but I don't see the issue assigned yet.

How do we get this PR reviewed?

cc @cloud-fan, @mitkedb, @MaxGekk

jboarman avatar Jun 11 '24 19:06 jboarman

Thanks for testing this PR @wangyum. Interestingly, I didn't encounter a stack overflow when the recursion level is < 100; the error starts to appear at around level 170 in my tests, and I guess this depends on your default stack size. Since each iteration of the recursion depends on the previous one, the RDD lineage of the tasks keeps growing, and deserializing those tasks can throw a stack overflow error at some point. Let me amend this PR with optional checkpointing so the RDD lineage can be truncated and deeper recursion can be handled...

@peter-toth I have not looked closely at the implementation but I do have a question about this: has the logic been implemented in some way similar to tail call optimization such that there is no recursion depth limit?

travis-leith avatar Jul 03 '24 11:07 travis-leith

Any update? Thanks!

sb-mirakl avatar Sep 19 '24 07:09 sb-mirakl

Let me close this PR, as its open state seemingly causes some confusion. Feel free to reuse the code if anyone wants to tackle this issue.

peter-toth avatar Sep 19 '24 08:09 peter-toth

@peter-toth can you also update the status on the Jira ticket? https://issues.apache.org/jira/browse/SPARK-24497

jeremyjh avatar Sep 24 '24 10:09 jeremyjh