Executor feature comparison
(I wrote this to help track what work needs to be done on the executors, but it might be useful to add to the user docs at some point.)
This table shows the features that the local (single-machine) executors support.
| Feature | single-threaded | threads | processes |
|---|---|---|---|
| Callbacks | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| Task concurrency | :x: | :white_check_mark: | :white_check_mark: |
| Retries | :x: | :white_check_mark: | :x: |
| Timeouts | :x: | :x: | :x: |
| Straggler mitigation | :x: | :white_check_mark: | :white_check_mark: |
| Input batching | N/A 4 | :white_check_mark: | :white_check_mark: |
| Resume | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| Compute arrays in parallel | :x: | :white_check_mark: | :white_check_mark: |
| Runtime memory check | :x: | :white_check_mark: | :white_check_mark: |
This table shows the same for the cloud executors:
| Feature | lithops | modal | beam (Dataflow) | dask |
|---|---|---|---|---|
| Callbacks | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| Task concurrency | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| Retries | :white_check_mark: | :white_check_mark: | :white_check_mark: 1 | :white_check_mark: 2 |
| Timeouts | :white_check_mark: | :white_check_mark: | ? | :x: 3 |
| Straggler mitigation | :white_check_mark: | :white_check_mark: | ? | :white_check_mark: |
| Input batching | :x: | :white_check_mark: | N/A 5 | :white_check_mark: |
| Resume | :white_check_mark: | :white_check_mark: | :x: | :white_check_mark: |
| Compute arrays in parallel | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| Runtime memory check | :white_check_mark: | :white_check_mark: | :x: | :white_check_mark: |
| Supported clouds | AWS, GCP, and others | AWS, GCP | GCP | AWS, GCP |
Executors
The single-threaded executor is a very simple executor used for running tests; it is deliberately limited to the most basic features.
The threads executor is also for testing, but it supports a few more features, mostly as a way to test the async code path locally without having to use Modal, which is the only other async executor.
The processes executor is for running large datasets that can fit on a single machine's disk.
The other executors are all designed for real workloads running at scale, so all of the features are desirable. Some are provided by the platform, while others are implemented in Cubed. For example, both Lithops and Modal provide timeouts as part of the platform, whereas of the two only Modal provides retries as a built-in feature (for Lithops we implement retries in Cubed). Neither platform provides anything for mitigating stragglers, so Cubed provides a backup tasks implementation for both.
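To make the backup tasks idea concrete, here is a minimal sketch of launching a backup copy of a straggling task with asyncio. It is not Cubed's actual implementation; `launch_task` and the `backup_after` threshold are hypothetical.

```python
import asyncio


async def run_with_backup(launch_task, backup_after=10.0):
    """Run one task attempt, and if it hasn't finished after `backup_after`
    seconds, launch a backup copy and return whichever attempt finishes first.

    `launch_task` is a hypothetical zero-argument coroutine function that
    starts a single attempt of the task.
    """
    primary = asyncio.ensure_future(launch_task())
    done, _ = await asyncio.wait({primary}, timeout=backup_after)
    if done:
        return primary.result()

    # The primary attempt looks like a straggler: start a backup copy and
    # take the first result that arrives.
    backup = asyncio.ensure_future(launch_task())
    done, pending = await asyncio.wait(
        {primary, backup}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # discard the slower attempt
    return done.pop().result()
```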
Features
- Task concurrency - can the executor run multiple tasks at once?
- Input batching - for very large computations it's important that not all inputs for a given array are materialized at once, as that could lead to an out-of-memory situation on the client. The remedy is to submit the inputs in batches, or in a streaming fashion if the platform supports it (see the sketch after this list). See #239
- Resume - can an executor resume a computation that didn't complete? (This requires that the computation is pickled so it can be restarted.)
- Compute arrays in parallel - are arrays computed one at a time, or in parallel? For small arrays the latter can take advantage of more parallelism, if it is available, and speed up computations.
- Runtime memory check - does the executor make sure that your `allowed_mem` setting is no greater than what the runtime provides? #220
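As a rough illustration of input batching (not the interface Cubed actually uses), the loop below keeps only one batch of futures alive on the client at a time. `submit` is a hypothetical concurrent.futures-style submit function.

```python
from itertools import islice


def map_in_batches(submit, inputs, batch_size=1000):
    """Apply a task to `inputs` in fixed-size batches, so the client never
    holds futures (or materialized inputs) for the whole array at once.

    `submit` is a hypothetical function that takes one input and returns a
    future with a blocking `.result()` method.
    """
    it = iter(inputs)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        futures = [submit(x) for x in batch]  # only this batch is in flight
        for future in futures:
            yield future.result()
```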
Footnotes
1. Google Cloud Dataflow has four retry attempts.
2. Dask added retries in 2017. See also this SO thread. There is also a `Reschedule` exception that serves a similar purpose.
3. Dask doesn't seem to have task timeouts. There's a discussion about timeouts and very slow tasks here, including how to work around very slow or hanging tasks.
4. Only one task is run at a time, so batching doesn't apply.
5. For Beam, the client submits a DAG to the service, so there is no problem with running out of memory on the client for very large arrays, and thus no need to implement input batching.
This is very helpful.
For the Coiled Functions executor #260 I think everything is the same as the Dask column except that Callbacks have been implemented. Adding a runtime memory check should be straightforward too.
I've been looking at the Dask executor today, and I think using the `distributed.Client.map` API may make it a lot easier to implement the missing features in the table. (A very minor downside is that you can't use the Dask local scheduler, but we have the local Python executors for that.)
Here's a prototype `AsyncDaskDistributedExecutor` that does this. Since it uses asyncio, I was able to copy the Modal implementation for backups fairly easily. I think adding compute arrays in parallel, and input batching, would both be very similar to the Modal implementation too. The only missing feature would be timeouts, but I think with backups that's less important.
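For reference, a stripped-down sketch of the core idea (the actual prototype has more to it, and the function and parameter names here are made up): `distributed.Client` can be created with `asynchronous=True`, after which the futures returned by `Client.map` can be awaited and iterated with `as_completed` inside the same asyncio machinery that the Modal executor uses.

```python
from distributed import Client, as_completed


async def map_unordered(scheduler_address, func, inputs):
    """Run `func` over `inputs` on a Dask cluster, yielding results as they
    complete. Callbacks, retries and backups are omitted for brevity.
    """
    async with Client(scheduler_address, asynchronous=True) as client:
        futures = client.map(func, inputs)
        async for future in as_completed(futures):
            yield await future
```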
As far as I can tell Coiled Functions don't have an asyncio version - but perhaps the futures that it returns can be used in an asyncio context, in which case we'd be able to share a lot of code.
Nice! If we have an `AsyncDaskDistributedExecutor` is there any reason to keep the `DaskDelayedExecutor`?
Probably not.
> As far as I can tell Coiled Functions don't have an asyncio version - but perhaps the futures that it returns can be used in an asyncio context, in which case we'd be able to share a lot of code.
The docs say that `.submit()` will return a Dask Future, so we should be able to use everything from `AsyncDaskDistributedExecutor`.
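If so, one way to drive those futures from asyncio might look like the sketch below. This is hypothetical and untested: it assumes the `@coiled.function` decorator and a blocking `.result()` on the future that `.submit()` returns, as quoted above.

```python
import asyncio

import coiled


@coiled.function()
def process_chunk(chunk):
    # Placeholder for the real per-chunk task.
    return chunk


async def gather_results(chunks):
    # `.submit()` is assumed to return a Dask Future, as the docs suggest.
    futures = [process_chunk.submit(chunk) for chunk in chunks]
    # Run the blocking `.result()` calls in worker threads so the asyncio
    # event loop stays responsive.
    return await asyncio.gather(
        *(asyncio.to_thread(future.result) for future in futures)
    )
```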
#291 added batching for Python Async, Modal, and Dask.