
[Question] Optimize multiple reads on same DataFrame

Open ParadoxShmaradox opened this issue 3 years ago • 1 comments

Hey,

I have a scenario where I need to run the same filter expression, with different values each time, against the same RecordBatches.

For example:

let c2: Vec<RecordBatch> = ....
let provider = datafusion::datasource::MemTable::try_new(c2[0].schema(), vec![c2])
    .map_err(|e| {
        log::error!("Error MemTable {}", e);
        e
    })
    .unwrap();

let ctx = SessionContext::new();

ctx.register_table("t", provider ).unwrap();
let df = ctx.table("t").unwrap();

let expr: Expr = get_expression(id, from_time, to_time);

let df = df.filter(expr).unwrap();

let res = df.collect().await.unwrap();
ctx.deregister_table("t").unwrap();

This is pretty fast: a few ms on an 80 MiB in-memory array, filtering on 2 columns. I might run 1000 queries against the same MemTable, and was wondering whether anything could be optimized:

  • Precomputing an execution plan on the MemTable, if that is cost-effective.
  • Is SessionContext thread-safe, so that it can be shared between multiple threads and optimized across executions?
  • Creating an index, if that is cost-effective (I'm not sure whether one of the calls already creates an index, or whether indexes are supported at all).

Thanks!

ParadoxShmaradox avatar Jul 06 '22 18:07 ParadoxShmaradox

What I ended up doing was collecting the record batches from the DataFrame. Because I know the record batches are pre-sorted by the id column in the Parquet file they were read from, I could skip batches and apply the filter kernels by hand.
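
The batch-skipping idea can be sketched roughly as follows. This is not the code from the thread: `Batch` and `filter_sorted` are hypothetical names, plain `Vec<i64>` columns stand in for Arrow arrays, and the sketch assumes rows are sorted by id both within and across batches and that the time range is inclusive.

```rust
/// Simplified stand-in for an Arrow RecordBatch (hypothetical type):
/// two equally long columns.
/// Assumption: rows are sorted by `id` within each batch and across batches.
#[derive(Debug, Clone)]
pub struct Batch {
    pub ids: Vec<i64>,
    pub times: Vec<i64>,
}

/// Returns (batch index, row index) for every row whose id equals `id`
/// and whose time lies in the inclusive range [from_time, to_time].
pub fn filter_sorted(
    batches: &[Batch],
    id: i64,
    from_time: i64,
    to_time: i64,
) -> Vec<(usize, usize)> {
    let mut hits = Vec::new();
    for (b, batch) in batches.iter().enumerate() {
        let (first, last) = match (batch.ids.first(), batch.ids.last()) {
            (Some(f), Some(l)) => (*f, *l),
            _ => continue, // empty batch, nothing to scan
        };
        if last < id {
            continue; // every id in this batch is below the target
        }
        if first > id {
            break; // sorted input: all later batches are above the target too
        }
        // Binary search for the run of rows with the target id.
        let start = batch.ids.partition_point(|&v| v < id);
        let end = batch.ids.partition_point(|&v| v <= id);
        // Only the matching run is checked against the time range.
        for row in start..end {
            let t = batch.times[row];
            if t >= from_time && t <= to_time {
                hits.push((b, row));
            }
        }
    }
    hits
}
```

With real Arrow data the inner loop would be replaced by the comparison and filter kernels applied to a slice of the batch; the point here is only that most batches are never touched.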

This cut the average filtering time dramatically, from 5 ms to 1 ms. There are about 100 partitions. I wonder whether a record batch could hold statistics on its data, either precomputed or computed on demand, which DataFusion could then use in physical plan optimization.
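
To illustrate what per-batch statistics could look like, here is a minimal sketch of min/max pruning, similar in spirit to the row-group statistics that Parquet files carry. `MinMax`, `compute_stats` and `candidate_batches` are hypothetical names invented for this example, not DataFusion APIs, and a plain `i64` slice stands in for a column.

```rust
/// Hypothetical per-batch statistics for a single i64 column.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MinMax {
    pub min: i64,
    pub max: i64,
}

/// Computed once per batch (ahead of time or on first use).
/// Returns None for an empty column.
pub fn compute_stats(column: &[i64]) -> Option<MinMax> {
    let min = *column.iter().min()?;
    let max = *column.iter().max()?;
    Some(MinMax { min, max })
}

/// Indices of the batches that may contain a value in the inclusive
/// range [lo, hi]. Batches whose [min, max] does not overlap the range,
/// and empty batches, are pruned without being scanned.
pub fn candidate_batches(stats: &[Option<MinMax>], lo: i64, hi: i64) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter_map(|(i, s)| match s {
            Some(s) if s.max >= lo && s.min <= hi => Some(i),
            _ => None,
        })
        .collect()
}
```

Unlike the sorted-id shortcut, this does not require any ordering, so it would also work for a column such as the timestamp; the statistics only need to be computed once and can be reused by every one of the repeated queries.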

ParadoxShmaradox avatar Jul 11 '22 13:07 ParadoxShmaradox