[Question] Optimize multiple reads on same DataFrame
Hey,
I have a scenario where I need to run the same filter expression, with different values each time, on the same RecordBatch.
For example:
let c2: Vec<RecordBatch> = ....
// Build an in-memory table from the batches (a single partition).
let provider = datafusion::datasource::MemTable::try_new(c2[0].schema(), vec![c2])
    .map_err(|e| {
        log::error!("Error MemTable {}", e);
        e
    })
    .unwrap();
let ctx = SessionContext::new();
// register_table expects an Arc<dyn TableProvider>.
ctx.register_table("t", std::sync::Arc::new(provider)).unwrap();
let df = ctx.table("t").unwrap();
let expr: Expr = get_expression(id, from_time, to_time);
let df = df.filter(expr).unwrap();
let res = df.collect().await.unwrap();
ctx.deregister_table("t").unwrap();
It is pretty fast: a few ms on an 80 MiB in-memory array, filtering on 2 columns. I might run 1000 queries on the same MemTable, and I was wondering whether anything could be optimized:
- Precomputing an execution plan on the MemTable, if it's cost-effective.
- Is SessionContext thread-safe, can it be shared between multiple threads, and can it be optimized across executions?
- Somehow creating an index (I'm not sure whether an index is created by one of the calls, or supported at all), if it's cost-effective.
Thanks!
What I ended up doing was to collect the record batches from the DataFrame. Because I know that the record batches are pre-sorted by the id column from the Parquet file they were read from, I could skip batches and apply the kernel filters by hand.
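The batch skipping described above can be sketched in plain Rust, without Arrow types. This is only an illustration under stated assumptions, not the code I actually used: `Batch`, `ids`, `times`, and `filter_sorted` are hypothetical names, each batch is modelled as two parallel `Vec`s, ids are assumed to be sorted ascending both within each batch and across batches, and no batch is assumed to be empty.

```rust
/// Hypothetical stand-in for a RecordBatch: two parallel columns.
#[derive(Debug)]
struct Batch {
    ids: Vec<u64>,   // assumed sorted ascending, within and across batches
    times: Vec<i64>, // not sorted; filtered row by row
}

/// Returns the (id, time) rows with `id == target` and `from <= time < to`.
fn filter_sorted(batches: &[Batch], target: u64, from: i64, to: i64) -> Vec<(u64, i64)> {
    // Binary search for the first batch whose last id is >= target;
    // every earlier batch holds only smaller ids and can be skipped.
    let start = batches.partition_point(|b| b.ids.last().map_or(true, |&last| last < target));
    let mut out = Vec::new();
    for b in &batches[start..] {
        // Once a batch starts beyond the target, no later batch can match.
        match b.ids.first() {
            Some(&first) if first > target => break,
            _ => {}
        }
        // Binary search for the run of rows equal to the target id.
        let lo = b.ids.partition_point(|&id| id < target);
        let hi = b.ids.partition_point(|&id| id <= target);
        // Apply the remaining (time range) predicate only to that run.
        for i in lo..hi {
            if b.times[i] >= from && b.times[i] < to {
                out.push((b.ids[i], b.times[i]));
            }
        }
    }
    out
}
```

With Arrow batches, the inner loop would presumably be replaced by slicing the batch to `lo..hi` and applying the filter kernel to the slice; the skipping logic stays the same.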
This cut the filtering time dramatically, from 5 ms on average to 1 ms. There are about 100 partitions. I wonder whether a record batch could hold some statistics on its data, either precomputed or computed on demand, so that DataFusion could use those statistics when optimizing the physical plan.
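To make the statistics idea concrete, here is a rough sketch of per-batch min/max summaries used to prune batches before filtering. It is plain Rust and does not reflect any existing RecordBatch or DataFusion API: `MinMax`, `min_max`, and `candidate_batches` are hypothetical names, and a column is modelled as a slice of `i64`.

```rust
/// Hypothetical min/max summary of one column in one batch.
#[derive(Debug, Clone, Copy, PartialEq)]
struct MinMax {
    min: i64,
    max: i64,
}

/// Computes the summary for one column; returns None for an empty column.
/// This could run once up front, or lazily on first use.
fn min_max(values: &[i64]) -> Option<MinMax> {
    let first = *values.first()?;
    Some(values.iter().fold(MinMax { min: first, max: first }, |s, &v| MinMax {
        min: s.min.min(v),
        max: s.max.max(v),
    }))
}

/// Returns the indices of batches that may contain a value in `from..=to`.
/// A batch whose [min, max] range misses the query range is pruned.
fn candidate_batches(stats: &[Option<MinMax>], from: i64, to: i64) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter_map(|(i, s)| match s {
            Some(s) if s.max >= from && s.min <= to => Some(i),
            _ => None,
        })
        .collect()
}
```

Unlike the sorted-id approach, this does not require any ordering: only the batches returned by `candidate_batches` need to go through the actual filter, at the cost of one pass over each batch to build the summaries.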