datafusion-ballista icon indicating copy to clipboard operation
datafusion-ballista copied to clipboard

Prune unneccessary data from task definition

Open thinkharderdev opened this issue 2 years ago • 1 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do. A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] (This section helps Arrow developers understand the context and why for this feature, in addition to the what)

When the scheduler sends a task to the executor it has to send the serialized ExecutionPlan. For very large plans (for isntance, scanning 10s of thousands of files) the plan can be very large and the cost to serialize/deserialize to protobuf is significant.

Describe the solution you'd like A clear and concise description of what you want to happen.

Since each task is only executing a single partition, we can prune all the FileScanConfig file_groups for other partitions. This can eliminate most of the bulk of the serialized plan.

Describe alternatives you've considered A clear and concise description of any alternative solutions or features you've considered.

  1. When preparing a task definition, prune the ExecutionPlan prior to serialization. This can be done pretty straightforwardly as a PhysicalOptimizerRule to handle the standard cases (ParquetExec, CsvExec, etc).
  2. For custom cases such as user-defined ExecutionPlan impls, add an argument to PhysicalExtensionCodec::try_encode:
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    fn try_encode(
        &self,
        node: Arc<dyn ExecutionPlan>,
        partitions: &[usize],
        buf: &mut Vec<u8>,
    ) -> Result<(), BallistaError>;

    ...

}

Additional context Add any other context or screenshots about the feature request here.

thinkharderdev avatar Jan 20 '23 16:01 thinkharderdev

I think there is prons and cons. The good part is that each partition/task will have a relatively small plan to deserialize, especially if the SQL include lots of UNION branches. The bad part is that, each partition/task will have different plan bytes. the Scheduler will have to serialize the different plan multiple times and can not be shared in the Stage level.

mingmwang avatar Feb 08 '23 07:02 mingmwang