datafusion-ballista
Prune unnecessary data from task definition
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
When the scheduler sends a task to the executor it has to send the serialized `ExecutionPlan`. For very large plans (for instance, scanning tens of thousands of files) the plan can be very large and the cost to serialize/deserialize to protobuf is significant.
Describe the solution you'd like
Since each task is only executing a single partition, we can prune the `FileScanConfig` `file_groups` for all other partitions. This can eliminate most of the bulk of the serialized plan.
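To illustrate the idea, here is a minimal sketch of per-partition pruning. The `FileScanConfig` below is a simplified stand-in for DataFusion's real struct (which holds `PartitionedFile` groups, schema, projection, etc.), not the actual API; the point is that only the file group for the task's partition needs to survive serialization:

```rust
// Hypothetical, simplified stand-in for DataFusion's FileScanConfig.
#[derive(Debug, Clone)]
struct FileScanConfig {
    /// One file group per output partition.
    file_groups: Vec<Vec<String>>,
}

/// Keep only the file group for the partition this task will execute,
/// replacing the others with empty groups so partition indices stay stable.
fn prune_for_partition(config: &FileScanConfig, partition: usize) -> FileScanConfig {
    let file_groups = config
        .file_groups
        .iter()
        .enumerate()
        .map(|(i, group)| if i == partition { group.clone() } else { Vec::new() })
        .collect();
    FileScanConfig { file_groups }
}

fn main() {
    let config = FileScanConfig {
        file_groups: vec![
            vec!["part-0-a.parquet".into(), "part-0-b.parquet".into()],
            vec!["part-1-a.parquet".into()],
            vec!["part-2-a.parquet".into()],
        ],
    };
    // The task for partition 1 only needs its own file group.
    let pruned = prune_for_partition(&config, 1);
    assert_eq!(pruned.file_groups[1], vec!["part-1-a.parquet".to_string()]);
    assert!(pruned.file_groups[0].is_empty());
    assert!(pruned.file_groups[2].is_empty());
    println!("{:?}", pruned);
}
```

Keeping empty placeholder groups (rather than dropping them) means the plan's output partition count is unchanged, so the executor can still run the plan against the task's original partition index.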
Describe alternatives you've considered
- When preparing a task definition, prune the `ExecutionPlan` prior to serialization. This can be done pretty straightforwardly as a `PhysicalOptimizerRule` to handle the standard cases (`ParquetExec`, `CsvExec`, etc).
- For custom cases such as user-defined `ExecutionPlan` impls, add an argument to `PhysicalExtensionCodec::try_encode`:

```rust
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    fn try_encode(
        &self,
        node: Arc<dyn ExecutionPlan>,
        partitions: &[usize],
        buf: &mut Vec<u8>,
    ) -> Result<(), BallistaError>;
    ...
}
```
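A codec implementation could then encode only the state the requested partitions need. The sketch below is hypothetical and self-contained: the trait mirrors the proposed `partitions` argument in simplified form (a flat list of file groups stands in for the plan node), and is not the real Ballista `PhysicalExtensionCodec`:

```rust
use std::fmt::Debug;

// Simplified mirror of the proposed trait; the real signature takes the
// plan node (Arc<dyn ExecutionPlan>) rather than raw file groups.
trait PartitionAwareCodec: Debug + Send + Sync {
    fn try_encode(
        &self,
        file_groups: &[Vec<String>], // stand-in for the node's scan state
        partitions: &[usize],
        buf: &mut Vec<u8>,
    ) -> Result<(), String>;
}

#[derive(Debug)]
struct PruningCodec;

impl PartitionAwareCodec for PruningCodec {
    fn try_encode(
        &self,
        file_groups: &[Vec<String>],
        partitions: &[usize],
        buf: &mut Vec<u8>,
    ) -> Result<(), String> {
        // Encode only the requested partitions' file lists, newline-separated,
        // instead of serializing every file group in the plan.
        for &p in partitions {
            let group = file_groups.get(p).ok_or("partition out of range")?;
            for file in group {
                buf.extend_from_slice(file.as_bytes());
                buf.push(b'\n');
            }
        }
        Ok(())
    }
}

fn main() {
    let file_groups = vec![
        vec!["f0.parquet".to_string()],
        vec!["f1.parquet".to_string(), "f2.parquet".to_string()],
    ];
    let mut buf = Vec::new();
    // A task executing only partition 1 encodes only that group.
    PruningCodec.try_encode(&file_groups, &[1], &mut buf).unwrap();
    assert_eq!(String::from_utf8(buf).unwrap(), "f1.parquet\nf2.parquet\n");
}
```

Passing the partition list into the codec lets user-defined plan nodes apply the same pruning that a built-in `PhysicalOptimizerRule` would apply to `ParquetExec` and friends.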
Additional context
I think there are pros and cons. The good part is that each partition/task will have a relatively small plan to deserialize, especially if the SQL includes lots of UNION branches. The bad part is that each partition/task will have different plan bytes: the scheduler will have to serialize the plan multiple times, and the result cannot be shared at the stage level.