
Introduce Async User Defined Functions

Open goldmedal opened this issue 9 months ago • 31 comments

Which issue does this PR close?

  • Closes #6518.

Rationale for this change

I have been working with @alamb to implement the functionality for async UDFs.

  • https://github.com/goldmedal/datafusion-llm-function/pull/1

It introduces the following trait:

#[async_trait]
pub trait AsyncScalarUDFImpl: Debug + Send + Sync {
    /// the function cast as any
    fn as_any(&self) -> &dyn Any;

    /// The name of the function
    fn name(&self) -> &str;

    /// The signature of the function
    fn signature(&self) -> &Signature;

    /// The return type of the function
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>;

    /// The ideal batch size for this function.
    ///
    /// This is used to determine what size of data to be evaluated at once.
    /// If None, the whole batch will be evaluated at once.
    fn ideal_batch_size(&self) -> Option<usize> {
        None
    }

    /// Invoke the function asynchronously with the async arguments
    async fn invoke_async_with_args(
        &self,
        args: AsyncScalarFunctionArgs,
        option: &ConfigOptions,
    ) -> Result<ArrayRef>;
}
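The doc comment on `ideal_batch_size` can be illustrated with a small sketch of how a batch of rows might be split before each async invocation. This is only an illustration under assumed semantics, not the code in this PR: `chunk_ranges` is a hypothetical helper, and treating a size of zero like `None` is an assumption made here to keep the loop well defined.

```rust
use std::ops::Range;

/// Hypothetical helper: split `num_rows` rows into the row ranges that would
/// each be handed to one async invocation. `None` keeps the batch whole.
fn chunk_ranges(num_rows: usize, ideal_batch_size: Option<usize>) -> Vec<Range<usize>> {
    if num_rows == 0 {
        return Vec::new();
    }
    match ideal_batch_size {
        // Assumption: a size of zero would never make progress, so it is
        // handled like `None` below.
        Some(size) if size > 0 => (0..num_rows)
            .step_by(size)
            .map(|start| start..(start + size).min(num_rows))
            .collect(),
        _ => vec![0..num_rows],
    }
}
```

With a batch of 10 rows and an ideal size of 4, the sketch yields the ranges 0..4, 4..8 and 8..10, so the last call simply receives a shorter slice.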

It allows the user to implement the UDF for invoking some external remote function in the query. Given an async udf async_equal, the plan would look like:

> explain select async_equal(a.id, 1) from animal a
+---------------+----------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                   |
+---------------+----------------------------------------------------------------------------------------+
| logical_plan  | Projection: async_equal(a.id, Int64(1))                                                |
|               |   SubqueryAlias: a                                                                     |
|               |     TableScan: animal projection=[id]                                                  |
| physical_plan | ProjectionExec: expr=[__async_fn_0@1 as async_equal(a.id,Int64(1))]                    |
|               |   AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_equal(id@0, 1))] |
|               |     CoalesceBatchesExec: target_batch_size=8192                                        |
|               |       DataSourceExec: partitions=1, partition_sizes=[1]                                |
|               |                                                                                        |
+---------------+----------------------------------------------------------------------------------------+
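The physical plan above suggests a rewrite in which each async call is pulled out of the expression, evaluated by AsyncFuncExec, and then referenced by a generated column name such as `__async_fn_0`. A minimal sketch of that idea follows, using a toy expression type; `Expr` and `extract_async` are hypothetical names for illustration and are not DataFusion's types or the implementation in this PR.

```rust
/// Toy expression tree (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    /// A call to an ordinary synchronous function.
    Call { name: String, args: Vec<Expr> },
    /// A call to an async UDF.
    AsyncCall { name: String, args: Vec<Expr> },
}

/// Replace every async call in `expr` with a reference to a generated column
/// and record `(column name, original call)` in `found`, so that a separate
/// operator can evaluate the calls before the synchronous expression runs.
fn extract_async(expr: Expr, found: &mut Vec<(String, Expr)>) -> Expr {
    match expr {
        Expr::AsyncCall { name, args } => {
            // Arguments are rewritten first; nested async calls are a
            // simplification here and may be handled differently in practice.
            let args: Vec<Expr> = args.into_iter().map(|a| extract_async(a, found)).collect();
            let column = format!("__async_fn_{}", found.len());
            found.push((column.clone(), Expr::AsyncCall { name, args }));
            Expr::Column(column)
        }
        Expr::Call { name, args } => Expr::Call {
            name,
            args: args.into_iter().map(|a| extract_async(a, found)).collect(),
        },
        other => other,
    }
}
```

After the rewrite, the remaining expression contains only synchronous nodes and column references, which is why the projection in the plan reads `__async_fn_0@1` instead of calling the function itself.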

To reduce the number of async function invocations, the CoalesceAsyncExecInput rule coalesces the input batches of AsyncFuncExec.
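The effect of coalescing can be sketched with plain vectors standing in for record batches. This is a simplified illustration, assuming that one output batch corresponds to one async invocation; `coalesce_batches` is a hypothetical function and not the optimizer rule itself.

```rust
/// Hypothetical sketch: merge consecutive small batches until each output
/// batch holds at least `target_rows` rows. The final batch may be smaller.
fn coalesce_batches(batches: Vec<Vec<i64>>, target_rows: usize) -> Vec<Vec<i64>> {
    let mut out = Vec::new();
    let mut current: Vec<i64> = Vec::new();
    for batch in batches {
        current.extend(batch);
        if current.len() >= target_rows {
            // Emit the accumulated rows and start a fresh buffer.
            out.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        out.push(current);
    }
    out
}
```

Ten input batches of 100 rows with a target of 400 rows become three output batches, so under the one-call-per-batch assumption the async function would be invoked three times instead of ten.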

See the example for detailed usage.

What changes are included in this PR?

Remaining Work

  • [x] Support for ProjectExec
  • [x] Support for FilterExec
  • [ ] Support for Join Expression

Maybe implement in the follow-up PR

  • [ ] Async aggregation function
  • [ ] Async window function
  • [ ] Async table function (?)

Are these changes tested?

Are there any user-facing changes?

goldmedal avatar Feb 23 '25 13:02 goldmedal

😮 -- thanks @goldmedal -- I'll put this on my list of things to review

alamb avatar Feb 24 '25 15:02 alamb

@alamb Sorry for the delay. This PR is ready for review now. I want to focus on Projection and Filter, which currently invoke the async UDF. After ensuring the approach makes sense, I'll create follow-up PRs for the other plans.

goldmedal avatar Mar 12 '25 02:03 goldmedal

Thanks I'll put it on my list

alamb avatar Mar 12 '25 22:03 alamb

What's the status of this PR?

berkaysynnada avatar Apr 22 '25 12:04 berkaysynnada

What's the status of this PR?

It's ready to review. I'm still waiting for someone to help review it.

goldmedal avatar Apr 22 '25 15:04 goldmedal

What's the status of this PR?

It's ready to review. I'm still waiting for someone to help review it.

Thanks @goldmedal. We'll need this as well, so let's revive it. I'm putting this into my review list.

berkaysynnada avatar Apr 23 '25 14:04 berkaysynnada

What if we just added a new method to the PhysicalExpr trait, like evaluate_async()? We could then call this from streams that might involve async work. The default implementation would delegate to evaluate(), but in the case of ScalarFunctionExpr, we could branch depending on the function type.

How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

adriangb avatar May 10 '25 21:05 adriangb

How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

The easy answer is to convert the original evaluate() methods to async and move all existing evaluate() impls to evaluate_sync(), but I cannot fully estimate the effects and challenges. Does anything come to mind?

berkaysynnada avatar May 11 '25 10:05 berkaysynnada

How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

The easy answer is to convert the original evaluate() methods to async and move all existing evaluate() impls to evaluate_sync(), but I cannot fully estimate the effects and challenges. Does anything come to mind?

I mean that makes sense but sounds like a lot of churn? I'm not sure tbh sync / async coloring is always a pain and I don't know of any good solutions :(

adriangb avatar May 11 '25 11:05 adriangb

How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

The easy answer is to convert the original evaluate() methods to async and move all existing evaluate() impls to evaluate_sync(), but I cannot fully estimate the effects and challenges. Does anything come to mind?

I mean that makes sense but sounds like a lot of churn? I'm not sure tbh sync / async coloring is always a pain and I don't know of any good solutions :(

I'll try a POC when I find some time, and I wonder what @alamb's opinion is.

berkaysynnada avatar May 11 '25 14:05 berkaysynnada

How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

The easy answer is to convert the original evaluate() methods to async and move all existing evaluate() impls to evaluate_sync(), but I cannot fully estimate the effects and challenges. Does anything come to mind?

I mean that makes sense but sounds like a lot of churn? I'm not sure tbh sync / async coloring is always a pain and I don't know of any good solutions :(

I'll try a POC when I find some time, and I wonder what @alamb's opinion is.

My feeling (without any solid data) is that using async functions is not ideal because:

  1. The async overhead (e.g. the cost of an await vs a normal function call) could be noticeable, but maybe not that big a deal
  2. The fact that everything that calls a UDF would have to be async (as only async functions can call other async functions) -- the so-called "what color are your functions" problem -- would be quite disruptive.

Another benefit of the approach in this PR is that it requires no changes to any existing functions or APIs (in fact the original POC can be implemented entirely as a DataFusion user defined optimizer extension)

alamb avatar May 11 '25 19:05 alamb

  1. The async overhead (e.g. the cost of an await vs a normal function call) could be noticeable, but maybe not that big a deal

If the awaited task doesn't include any I/O work or manual yielding, I believe we won't see a visible change, but of course it must be proven.

  2. The fact that everything that calls a UDF would have to be async (as only async functions can call other async functions) -- the so-called "what color are your functions" problem -- would be quite disruptive.

I think this is related to how we approach this feature. As you said, making evaluate() async introduces a conceptual change to datafusion. But if we aim to bring async evaluations to a unified point alongside sync evaluations and integrate them smoothly into DataFusion, some disruption is inevitable. That said, if we agree this is not a high-demand feature and this PR enables it while keeping things isolated, I'm fine with that too (though I'd still like to try and experiment with the idea 😇).

berkaysynnada avatar May 12 '25 07:05 berkaysynnada

Can we extend this sort of an approach to UDAFs? Having two entirely separate mechanisms would not be great.

ozankabak avatar May 12 '25 13:05 ozankabak

Can we extend this sort of an approach to UDAFs? Having two entirely separate mechanisms would not be great.

The data for an async user defined aggregate function must come from somewhere (presumably locally) and so aggregating using a remote service could be dicey

Maybe the use case is "send many rows of data to a remote LLM service", for example:

select user_id, call_llm_to_summarize(tweet_content)
FROM tweets
GROUP BY user_id

In this case, a lot of the user defined aggregate function API (like intermediate state, and GroupsAccumulators) doesn't make much sense.
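The shape of that use case can be sketched as "collect all rows of a group, then make one remote call per group", with no intermediate accumulator state to merge. The code below is a rough illustration only: `summarize_remote` is a hypothetical, synchronous stand-in for the remote LLM service, and `summarize_by_user` is not a proposed API.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a remote summarization service. A real
/// implementation would be an async network call.
fn summarize_remote(texts: &[String]) -> String {
    let chars: usize = texts.iter().map(|t| t.len()).sum();
    format!("{} tweets, {} chars", texts.len(), chars)
}

/// Group tweets by user, then send each whole group to the service once.
/// There is no partial state to merge, unlike a regular accumulator.
fn summarize_by_user(rows: Vec<(u32, String)>) -> BTreeMap<u32, String> {
    let mut groups: BTreeMap<u32, Vec<String>> = BTreeMap::new();
    for (user_id, tweet) in rows {
        groups.entry(user_id).or_default().push(tweet);
    }
    groups
        .into_iter()
        .map(|(user_id, tweets)| (user_id, summarize_remote(&tweets)))
        .collect()
}
```

Because each group is shipped to the service as a unit, concepts such as partial and final aggregation stages have no obvious counterpart in this sketch.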

alamb avatar May 12 '25 13:05 alamb

Maybe the use case is "send many rows of data to a remote LLM service", for example:

That's a use case, but there are others too. Maybe one runs a forecast model, which is a little too complicated to "embed" into the query engine. In that case, we may still want the engine to maintain the internal state and do the bookkeeping on groups etc., but offload the numerics computation elsewhere where it is implemented.

ozankabak avatar May 12 '25 14:05 ozankabak

Just a quick two cents that I quite like the approach in this PR. I've been keeping an eye on this for a use case not dissimilar to the "llm" use case, where we want to use an API that only exposes an async function and right now it seems like the mechanism to accomplish that involves something like a custom physical exec which is verbose.

A second problem is how to maintain CPU usage in the presence of an incredibly slow scalar function. I would personally optimize the case where a slow scalar function is spending most of its time waiting on IO vs the case where a slow scalar function is doing something that happens to involve a lot of CPU differently, and so I quite like that this PR keeps those two cases separate. Optimizing slow things is sort of an infinite rabbit hole depending on how slow your function is and how well you can estimate that at the various optimizer stages, but I like the approach here (an option to cut the batch size just for async things, if I'm reading it correctly).

paleolimbot avatar May 12 '25 16:05 paleolimbot

That's a use case, but there are others too. Maybe one runs a forecast model, which is a little too complicated to "embed" into the query engine. In that case, we may still want the engine to maintain the internal state and do the bookkeeping on groups etc., but offload the numerics computation elsewhere where it is implemented.

I tried to conceive scenarios for using async aggregate functions, but I believe the use cases depend heavily on user needs.

My initial thought is similar to @alamb's, intending to batch invoke an external function to process a batch. For aggregation, both computation and accumulation would run within the external function. So, it would be a single-stage aggregation, where a batch processed by AsyncFuncExec would result in an aggregated outcome (the intermediate process depends on the implementation of the external function).

I'm not sure if I misunderstood anything; if so, please feel free to correct me. Regarding the scenario you mentioned, which might require maintaining multi-stage aggregation (partial, final, final partitioned...), I think with the approach in this PR, we would need to provide a new physical plan (possibly called AsyncAggregateExec) to handle this scenario.

It might need to accept async function inputs and allow passing accumulators into the async function. I haven't thought of a very clear use case yet, but users should still be able to define the behavior of update_async and merge_async.

However, to some extent, this is like redoing the aggregation logic. Perhaps the solution mentioned by @berkaysynnada , adding evaluate_async to PhysicalExpr, would be a more fundamental approach, but this might involve changes to the entire physical expression evaluation. I don't have a strong opinion on this point yet.

goldmedal avatar May 13 '25 17:05 goldmedal

@goldmedal We discussed the aggregation scope with @ozankabak, and there are still a few open questions, like which parts of the aggregation process should actually be async: is it just the evaluation stage, or do we also need to make the update and merge stages async?

Introducing a new operator like AsyncAggregateExec might be a natural next step, as you mentioned. But to me, that direction feels more like a workaround than a scalable, long-term solution--it duplicates a lot of logic and risks fragmenting execution paths, and you're also aware of it IIUC.

I’m also curious how others envision using this feature. Is the goal mainly to support I/O-bound workloads, like the LLM use case? Or are there also plans to handle CPU-bound, compute-heavy tasks in a more async-friendly way? Depending on the use cases we want to support, it might be worth considering a more foundational approach. Of course, those come with significant design and implementation challenges, but it could open the door to a more unified and flexible execution model.

berkaysynnada avatar May 14 '25 11:05 berkaysynnada

I’m also curious how others envision using this feature.

The place I would use this in an aggregate context would need to call async functions in the accumulator stage (fetching things) but the merge stages are just slow. It's also sufficiently specialized that I might end up with a custom node anyway 🙂 . The type I'm working with is a sort of lazy tensor that we can slice or crop without resolving the values, but when we do resolve the values (the async part) it's often in aggregate.

paleolimbot avatar May 14 '25 13:05 paleolimbot

TLDR: while it is possible to implement async UDFs using existing DataFusion APIs (no changes to the core), I think it is a sufficiently asked-for, useful and desired feature (see comments on https://github.com/apache/datafusion/issues/6518) that it makes sense to add it to the core (hence this PR)

@goldmedal We discussed the aggregation scope with @ozankabak, and there are still a few open questions, like which parts of the aggregation process should actually be async: is it just the evaluation stage, or do we also need to make the update and merge stages async?

I agree that it is not yet clear what the usecase is and thus if we try to design an API there is a real risk it will not match any particular usecase

Introducing a new operator like AsyncAggregateExec might be a natural next step, as you mentioned. But to me, that direction feels more like a workaround than a scalable, long-term solution--it duplicates a lot of logic and risks fragmenting execution paths, and you're also aware of it IIUC.

I agree adding a new operator at this time is premature for the reasons you mention (and others)

I’m also curious how others envision using this feature. Is the goal mainly to support I/O-bound workloads, like the LLM use case? Or are there also plans to handle CPU-bound, compute-heavy tasks in a more async-friendly way? Depending on the use cases we want to support, it might be worth considering a more foundational approach. Of course, those come with significant design and implementation challenges, but it could open the door to a more unified and flexible execution model.

In my mind I/O in functions is the usecase.

Flexibility in APIs is good, but I don't feel like we have a good sense of what other problems we are trying to address with a more advanced API

I’m also curious how others envision using this feature.

The place I would use this in an aggregate context would need to call async functions in the accumulator stage (fetching things) but the merge stages are just slow. It's also sufficiently specialized that I might end up with a custom node anyway 🙂

In my mind, using a specialized ExecutionPlan node makes sense for async / heavy compute tasks -- the existing DataFusion APIs allow for this and you can do pretty much whatever you want.

alamb avatar May 14 '25 20:05 alamb

In my mind I/O in functions is the usecase.

Flexibility in APIs is good, but I don't feel like we have a good sense of what other problems we are trying to address with a more advanced API

My use of async in udf's currently is to query either an external system or datafusion itself.

For example, I take the record batch provided to the udf, convert that to a dataframe, join that with another dataframe, do a bunch of ops on the result of that then return a column from the output of that as the output of the udf. thread::spawn(||..) + #[tokio::main] makes it possible.
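The workaround described above, running async code from a synchronous UDF body on a separate thread, can be sketched as follows. The comment uses `#[tokio::main]` for the runtime; to stay dependency-free, this sketch swaps in a tiny hand-written `block_on` built on the standard library, which is an assumption of the example and not what the commenter runs. `remote_double` and `invoke_sync` are hypothetical names.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal std-only stand-in for a runtime's `block_on`: poll the future on
/// the current thread and park until it is woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async work standing in for a query against an external system.
async fn remote_double(values: Vec<i64>) -> Vec<i64> {
    values.into_iter().map(|v| v * 2).collect()
}

/// Synchronous entry point, shaped like the body of a sync UDF: the async
/// work runs to completion on a dedicated thread while the caller blocks.
fn invoke_sync(values: Vec<i64>) -> Vec<i64> {
    thread::spawn(move || block_on(remote_double(values)))
        .join()
        .expect("worker thread panicked")
}
```

The calling thread is blocked for the full duration of the async work, which is the cost this style of workaround pays for keeping the UDF signature synchronous.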

Not exactly elegant but it works.

Omega359 avatar May 14 '25 21:05 Omega359

My use of async in udf's currently is to query either an external system or datafusion itself.

That is interesting, it almost sounds like you are using async udfs to implement some sort of custom subquery. Very interesting

alamb avatar May 15 '25 10:05 alamb

@goldmedal We discussed the aggregation scope with @ozankabak, and there are still a few open questions, like which parts of the aggregation process should actually be async: is it just the evaluation stage, or do we also need to make the update and merge stages async?

Introducing a new operator like AsyncAggregateExec might be a natural next step, as you mentioned. But to me, that direction feels more like a workaround than a scalable, long-term solution--it duplicates a lot of logic and risks fragmenting execution paths, and you're also aware of it IIUC.

👍

I’m also curious how others envision using this feature. Is the goal mainly to support I/O-bound workloads, like the LLM use case? Or are there also plans to handle CPU-bound, compute-heavy tasks in a more async-friendly way? Depending on the use cases we want to support, it might be worth considering a more foundational approach. Of course, those come with significant design and implementation challenges, but it could open the door to a more unified and flexible execution model.

I agree with @alamb's point. I/O workloads are my main goal. Besides the LLM case, I think invoking a data API is another use case. For compute-heavy tasks, I have no further design yet. However, I think it would be good to have a more efficient design once we know the specific case.

goldmedal avatar May 15 '25 13:05 goldmedal

My use of async in udf's currently is to query either an external system or datafusion itself.

That is interesting, it almost sounds like you are using async udfs to implement some sort of custom subquery. Very interesting

Pretty much, yes.

Omega359 avatar May 15 '25 14:05 Omega359

Are there any remaining outstanding issues to merging this PR?

If not, perhaps we can merge it and file an epic / ticket for filling out the remaining features.

A blog post (perhaps based on the example here) would be 100% amazing

alamb avatar May 21 '25 15:05 alamb

Unless I hear anything else I plan to merge this tomorrow and will file a follow-on Epic for other tasks (docs / blogs / support in other types of plans)

alamb avatar May 22 '25 19:05 alamb

🤖 ./gh_compare_branch_bench.sh Benchmark Script Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing epic/async-udf (2462dd0f9439c9c7d7104e8e168d2b64b597c473) to deeff88601772165615d04bbe5f0ea31ce1e8112 diff
BENCH_NAME=sql_planner
BENCH_COMMAND=cargo bench --bench sql_planner
BENCH_FILTER=
BENCH_BRANCH_NAME=epic_async-udf
Results will be posted here when complete

alamb avatar Jun 03 '25 14:06 alamb

🤖: Benchmark completed

Details

group                                         epic_async-udf                         main
-----                                         --------------                         ----
logical_aggregate_with_join                   1.00    717.5±3.09µs        ? ?/sec    1.02    732.1±3.07µs        ? ?/sec
logical_select_all_from_1000                  1.03    127.2±0.25ms        ? ?/sec    1.00    123.9±0.26ms        ? ?/sec
logical_select_one_from_700                   1.00    407.5±1.80µs        ? ?/sec    1.03    417.8±1.37µs        ? ?/sec
logical_trivial_join_high_numbered_columns    1.00    370.9±2.30µs        ? ?/sec    1.03    380.3±1.07µs        ? ?/sec
logical_trivial_join_low_numbered_columns     1.00    355.4±1.17µs        ? ?/sec    1.03    367.3±2.64µs        ? ?/sec
physical_intersection                         1.00    820.8±5.33µs        ? ?/sec    1.02    838.3±6.67µs        ? ?/sec
physical_join_consider_sort                   1.00  1338.8±24.07µs        ? ?/sec    1.02  1367.8±21.22µs        ? ?/sec
physical_join_distinct                        1.00    344.0±1.65µs        ? ?/sec    1.04    356.9±1.65µs        ? ?/sec
physical_many_self_joins                      1.00     10.0±0.04ms        ? ?/sec    1.04     10.4±0.04ms        ? ?/sec
physical_plan_clickbench_all                  1.00    142.0±1.52ms        ? ?/sec    1.00    141.9±1.45ms        ? ?/sec
physical_plan_clickbench_q1                   1.01  1690.6±20.04µs        ? ?/sec    1.00  1676.9±17.84µs        ? ?/sec
physical_plan_clickbench_q10                  1.01      2.4±0.03ms        ? ?/sec    1.00      2.4±0.01ms        ? ?/sec
physical_plan_clickbench_q11                  1.02      2.5±0.04ms        ? ?/sec    1.00      2.5±0.01ms        ? ?/sec
physical_plan_clickbench_q12                  1.02      2.7±0.03ms        ? ?/sec    1.00      2.6±0.03ms        ? ?/sec
physical_plan_clickbench_q13                  1.01      2.3±0.04ms        ? ?/sec    1.00      2.3±0.02ms        ? ?/sec
physical_plan_clickbench_q14                  1.00      2.5±0.02ms        ? ?/sec    1.00      2.5±0.03ms        ? ?/sec
physical_plan_clickbench_q15                  1.01      2.4±0.02ms        ? ?/sec    1.00      2.4±0.02ms        ? ?/sec
physical_plan_clickbench_q16                  1.01      2.3±0.02ms        ? ?/sec    1.00      2.2±0.02ms        ? ?/sec
physical_plan_clickbench_q17                  1.02      2.4±0.03ms        ? ?/sec    1.00      2.3±0.02ms        ? ?/sec
physical_plan_clickbench_q18                  1.00  1957.6±18.79µs        ? ?/sec    1.00  1948.7±17.05µs        ? ?/sec
physical_plan_clickbench_q19                  1.01      2.8±0.05ms        ? ?/sec    1.00      2.8±0.03ms        ? ?/sec
physical_plan_clickbench_q2                   1.00  1921.1±36.84µs        ? ?/sec    1.05      2.0±0.15ms        ? ?/sec
physical_plan_clickbench_q20                  1.00  1669.5±11.52µs        ? ?/sec    1.00  1667.0±16.28µs        ? ?/sec
physical_plan_clickbench_q21                  1.00  1977.5±22.76µs        ? ?/sec    1.00  1970.5±19.98µs        ? ?/sec
physical_plan_clickbench_q22                  1.01      2.6±0.03ms        ? ?/sec    1.00      2.5±0.02ms        ? ?/sec
physical_plan_clickbench_q23                  1.00      2.8±0.02ms        ? ?/sec    1.00      2.8±0.03ms        ? ?/sec
physical_plan_clickbench_q24                  1.01      4.6±0.04ms        ? ?/sec    1.00      4.5±0.06ms        ? ?/sec
physical_plan_clickbench_q25                  1.01  1987.2±18.34µs        ? ?/sec    1.00  1963.1±10.11µs        ? ?/sec
physical_plan_clickbench_q26                  1.01  1814.5±16.10µs        ? ?/sec    1.00  1796.4±16.67µs        ? ?/sec
physical_plan_clickbench_q27                  1.01      2.0±0.03ms        ? ?/sec    1.00      2.0±0.02ms        ? ?/sec
physical_plan_clickbench_q28                  1.01      2.8±0.07ms        ? ?/sec    1.00      2.8±0.02ms        ? ?/sec
physical_plan_clickbench_q29                  1.01      3.5±0.04ms        ? ?/sec    1.00      3.4±0.03ms        ? ?/sec
physical_plan_clickbench_q3                   1.01  1912.1±23.07µs        ? ?/sec    1.00  1886.1±19.79µs        ? ?/sec
physical_plan_clickbench_q30                  1.00     14.2±0.10ms        ? ?/sec    1.01     14.3±0.16ms        ? ?/sec
physical_plan_clickbench_q31                  1.00      2.8±0.02ms        ? ?/sec    1.00      2.8±0.03ms        ? ?/sec
physical_plan_clickbench_q32                  1.01      2.8±0.03ms        ? ?/sec    1.00      2.8±0.04ms        ? ?/sec
physical_plan_clickbench_q33                  1.01      2.4±0.04ms        ? ?/sec    1.00      2.4±0.02ms        ? ?/sec
physical_plan_clickbench_q34                  1.01      2.1±0.04ms        ? ?/sec    1.00      2.1±0.03ms        ? ?/sec
physical_plan_clickbench_q35                  1.00      2.2±0.08ms        ? ?/sec    1.00      2.2±0.03ms        ? ?/sec
physical_plan_clickbench_q36                  1.01      2.9±0.04ms        ? ?/sec    1.00      2.9±0.03ms        ? ?/sec
physical_plan_clickbench_q37                  1.00      2.9±0.04ms        ? ?/sec    1.00      2.9±0.04ms        ? ?/sec
physical_plan_clickbench_q38                  1.00      2.9±0.03ms        ? ?/sec    1.00      2.9±0.03ms        ? ?/sec
physical_plan_clickbench_q39                  1.00      2.7±0.03ms        ? ?/sec    1.00      2.7±0.03ms        ? ?/sec
physical_plan_clickbench_q4                   1.01  1654.3±14.86µs        ? ?/sec    1.00  1642.6±18.13µs        ? ?/sec
physical_plan_clickbench_q40                  1.00      3.3±0.03ms        ? ?/sec    1.00      3.3±0.07ms        ? ?/sec
physical_plan_clickbench_q41                  1.00      2.9±0.03ms        ? ?/sec    1.00      2.9±0.04ms        ? ?/sec
physical_plan_clickbench_q42                  1.00      2.8±0.03ms        ? ?/sec    1.01      2.9±0.03ms        ? ?/sec
physical_plan_clickbench_q43                  1.00      3.0±0.05ms        ? ?/sec    1.00      3.0±0.06ms        ? ?/sec
physical_plan_clickbench_q44                  1.02  1819.9±26.74µs        ? ?/sec    1.00  1786.6±15.48µs        ? ?/sec
physical_plan_clickbench_q45                  1.01  1801.6±21.18µs        ? ?/sec    1.00  1791.7±10.66µs        ? ?/sec
physical_plan_clickbench_q46                  1.01      2.2±0.03ms        ? ?/sec    1.00      2.2±0.02ms        ? ?/sec
physical_plan_clickbench_q47                  1.01      2.8±0.02ms        ? ?/sec    1.00      2.7±0.02ms        ? ?/sec
physical_plan_clickbench_q48                  1.00      3.3±0.04ms        ? ?/sec    1.00      3.3±0.03ms        ? ?/sec
physical_plan_clickbench_q49                  1.00      3.7±0.02ms        ? ?/sec    1.00      3.8±0.04ms        ? ?/sec
physical_plan_clickbench_q5                   1.01  1856.8±23.43µs        ? ?/sec    1.00  1842.2±19.27µs        ? ?/sec
physical_plan_clickbench_q50                  1.00      3.3±0.03ms        ? ?/sec    1.00      3.3±0.03ms        ? ?/sec
physical_plan_clickbench_q51                  1.00      2.3±0.02ms        ? ?/sec    1.01      2.3±0.02ms        ? ?/sec
physical_plan_clickbench_q52                  1.01      3.1±0.11ms        ? ?/sec    1.00      3.1±0.03ms        ? ?/sec
physical_plan_clickbench_q6                   1.02  1878.7±28.84µs        ? ?/sec    1.00  1833.1±16.08µs        ? ?/sec
physical_plan_clickbench_q7                   1.00  1706.1±20.11µs        ? ?/sec    1.00  1699.8±17.47µs        ? ?/sec
physical_plan_clickbench_q8                   1.01      2.4±0.03ms        ? ?/sec    1.00      2.3±0.02ms        ? ?/sec
physical_plan_clickbench_q9                   1.01      2.3±0.03ms        ? ?/sec    1.00      2.2±0.01ms        ? ?/sec
physical_plan_tpcds_all                       1.00   1035.4±6.75ms        ? ?/sec    1.00   1034.4±4.32ms        ? ?/sec
physical_plan_tpch_all                        1.01     61.9±0.38ms        ? ?/sec    1.00     61.3±0.21ms        ? ?/sec
physical_plan_tpch_q1                         1.00      2.1±0.01ms        ? ?/sec    1.00      2.1±0.03ms        ? ?/sec
physical_plan_tpch_q10                        1.01      3.7±0.03ms        ? ?/sec    1.00      3.7±0.02ms        ? ?/sec
physical_plan_tpch_q11                        1.00      3.2±0.03ms        ? ?/sec    1.00      3.2±0.05ms        ? ?/sec
physical_plan_tpch_q12                        1.00   1805.8±9.86µs        ? ?/sec    1.00  1801.6±13.15µs        ? ?/sec
physical_plan_tpch_q13                        1.00  1413.4±38.47µs        ? ?/sec    1.00  1408.5±25.12µs        ? ?/sec
physical_plan_tpch_q14                        1.01  1898.2±21.46µs        ? ?/sec    1.00   1885.3±8.73µs        ? ?/sec
physical_plan_tpch_q16                        1.00      2.4±0.02ms        ? ?/sec    1.01      2.4±0.04ms        ? ?/sec
physical_plan_tpch_q17                        1.02      2.4±0.05ms        ? ?/sec    1.00      2.4±0.01ms        ? ?/sec
physical_plan_tpch_q18                        1.00      2.6±0.02ms        ? ?/sec    1.00      2.6±0.01ms        ? ?/sec
physical_plan_tpch_q19                        1.01      3.5±0.02ms        ? ?/sec    1.00      3.5±0.03ms        ? ?/sec
physical_plan_tpch_q2                         1.00      5.4±0.03ms        ? ?/sec    1.00      5.4±0.01ms        ? ?/sec
physical_plan_tpch_q20                        1.00      3.1±0.03ms        ? ?/sec    1.00      3.1±0.01ms        ? ?/sec
physical_plan_tpch_q21                        1.00      4.1±0.02ms        ? ?/sec    1.00      4.1±0.01ms        ? ?/sec
physical_plan_tpch_q22                        1.00      2.7±0.01ms        ? ?/sec    1.00      2.7±0.02ms        ? ?/sec
physical_plan_tpch_q3                         1.01      2.5±0.04ms        ? ?/sec    1.00      2.5±0.01ms        ? ?/sec
physical_plan_tpch_q4                         1.02  1568.2±11.99µs        ? ?/sec    1.00  1542.2±16.14µs        ? ?/sec
physical_plan_tpch_q5                         1.01      3.1±0.04ms        ? ?/sec    1.00      3.1±0.03ms        ? ?/sec
physical_plan_tpch_q6                         1.00    865.4±7.89µs        ? ?/sec    1.00   869.4±15.77µs        ? ?/sec
physical_plan_tpch_q7                         1.01      4.1±0.03ms        ? ?/sec    1.00      4.1±0.03ms        ? ?/sec
physical_plan_tpch_q8                         1.00      5.1±0.02ms        ? ?/sec    1.00      5.1±0.07ms        ? ?/sec
physical_plan_tpch_q9                         1.00      4.0±0.02ms        ? ?/sec    1.00      4.0±0.01ms        ? ?/sec
physical_select_aggregates_from_200           1.00     25.3±0.12ms        ? ?/sec    1.00     25.2±0.10ms        ? ?/sec
physical_select_all_from_1000                 1.03    141.2±0.28ms        ? ?/sec    1.00    137.5±0.26ms        ? ?/sec
physical_select_one_from_700                  1.00  1039.8±33.34µs        ? ?/sec    1.04   1081.8±6.21µs        ? ?/sec
physical_sorted_union_orderby                 1.00     60.9±0.42ms        ? ?/sec    1.00     60.9±0.57ms        ? ?/sec
physical_theta_join_consider_sort             1.00   1708.1±8.49µs        ? ?/sec    1.02   1738.5±6.62µs        ? ?/sec
physical_unnest_to_join                       1.00   1288.0±5.55µs        ? ?/sec    1.02   1314.6±8.37µs        ? ?/sec
with_param_values_many_columns                1.02    144.9±0.68µs        ? ?/sec    1.00    141.7±0.88µs        ? ?/sec

alamb avatar Jun 03 '25 15:06 alamb

This would be extremely useful for us. @alamb, could this please get merged 🙏

samuelcolvin avatar Jun 18 '25 10:06 samuelcolvin

https://github.com/apache/datafusion/blob/630aa7b0c7b44ea8e77f9e0d685bf79f2a3cd3bd/datafusion/core/src/execution/context/mod.rs#L1766

Needs an option for async UDFs I guess.

samuelcolvin avatar Jun 18 '25 12:06 samuelcolvin