datafusion
datafusion copied to clipboard
Add push down sort to the source (table provider)
Is your feature request related to a problem or challenge?
Consider we have huge data source consists of many record batches. Now it's impossible to get last recent N rows without full scan:
SELECT * FROM Events
ORDER BE event_time DESC
LIMIT 1000
The query above will do full scan from the starting row. But we can avoid it if we let Provider know to perform scanning from the end to start.
It the example TableProvider may know that it needed to provide only last record batch (or the latest parquet file from folder).
Describe the solution you'd like
Now we have filter and limit in TableProvider::scan
:
async fn scan(
&self,
state: &SessionState,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Let's add SortExpression as well to push it down or just consider:
async fn scan(
&self,
state: &SessionState,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
filters: &[Expr],
// sort expression
expr: &[PhysicalSortExpr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Describe alternatives you've considered
Also we can add method with_sorting(self: Arc<Self>, ... )
to the trait ExecutionPlan
and add an ability of pushing down sorting during optimization phase.
But I think that sorting is something fundamental so it's better to add it to TableProvider::scan
Additional context
No response
Possibly related: https://github.com/apache/datafusion/issues/7871
Possibly related: #7871
@alamb Thank you for the reply!
I've read discussion in #7871 and think that this case is different.
I don't want to say that MySourceExec can do sort better than DF does. I'd like to tell to MySourceExec how it should be load an external batches of data: from the beginning or from the tail (in reverse order).
How can I do that without knowing about sorting?
It the example TableProvider may know that it needed to provide only last record batch (or the latest parquet file from folder).
The provider can tell DataFusion it produces data in some pre-existing order via https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.properties which can then communicate the sortedness via https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.PlanProperties.html#method.output_ordering
However, that usecase is for data that is already always sorted. It doesn't permit DataFusion to say "I won't use the sort order you reported so you don't have to honor the contract" 🤔
(BTW @NGA-TRAN and I worked on a very similar feature in InfluxDB IOx -- and we implemented a special operator that knows how to do this "read only the most recent file" for queries very much like your example above
SELECT * FROM Events
ORDER BE event_time DESC
LIMIT 1000
There is additional discussion / links on https://github.com/apache/datafusion/issues/10313
I think we would be interested in collaborating / pushing some of this logic upstream to datafusion. @matthewmturner and @suremarc may also be interested