Improve memory efficiency of scans
Currently, our ScanBuilder takes a concurrency setting that determines how many concurrent split_exec tasks can run per worker thread; the number of worker threads is inferred from the number of available cores.
Concurrency defaults to 4. My MacBook Pro has 12 cores, which means up to 48 split_execs will execute in parallel.
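As a sanity check on that arithmetic, here is a minimal sketch of how the effective parallelism falls out of the two settings (the function name and the use of `available_parallelism` are illustrative, not the actual ScanBuilder internals):

```rust
use std::num::NonZeroUsize;
use std::thread::available_parallelism;

/// Hypothetical sketch: effective split_exec parallelism is the per-thread
/// concurrency setting multiplied by the inferred worker-thread count.
fn effective_split_parallelism(concurrency_per_thread: usize) -> usize {
    // Worker threads are inferred from the number of available cores.
    let worker_threads = available_parallelism().map(NonZeroUsize::get).unwrap_or(1);
    worker_threads * concurrency_per_thread
}

fn main() {
    // On a 12-core machine with the default concurrency of 4 this prints 48.
    println!("{}", effective_split_parallelism(4));
}
```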
It turns out that the following query demonstrates the problem:

```sql
select count(*) from events where payload.ref = 'refs/heads/main'
```
The GHArchive dataset is 400k rows and only ~300MB of compressed Vortex, yet there is enormous read amplification when loading it through DataFusion.
Each split of 8,192 rows becomes a ~3GB decompressed RecordBatch.
There are 50 total splits in the file, and with ~48 split_execs running in parallel effectively all of them are decompressed at once, so we end up forcing ~150GB of Arrow data into memory, which is plenty to crash the process on most machines, including our CI runners.
A 500x read amplification is atrocious. We should figure out a way to prevent something like this from happening, since it could crash production services.
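For the record, the back-of-the-envelope math behind those figures (the sizes are the rough numbers from above, nothing here is measured precisely):

```rust
fn main() {
    let splits = 50;          // splits of 8,192 rows in the file
    let gb_per_split = 3;     // ~3GB decompressed RecordBatch per split
    let on_disk_mb = 300;     // ~300MB of compressed Vortex

    let resident_gb = splits * gb_per_split;             // ~150GB of Arrow data
    let amplification = resident_gb * 1024 / on_disk_mb; // ~512x, i.e. the ~500x above

    println!("~{resident_gb}GB resident, ~{amplification}x read amplification");
}
```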
DataFusion specifically has an optional memory back-pressure system (its memory pool) we can opt into, but what's the root cause here?
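For reference, opting into that limit looks roughly like this; builder names move around between DataFusion releases, so treat this as a sketch rather than the exact API:

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

/// Sketch: build a SessionContext whose memory pool is capped at 4GB.
fn context_with_memory_limit() -> datafusion::error::Result<SessionContext> {
    let config = RuntimeConfig::new()
        // 4GB cap; the second argument is the fraction of that cap the pool
        // is allowed to hand out to consumers.
        .with_memory_limit(4 * 1024 * 1024 * 1024, 1.0);
    let runtime = Arc::new(RuntimeEnv::new(config)?);
    Ok(SessionContext::new_with_config_rt(SessionConfig::new(), runtime))
}
```

Note that the pool only throttles operators that register memory reservations with it, so our scan would have to report the size of each decompressed RecordBatch for the limit to have any effect.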
Each split is ~3GB uncompressed and the mean on-disk split is ~6MB (300MB / 50 splits)? That's a wild ~500x compression ratio.
Hmm. Maybe by default we should use concurrency = 1 in DataFusion?
It seems like the core issue is that, for highly compressed datasets, the memory required per unit of compute is far too large. Sounds like the memory back-pressure system could be useful, but if each split is told it can use no more than, say, 3GB, we also need a way to realize that such a budget doesn't leave room for any within-core concurrency: each split would have to run alone.
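To make that concrete, here is the shape of the heuristic I have in mind; nothing here exists in ScanBuilder today, and both `splits_in_flight` and the decompressed-size estimate are hypothetical:

```rust
/// Hypothetical helper: derive how many splits we can afford to decompress at
/// once from a memory budget and an estimate of one split's decompressed size.
fn splits_in_flight(budget_bytes: u64, est_split_bytes: u64, requested: usize) -> usize {
    // How many fully decompressed splits fit in the budget at the same time.
    let affordable = (budget_bytes / est_split_bytes.max(1)) as usize;
    // Always allow at least one split so the scan makes progress, and never
    // exceed the concurrency the caller asked for.
    affordable.clamp(1, requested.max(1))
}

fn main() {
    let gb = 1024 * 1024 * 1024u64;
    // GHArchive case: ~3GB per decompressed split, 48 requested split_execs.
    assert_eq!(splits_in_flight(3 * gb, 3 * gb, 48), 1);   // budget fits exactly one
    assert_eq!(splits_in_flight(64 * gb, 3 * gb, 48), 21); // budget admits 21
}
```

The hard part is estimating the decompressed split size up front, since, as the 6MB-to-3GB ratio shows, the on-disk size tells us almost nothing about it.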