
[EPIC] Complete `datafusion-spark` Spark Compatible Functions

Open alamb opened this issue 7 months ago • 7 comments

Is your feature request related to a problem or challenge?

Many DataFusion users are using DataFusion to execute workloads originally developed for Apache Spark. Examples include

  • DataFusion Comet (@andygrove, @comphead, etc.)
  • LakeHQ / Sail (@shehabgamin)
  • Various internal pipelines / engines (e.g. those that @Omega359 and, I think, @Blizzara use)

They often do this for superior performance

  • Part of running Spark workloads is emulating Spark semantics
  • Emulating Spark semantics requires (among other things) functions compatible with Spark (which differ in semantics from the functions included in DataFusion)

Several projects are in the process of implementing Spark-compatible function libraries using DataFusion's extension APIs. However, we concluded in https://github.com/apache/datafusion/issues/5600 that we could join forces and maintain a Spark-compatible function library in the core datafusion repo. @shehabgamin has implemented the first step in https://github.com/apache/datafusion/pull/15168 🙏
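
For a flavor of what these extension APIs look like, here is a minimal sketch of a Spark-style scalar function implemented via `ScalarUDFImpl`, using an `expm1`-style function (Spark's `expm1(x)` returns `e^x - 1`) as the example. The names and details below are illustrative, not the actual code from the PR, and assume a recent DataFusion where `invoke_with_args` is the invocation entry point:

```rust
use std::any::Any;
use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, Float64Array};
use arrow::compute::kernels::arity::unary;
use arrow::datatypes::{DataType, Float64Type};
use datafusion_common::Result;
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};

/// Illustrative Spark-compatible `expm1(x)`: returns `e^x - 1` as Float64.
#[derive(Debug)]
pub struct SparkExpm1 {
    signature: Signature,
}

impl SparkExpm1 {
    pub fn new() -> Self {
        Self {
            signature: Signature::exact(vec![DataType::Float64], Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for SparkExpm1 {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "expm1"
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Float64)
    }
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // Normalize scalar/array inputs to arrays, then apply e^x - 1 row by row.
        let arrays = ColumnarValue::values_to_arrays(&args.args)?;
        let input = arrays[0].as_primitive::<Float64Type>();
        let result: Float64Array = unary(input, |x| x.exp_m1());
        Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
    }
}
```

Registering the function with `SessionContext::register_udf(ScalarUDF::from(SparkExpm1::new()))` then makes it callable from SQL.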

Describe the solution you'd like

This ticket tracks "completing" the spark function library started in https://github.com/apache/datafusion/pull/15168

Describe alternatives you've considered

Related Issues

  • [x] https://github.com/apache/datafusion/issues/5600
  • [x] https://github.com/apache/datafusion/pull/15168
  • [x] https://github.com/apache/datafusion/issues/15915
  • [ ] https://github.com/apache/datafusion-comet/issues/1704
  • [ ] https://github.com/apache/datafusion/issues/15916
  • [x] https://github.com/apache/datafusion/issues/16336
  • [ ] https://github.com/apache/datafusion/issues/11201
  • [ ] https://github.com/apache/datafusion/pull/14367
  • [ ] https://github.com/apache/datafusion-comet/issues/1819

Additional context

No response

alamb avatar May 01 '25 23:05 alamb

@shehabgamin @alamb I created an epic in Comet for implementing our current expressions as ScalarUDFImpl rather than PhysicalExpr as a first step towards contributing them in the datafusion-spark crate: https://github.com/apache/datafusion-comet/issues/1819

andygrove avatar May 30 '25 14:05 andygrove

@shehabgamin @alamb I created an epic in Comet for implementing our current expressions as ScalarUDFImpl rather than PhysicalExpr as a first step towards contributing them in the datafusion-spark crate: apache/datafusion-comet#1819

Nice! I’ll create an epic shortly after the DataFusion 48 release for Sail to contribute all of our implemented functions (and corresponding tests) to datafusion-spark.

Also, @linhr has some ideas around making sqllogictests easier to work with, for example so that test files no longer need to explicitly cast data types. I’ll let him share more on that.

shehabgamin avatar Jun 01 '25 09:06 shehabgamin

@linhr has some ideas around making sqllogictests easier to work with

Here is my idea to automate the test setup without bringing in Spark as a hard dependency.

  1. We create a Python script to generate SLT files (an illustrative example of a generated file follows this list).
    1. Define a set of input values for each data type, and optionally define custom input values for each function.
    2. Use PySpark to execute the function, get the output, and write the SLT file. Make sure the values are formatted in a way that the SLT engine can understand. Input data type casting can also be done here.
  2. DataFusion developers who work on Spark functions run the Python script to update the SLT files whenever needed. PySpark is needed in a local virtualenv, and we provide instructions for this setup.
  3. DataFusion developers who do not work on Spark functions run the DataFusion tests in the existing way. They do not need to be aware of how the Spark function SLT files are generated.
  4. We add a CI workflow that is triggered when Spark functions SLT files are changed, to make sure they are generated without unintended manual modification.
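
To make this concrete, a generated file might contain entries like the following (illustrative only; the exact header comments, column-type codes, and casts would come from the script and DataFusion's sqllogictest conventions):

```
# Expected output generated from PySpark for next_day (illustrative)
query D
SELECT next_day('2015-01-14'::DATE, 'TU');
----
2015-01-20
```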

linhr avatar Jun 03 '25 13:06 linhr

Using pyspark to generate expected input/output that gets checked in sounds like a great idea to me

BTW I hope to devote some time next week to helping organize this effort a bit more, but have been totally bogged down in working on filtering in parquet

alamb avatar Jun 05 '25 00:06 alamb

This sounds like a great idea. Thanks @linhr.

andygrove avatar Jun 05 '25 00:06 andygrove

I just had a chat with @shehabgamin

The current status is that we have not smoothed out the process to the point where contributors with minimal context can pick up a porting ticket and be successful porting the functions upstream. It seems writing / having tests is the last missing piece before we have a high likelihood of achieving this goal.

Here is the plan we came up with (please correct me if I am wrong):

  1. @shehabgamin will create a script that creates expected .slt files from the Sail gold-master expected output (what @linhr describes in https://github.com/apache/datafusion/issues/15914#issuecomment-2935366003).
  2. @shehabgamin will make a PR that checks in those pre-generated .slt files, disabled somehow
  3. We will then make a PR that shows a function being ported over and enabling the relevant .slt file (maybe it can be @irenjj's https://github.com/apache/datafusion/pull/15958)

Then @alamb will create (copy/paste style) tickets for the remaining functions (based on https://github.com/apache/datafusion/issues/15916)

alamb avatar Jun 11 '25 21:06 alamb

We add a CI workflow that is triggered when Spark functions SLT files are changed, to make sure they are generated without unintended manual modification.

I am not quite sure where @linhr imagines this CI workflow running -- maybe we can start with running it manually and then run it in CI if/when we find problems with accidental changes

alamb avatar Jun 11 '25 21:06 alamb

An update here is that we are waiting for one or two more good example PRs and then we'll turn the community loose on porting

If anyone wants to take a look / help out with https://github.com/apache/datafusion/issues/16612 / https://github.com/apache/datafusion/pull/16580 that would be super helpful

It might be something that coding agents / claude code / etc can do easily 🤔

alamb avatar Jul 09 '25 12:07 alamb

An update here is that we are waiting for one or two more good example PRs and then we'll turn the community loose on porting

If anyone wants to take a look / help out with #16612 / #16580 that would be super helpful

It might be something that coding agents / claude code / etc can do easily 🤔

Would it be easier to pick a couple of functions that Sail has already implemented?

next_day and last_day are simple functions.

There are tests in DataFusion ready to go as well!

  • https://github.com/apache/datafusion/blob/76ff87b132b62edac207bb1105836a52d32d269e/datafusion/sqllogictest/test_files/spark/datetime/next_day.slt
  • https://github.com/apache/datafusion/blob/76ff87b132b62edac207bb1105836a52d32d269e/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt
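
For reference, the core of next_day is indeed small: it returns the first date strictly after the input that falls on the given day of the week. A minimal sketch of that logic using chrono (ignoring Spark's string parsing of the day-of-week argument and its null handling):

```rust
use chrono::{Datelike, Duration, NaiveDate, Weekday};

/// First date strictly after `date` that falls on `target`
/// (Spark's next_day semantics for valid inputs).
fn next_day(date: NaiveDate, target: Weekday) -> NaiveDate {
    let delta = (target.num_days_from_monday() as i64
        - date.weekday().num_days_from_monday() as i64)
        .rem_euclid(7);
    // If the input is already on the target weekday, jump a full week.
    let delta = if delta == 0 { 7 } else { delta };
    date + Duration::days(delta)
}

fn main() {
    // Spark: next_day('2015-01-14', 'TU') = 2015-01-20
    let d = NaiveDate::from_ymd_opt(2015, 1, 14).unwrap();
    assert_eq!(
        next_day(d, Weekday::Tue),
        NaiveDate::from_ymd_opt(2015, 1, 20).unwrap()
    );
}
```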

shehabgamin avatar Jul 10 '25 07:07 shehabgamin

There are tests in DataFusion ready to go as well!

I filed two tickets to track this suggestion and marked them as good first issues

  • https://github.com/apache/datafusion/issues/16774
  • https://github.com/apache/datafusion/issues/16775

alamb avatar Jul 14 '25 19:07 alamb

Hi @alamb, I just wanted to clarify: if a Spark function appears in the sqllogictest tests, are we expected to implement it in DataFusion?

Standing-Man avatar Jul 25 '25 02:07 Standing-Man

Hi @alamb, I just wanted to clarify: if a Spark function appears in the sqllogictest tests, are we expected to implement it in DataFusion?

@Standing-Man that is the intention. I haven't created individual tickets for all of them, as it seems porting is more complicated than we initially thought, so I wanted to get the process honed more (get some great examples) before we started porting things en masse

However, since you have now done several others, perhaps you could try to port a few more (I suggest the non-string ones) and hopefully once we see the code flowing easily without much back and forth on the PRs we can start filing many more

alamb avatar Jul 25 '25 19:07 alamb

Hi @alamb, I just wanted to clarify: if a Spark function appears in the sqllogictest tests, are we expected to implement it in DataFusion?

@Standing-Man that is the intention. I haven't created individual tickets for all of them, as it seems porting is more complicated than we initially thought, so I wanted to get the process honed more (get some great examples) before we started porting things en masse

However, since you have now done several others, perhaps you could try to port a few more (I suggest the non-string ones) and hopefully once we see the code flowing easily without much back and forth on the PRs we can start filing many more

Got it, that makes sense. I’ll go ahead and try porting a few more non-string functions next. Hopefully this helps smooth out the process for scaling up later. Thanks for the guidance!

Standing-Man avatar Jul 25 '25 21:07 Standing-Man

I created an epic in Comet to track donating the remaining Spark expressions:

https://github.com/apache/datafusion-comet/issues/2084

andygrove avatar Aug 07 '25 15:08 andygrove

As an update from the Sail project, we have recently attracted great community interest in expanding the coverage of Spark functions. Many of our community members have already contributed a ton of enhancements and bug fixes related to Spark functions!

One question I have is how we should port Spark functions that are implemented using DataFusion logical expressions rather than ScalarUDFs. (There are many examples like this in the Sail repo.) In many such situations, a similar DataFusion function already exists and we only need proper type casting etc. to match the Spark semantics. I feel we need a good story for how these can be ported.

One way I can think of is to define a placeholder UDF and leverage ScalarUDFImpl::simplify() so that the UDF is rewritten during optimization and the placeholder never becomes part of the physical plan. I'm not sure if this is a hack though, so I'd love to see if there is a better way.
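
For concreteness, a minimal sketch of that placeholder pattern. The function name is hypothetical, and the rewrite here is just a cast, standing in for whatever combination of existing DataFusion expressions matches the Spark semantics:

```rust
use std::any::Any;

use arrow::datatypes::DataType;
use datafusion_common::{internal_err, Result};
use datafusion_expr::{
    cast,
    simplify::{ExprSimplifyResult, SimplifyInfo},
    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};

/// Hypothetical placeholder UDF whose whole implementation is a rewrite
/// into existing DataFusion logical expressions.
#[derive(Debug)]
struct SparkPlaceholder {
    signature: Signature,
}

impl SparkPlaceholder {
    fn new() -> Self {
        Self {
            // Exactly one argument of any type.
            signature: Signature::any(1, Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for SparkPlaceholder {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "spark_placeholder"
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Utf8)
    }

    /// Rewrite the call during logical optimization so the placeholder
    /// never reaches the physical plan.
    fn simplify(
        &self,
        mut args: Vec<Expr>,
        _info: &dyn SimplifyInfo,
    ) -> Result<ExprSimplifyResult> {
        // The signature guarantees exactly one argument.
        let arg = args.remove(0);
        Ok(ExprSimplifyResult::Simplified(cast(arg, DataType::Utf8)))
    }

    /// Unreachable when `simplify` always rewrites, but required by the trait.
    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        internal_err!("spark_placeholder should have been simplified away")
    }
}
```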

cc @SparkApplicationMaster @davidlghellin @rafafrdz @anhvdq @jamesfricker who contributed to Sail so that you all are aware of this effort in DataFusion as well. I feel there is a chance for broader collaboration! cc @shehabgamin

linhr avatar Aug 11 '25 04:08 linhr

@linhr @andygrove I can rewrite the functions I worked on from the expression API to the datafusion-arrow API. I will need some help with testing and review. Also, some code can be shared across functions, so it would be nice to find a good place for it. Maybe some of the functions can also be changed slightly on the datafusion-functions side to provide configurable functionality for different use cases and avoid code duplication (this, for example: https://github.com/apache/datafusion-comet/issues/2036)

SparkApplicationMaster avatar Aug 11 '25 08:08 SparkApplicationMaster

One way I can think of is to define a placeholder UDF and leverage ScalarUDFImpl::simplify() so that the UDF is rewritten during optimization and the placeholder never becomes part of the physical plan. I'm not sure if this is a hack though, so I'd love to see if there is a better way.

I think that makes sense to me (I don't know of a better way and it doesn't sound like a hack in my mind)

alamb avatar Aug 11 '25 21:08 alamb

Something that came up at the last DataFusion sync with @andygrove, @Omega359, and others was the speed at which Spark functions are being added to DataFusion

I think one challenge is that we are holding them to the same review standards as the existing, widely used functions. Given that Spark support is at a different stage of its project lifecycle, we should probably be more focused on rounding out feature support and ensuring there are sufficient tests. If/when these functions are adopted, we can then improve their performance, edge cases, etc.

With that in mind, I will try to start reviewing / merging Spark functions faster

alamb avatar Oct 04 '25 10:10 alamb

For anyone else following along, something that would be super helpful would be to find existing PRs that port Spark functions into datafusion-spark and help ensure they are adequately tested. Even if we can improve the implementation as a follow-on, ensuring the test coverage is sufficient initially is likely the single most important thing we can do to help this project along

alamb avatar Oct 15 '25 16:10 alamb

A question I have is how do we plan to support spark.sql.ansi.enabled config? I've seen a few PRs for Spark functions that try to cater for this config, but we don't exactly have a way to set this so it usually results in dead code (e.g. a hardcoded boolean that is set to false for now) or their own way of toggling it (e.g. another argument to the function).

Some related issues:

  • #3520
  • #17539

Jefffrey avatar Oct 22 '25 00:10 Jefffrey

A question I have is how do we plan to support spark.sql.ansi.enabled config? I've seen a few PRs for Spark functions that try to cater for this config, but we don't exactly have a way to set this so it usually results in dead code (e.g. a hardcoded boolean that is set to false for now) or their own way of toggling it (e.g. another argument to the function).

ConfigOptions via ScalarFunctionArgs. I plan at some point to submit a PR for an 'ansi'-type config in DataFusion, as my fork mainly exists for having that in to_date and to_timestamp.
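
For reference, a minimal sketch of what such a config extension could look like. The `spark.ansi_enabled` key and the `SparkOptions` struct are hypothetical, not an agreed-upon design; the idea is that a Spark function would read these options from the ConfigOptions made available to it at invocation time (e.g. via ScalarFunctionArgs, as described above):

```rust
use std::any::Any;

use datafusion_common::config::{
    ConfigEntry, ConfigExtension, ConfigOptions, ExtensionOptions,
};
use datafusion_common::{DataFusionError, Result};

/// Hypothetical Spark-compatibility options, emulating spark.sql.ansi.enabled.
#[derive(Debug, Clone, Default)]
struct SparkOptions {
    ansi_enabled: bool,
}

impl ConfigExtension for SparkOptions {
    // All keys for this extension live under the "spark." prefix.
    const PREFIX: &'static str = "spark";
}

impl ExtensionOptions for SparkOptions {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        match key {
            "ansi_enabled" => {
                self.ansi_enabled = value.parse().map_err(|e| {
                    DataFusionError::Configuration(format!(
                        "invalid bool for ansi_enabled: {e}"
                    ))
                })?;
                Ok(())
            }
            _ => Err(DataFusionError::Configuration(format!("unknown key: {key}"))),
        }
    }
    fn entries(&self) -> Vec<ConfigEntry> {
        vec![ConfigEntry {
            key: "spark.ansi_enabled".to_string(),
            value: Some(self.ansi_enabled.to_string()),
            description: "Emulate spark.sql.ansi.enabled behavior",
        }]
    }
}

fn main() -> Result<()> {
    let mut config = ConfigOptions::new();
    config.extensions.insert(SparkOptions::default());
    config.set("spark.ansi_enabled", "true")?;

    // A Spark function would read this flag from the ConfigOptions it
    // receives at invocation time and pick ANSI vs. legacy behavior.
    let ansi = config
        .extensions
        .get::<SparkOptions>()
        .map(|o| o.ansi_enabled)
        .unwrap_or(false);
    assert!(ansi);
    Ok(())
}
```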

Omega359 avatar Oct 22 '25 12:10 Omega359

A question I have is how do we plan to support spark.sql.ansi.enabled config? I've seen a few PRs for Spark functions that try to cater for this config, but we don't exactly have a way to set this so it usually results in dead code (e.g. a hardcoded boolean that is set to false for now) or their own way of toggling it (e.g. another argument to the function).

ConfigOptions via ScalarFunctionArgs. I plan at some point to submit a PR for an 'ansi'-type config in DataFusion, as my fork mainly exists for having that in to_date and to_timestamp.

Also potentially related is @Weijun-H's PR for changing options

  • https://github.com/apache/datafusion/pull/18017

alamb avatar Oct 22 '25 14:10 alamb

A question I have is how do we plan to support spark.sql.ansi.enabled config? I've seen a few PRs for Spark functions that try to cater for this config, but we don't exactly have a way to set this so it usually results in dead code (e.g. a hardcoded boolean that is set to false for now) or their own way of toggling it (e.g. another argument to the function).

ConfigOptions via ScalarFunctionArgs. I plan at some point to submit a PR for an 'ansi'-type config in DataFusion, as my fork mainly exists for having that in to_date and to_timestamp.

@Omega359 I took the liberty of implementing this in https://github.com/apache/datafusion/issues/18634, as Spark 4.0 enabling ANSI mode by default makes a shift toward supporting this parameter in DF Spark functions

comphead avatar Nov 11 '25 23:11 comphead