datafusion icon indicating copy to clipboard operation
datafusion copied to clipboard

Add push down sort to the source (table provider)

Open karlovnv opened this issue 9 months ago • 4 comments

Is your feature request related to a problem or challenge?

Consider we have huge data source consists of many record batches. Now it's impossible to get last recent N rows without full scan:

SELECT * FROM Events
ORDER BE event_time DESC
LIMIT 1000

The query above will do full scan from the starting row. But we can avoid it if we let Provider know to perform scanning from the end to start.

It the example TableProvider may know that it needed to provide only last record batch (or the latest parquet file from folder).

Describe the solution you'd like

Now we have filter and limit in TableProvider::scan:

async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        // filters and limit can be used here to inject some push-down operations if needed
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {

Let's add SortExpression as well to push it down or just consider:

async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        // filters and limit can be used here to inject some push-down operations if needed
        filters: &[Expr],
        // sort expression
        expr: &[PhysicalSortExpr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {

Describe alternatives you've considered

Also we can add method with_sorting(self: Arc<Self>, ... ) to the trait ExecutionPlan and add an ability of pushing down sorting during optimization phase.

But I think that sorting is something fundamental so it's better to add it to TableProvider::scan

Additional context

No response

karlovnv avatar May 09 '24 09:05 karlovnv

Possibly related: https://github.com/apache/datafusion/issues/7871

alamb avatar May 09 '24 17:05 alamb

Possibly related: #7871

@alamb Thank you for the reply!

I've read discussion in #7871 and think that this case is different.

I don't want to say that MySourceExec can do sort better than DF does. I'd like to tell to MySourceExec how it should be load an external batches of data: from the beginning or from the tail (in reverse order).

How can I do that without knowing about sorting?

karlovnv avatar May 09 '24 17:05 karlovnv

It the example TableProvider may know that it needed to provide only last record batch (or the latest parquet file from folder).

The provider can tell DataFusion it produces data in some pre-existing order via https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.properties which can then communicate the sortedness via https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.PlanProperties.html#method.output_ordering

However, that usecase is for data that is already always sorted. It doesn't permit DataFusion to say "I won't use the sort order you reported so you don't have to honor the contract" 🤔

alamb avatar May 09 '24 17:05 alamb

(BTW @NGA-TRAN and I worked on a very similar feature in InfluxDB IOx -- and we implemented a special operator that knows how to do this "read only the most recent file" for queries very much like your example above

SELECT * FROM Events
ORDER BE event_time DESC
LIMIT 1000

There is additional discussion / links on https://github.com/apache/datafusion/issues/10313

I think we would be interested in collaborating / pushing some of this logic upstream to datafusion. @matthewmturner and @suremarc may also be interested

alamb avatar May 09 '24 17:05 alamb