Redefine the executor task slots

Open yahoNanJing opened this issue 3 years ago • 4 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

For executor resources such as available task slots and available memory, the source of truth has so far been managed on the scheduler side for push-based task scheduling. With a single scheduler this is not a big problem, because the resources can simply be managed in memory. However, with the multiple schedulers introduced by #59, maintaining the executor resources in the shared coordinator is too costly because of the locking, serialization, and deserialization involved, especially when there are thousands of tasks to schedule.

Describe the solution you'd like

It would be better to maintain the source of truth for executor resources on the executor side rather than the scheduler side. The executor knows its own resources best, and in the multi-scheduler case it should be the one to decide which task to run next. The scheduler can still keep executor resource info as a hint for task scheduling, but that info is not the source of truth. Each scheduler's policy is to spread the tasks it manages as evenly as possible across the executors.

Solution:

  • In the executor, we will introduce one queue per scheduler and run the tasks received from those queues in a round-robin way.
  • In the scheduler, we will only keep the executor resource info in memory rather than persisting it in shared storage.
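
The executor-side half of this proposal can be sketched roughly as follows. This is only an illustration of "one queue per scheduler, served round-robin"; `Task`, `ExecutorQueues`, `push`, and `next_task` are hypothetical names and not part of Ballista's actual API.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical task handle; not Ballista's actual task type.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub scheduler_id: String,
    pub task_id: u32,
}

/// One FIFO queue per scheduler, drained in round-robin order.
#[derive(Default)]
pub struct ExecutorQueues {
    queues: HashMap<String, VecDeque<Task>>,
    /// Scheduler ids in the order they were first seen.
    order: Vec<String>,
    /// Index into `order` of the scheduler to serve next.
    cursor: usize,
}

impl ExecutorQueues {
    pub fn new() -> Self {
        Self::default()
    }

    /// Enqueue a task on the queue of the scheduler that sent it.
    pub fn push(&mut self, task: Task) {
        if !self.queues.contains_key(&task.scheduler_id) {
            self.order.push(task.scheduler_id.clone());
        }
        self.queues
            .entry(task.scheduler_id.clone())
            .or_default()
            .push_back(task);
    }

    /// Pop the next task, visiting schedulers in round-robin order and
    /// skipping schedulers whose queue is currently empty.
    pub fn next_task(&mut self) -> Option<Task> {
        let n = self.order.len();
        for i in 0..n {
            let idx = (self.cursor + i) % n;
            let popped = self
                .queues
                .get_mut(&self.order[idx])
                .and_then(|queue| queue.pop_front());
            if let Some(task) = popped {
                // Resume after the scheduler that was just served.
                self.cursor = (idx + 1) % n;
                return Some(task);
            }
        }
        None
    }
}
```

With this shape, a scheduler that floods the executor with tasks cannot starve another scheduler that has sent only a few, because each call to `next_task` moves on to the next scheduler's queue.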

Describe alternatives you've considered

Additional context

yahoNanJing avatar Aug 12 '22 08:08 yahoNanJing

How would you support all-at-once scheduling in this setup for streaming execution?

thinkharderdev avatar Aug 15 '22 11:08 thinkharderdev

Do you mean long-running tasks for the streaming case? Would you submit hundreds of SQL queries or jobs to the scheduler in the streaming case? If not, I think a single active scheduler will satisfy your needs. Then you don't need multiple active schedulers, and with a single active scheduler you don't need to save every update to the backend.

For task scheduling, we should first refine the scheduling policy on each executor to use round-robin, as the previous code did. Each scheduler then spreads the tasks it is responsible for evenly across the executors, treating every executor's task slots as a hint for achieving an even distribution. This is also fair when multiple schedulers assign tasks to the same executors, for both interactive and long-running tasks. The executor knows its own resources and running tasks best, so it is better for the executor to decide which task to run and which to leave pending.
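
As a rough sketch of the scheduler-side half, the following keeps an in-memory, per-scheduler view of executor slots and uses it only as a hint when choosing where to send the next task. `ExecutorHint`, `SchedulerView`, and their methods are hypothetical names for illustration, and the "pick the executor with the most hinted free slots" rule is an assumption about what "as evenly as possible" could mean.

```rust
use std::collections::BTreeMap;

/// What one scheduler believes about an executor. This is only a hint:
/// the executor itself remains the source of truth.
#[derive(Debug, Clone, Copy)]
pub struct ExecutorHint {
    /// Task slots the executor advertised.
    pub task_slots: i64,
    /// Tasks this scheduler has sent there and not yet seen finish.
    pub in_flight: i64,
}

/// In-memory view held by a single scheduler; nothing here is persisted.
#[derive(Default)]
pub struct SchedulerView {
    executors: BTreeMap<String, ExecutorHint>,
}

impl SchedulerView {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn register(&mut self, executor_id: &str, task_slots: i64) {
        self.executors.insert(
            executor_id.to_string(),
            ExecutorHint { task_slots, in_flight: 0 },
        );
    }

    /// Choose the executor with the most hinted free slots; ties go to
    /// the smallest executor id. A task is still assigned when every
    /// hint is exhausted, since the executor decides whether to run the
    /// task or leave it pending.
    pub fn pick_executor(&mut self) -> Option<String> {
        let id = self
            .executors
            .iter()
            .min_by_key(|(_, hint)| hint.in_flight - hint.task_slots)
            .map(|(id, _)| id.clone())?;
        if let Some(hint) = self.executors.get_mut(&id) {
            hint.in_flight += 1;
        }
        Some(id)
    }

    /// Record that a task sent to `executor_id` has finished.
    pub fn task_finished(&mut self, executor_id: &str) {
        if let Some(hint) = self.executors.get_mut(executor_id) {
            hint.in_flight = (hint.in_flight - 1).max(0);
        }
    }
}
```

Because the view is local to one scheduler, no lock or serialization round trip to shared storage is needed on the scheduling path; the cost is that the hint can be stale when other schedulers use the same executors.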

For #59, I think it will work well when there are few tasks and few task updates. However, when a job, or even a single stage, has thousands of tasks, the cost of locking, serialization, and deserialization would be huge. That's why we previously added an in-memory cache for the scheduler state. However, that cache layer was removed by #59.

yahoNanJing avatar Aug 16 '22 07:08 yahoNanJing

By streaming I just meant scheduling the entire DAG at once so batches are streamed between stages instead of using a disk-based shuffle and full materialization of each stage. Infinite streaming is not really something we need or something Ballista plans to support as far as I know.

For all-at-once scheduling I don't think round-robin scheduling would work. The individual tasks would need to synchronize execution. So if, for example, a task from stage 2 is scheduled on one executor it would block its executor slot until its input partitions start executing on their executors. Under certain conditions I'm fairly certain this could deadlock the system. To avoid that you would have to implement some complicated synchronization logic on the executor side.
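
The scenario above can be made concrete with a toy model. It assumes that tasks claim slots strictly in arrival order with no coordination between executors, and that a stage 2 task keeps its slot until its stage 1 inputs have run. `StageTask` and `consumers_starve_producers` are hypothetical names, and the function checks only this one failure case rather than proving the absence of deadlock in general.

```rust
/// Toy model of all-at-once scheduling with two stages.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StageTask {
    /// Stage 1 task that produces input partitions.
    Producer,
    /// Stage 2 task that holds its slot until its producers have run.
    Consumer,
}

/// Tasks claim slots strictly in arrival order (an assumption of this
/// sketch). Returns true when every occupied slot is held by a consumer
/// while at least one producer is still waiting for a slot: the
/// consumers wait for the producer, and the producer waits for a slot.
pub fn consumers_starve_producers(arrivals: &[StageTask], total_slots: usize) -> bool {
    let split = arrivals.len().min(total_slots);
    let (running, waiting) = arrivals.split_at(split);
    let producer_running = running.contains(&StageTask::Producer);
    let producer_waiting = waiting.contains(&StageTask::Producer);
    !running.is_empty() && !producer_running && producer_waiting
}
```

For example, with two slots in total and arrivals of two consumers followed by two producers, the function returns true; with four slots, or with the producers arriving first, it returns false.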

I agree that the current implementation involves too much overhead, but I think there is a better way to iterate on the current design without radically changing things (which I believe would limit future improvements). Broadly, the way we were planning to address this is to break the scheduler into two separate workloads:

  1. A single leader which is responsible for task scheduling on the executors and can leverage in-memory state to alleviate distributed locking and IO concerns. Executors would report task updates to the coordinator which would handle all scheduling and asynchronously replicate scheduling state to the persistent backend.
  2. N followers which would serve grpc requests (job status, execute query, etc) and handle physical planning, etc.

We would then of course use etcd for leader election in the distributed case (or have the single scheduler do both workloads in the standalone case). If the coordinator dies then we elect a new leader which would load the coordinator state from the persistent backend.
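
A minimal sketch of the leader side of this split, assuming the persistent backend can be modeled as an append-only log: the leader answers from in-memory state and hands each update to a background thread for replication. `TaskUpdate`, `Backend`, and `Leader` are hypothetical names; a shared `Vec` stands in for the real backend, and leader election itself is not shown.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical task status update reported by an executor.
#[derive(Clone, Debug, PartialEq)]
pub struct TaskUpdate {
    pub task_id: u32,
    pub status: String,
}

/// Stand-in for the persistent backend: an append-only log in memory.
pub type Backend = Arc<Mutex<Vec<TaskUpdate>>>;

pub struct Leader {
    /// In-memory scheduling state; reads never touch the backend.
    state: HashMap<u32, String>,
    /// Updates queued for asynchronous replication.
    replicate_tx: Sender<TaskUpdate>,
}

impl Leader {
    /// Start a leader plus a background thread that drains queued
    /// updates into `backend` until the leader is dropped.
    pub fn start(backend: Backend) -> (Leader, JoinHandle<()>) {
        let (tx, rx) = channel::<TaskUpdate>();
        let handle = thread::spawn(move || {
            for update in rx {
                backend.lock().unwrap().push(update);
            }
        });
        (Leader { state: HashMap::new(), replicate_tx: tx }, handle)
    }

    /// Apply an update in memory first, then queue it for replication.
    pub fn on_task_update(&mut self, update: TaskUpdate) {
        self.state.insert(update.task_id, update.status.clone());
        // A send error only means the replication thread has exited.
        let _ = self.replicate_tx.send(update);
    }

    pub fn status(&self, task_id: u32) -> Option<&str> {
        self.state.get(&task_id).map(|s| s.as_str())
    }

    /// A newly elected leader rebuilds its state by replaying the log;
    /// later entries for the same task overwrite earlier ones.
    pub fn recover(backend: &Backend) -> HashMap<u32, String> {
        let log = backend.lock().unwrap();
        log.iter().map(|u| (u.task_id, u.status.clone())).collect()
    }
}
```

The trade-off in this sketch is that updates still sitting in the channel are lost if the leader dies before they are replicated, so a recovering leader may see slightly stale state.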

As an incremental step, I think we could refine the locking. For simplicity's sake, we use a global lock on the active jobs, but we could certainly refine that to lock only a particular job (which is all that is really required for scheduling purposes).
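
The per-job locking idea can be sketched in memory as follows. This illustrates only the lock granularity, not Ballista's actual state backend: `JobState` and `ActiveJobs` are hypothetical names, and a real implementation would take the equivalent per-job lock in the shared backend.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical per-job scheduling state.
#[derive(Default, Debug)]
pub struct JobState {
    pub completed_tasks: usize,
}

/// The outer `RwLock` is held only briefly to look up or insert a job.
/// Scheduling work for a job takes that job's own `Mutex`, so updates
/// to different jobs do not block each other.
#[derive(Default)]
pub struct ActiveJobs {
    jobs: RwLock<HashMap<String, Arc<Mutex<JobState>>>>,
}

impl ActiveJobs {
    pub fn new() -> Self {
        Self::default()
    }

    /// Fetch (or create) the lock handle for a single job.
    pub fn job(&self, job_id: &str) -> Arc<Mutex<JobState>> {
        // Fast path: shared read lock, released at the end of this statement.
        let existing = self.jobs.read().unwrap().get(job_id).cloned();
        if let Some(job) = existing {
            return job;
        }
        // Slow path: exclusive lock only while inserting the new entry.
        let mut jobs = self.jobs.write().unwrap();
        jobs.entry(job_id.to_string()).or_default().clone()
    }
}
```

With this layout, holding the lock for one job while its task updates are applied does not prevent another thread from locking and updating a different job.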

thinkharderdev avatar Aug 16 '22 11:08 thinkharderdev

@thinkharderdev and I had a short sync-up. We can first work on #130, then flesh out the task slot management.

yahoNanJing avatar Aug 18 '22 02:08 yahoNanJing