[Proposal] a lineage reconstruction based failover for mars
Background
Large-scale distributed computing systems may fail due to various reasons, including network problems, machine failures, and process restarts. Network failures can cause nodes and workers to fail to send and receive data; machine failures and process restarts can cause data loss and tasks to be re-executed.
In Mars On Ray, failures fall into three main categories:
- The Ray objects (input parameters, etc.) that the subtask depends on exist, but the execution of the subtask itself fails (usually because of OOM). In this case, the subtask can be retried.
- The Ray objects (input parameters, etc.) that the subtask depends on are lost, but the lineage of those objects exists. In this case, the input parameters must first be recursively rebuilt through lineage reconstruction, and then the current subtask can be executed.
- The Ray objects (input parameters, etc.) that the subtask depends on are lost, and the lineage of those objects is lost as well. In this case, the subtask cannot be recovered.
Here we propose how to implement Mars On Ray failover based on the distributed future provided by Ray:
- Recover subtasks through task retry
- Recover lost subtask results through lineage reconstruction
- Cut off the lineage dependency chain through checkpoints and object management, to avoid reconstructing a large number of tasks and to reduce the metadata storage overhead at the same time
Proposal
Subtask retry
Most subtask execution failures can be directly recovered by task retry. Ray supports automatic rerunning of failed tasks. The default number of retries is 3. We can specify the number of retries through max_retries when submitting subtasks. Set it to -1 to retry indefinitely; set it to 0 to disable retry.
The key to task retry is that each subtask must be idempotent and side-effect free; otherwise data and state inconsistencies will occur. Therefore, if a subtask is retryable, set the configured number of retries when submitting the remote task corresponding to it; if it is not retryable, set max_retries to 0.
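The rule above can be sketched as a small helper that picks the max_retries value for each subtask. This is only an illustration: SubtaskInfo, its retryable flag, remote_options, and execute_subtask are hypothetical names, not part of Mars or Ray. The Ray call itself is left as a comment so that the sketch runs without a cluster.

```python
from dataclasses import dataclass

DEFAULT_MAX_RETRIES = 3  # Ray's default number of task retries, as stated above


@dataclass
class SubtaskInfo:
    """Hypothetical stand-in for a Mars subtask."""
    subtask_id: str
    retryable: bool  # True only if the subtask is idempotent and side-effect free


def remote_options(subtask: SubtaskInfo,
                   configured_retries: int = DEFAULT_MAX_RETRIES) -> dict:
    """Build the options used when submitting the subtask as a Ray task."""
    if configured_retries < -1:
        raise ValueError("max_retries must be -1 (retry forever), 0 (off) or positive")
    # Non-retryable subtasks always get max_retries=0, whatever is configured.
    return {"max_retries": configured_retries if subtask.retryable else 0}


# With Ray available, the options would be applied roughly like this
# (execute_subtask is a hypothetical function that runs one subtask):
#   ray.remote(execute_subtask).options(**remote_options(subtask)).remote(...)
pure = SubtaskInfo("s1", retryable=True)
writer = SubtaskInfo("s2", retryable=False)
print(remote_options(pure, configured_retries=5))
print(remote_options(writer, configured_retries=5))
```

Keeping the decision in one function makes it easy to audit which subtasks are treated as retryable before any task is submitted.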
Recover subtask through lineage reconstruction
When the input objects that a subtask depends on are lost (node failure, object GC, etc.), the subtask cannot be retried directly: all objects it depends on must be recursively reconstructed first. Ray supports this through lineage reconstruction, which automatically rebuilds lost objects and thereby recovers the failed subtask. When a task supports retry, the owner caches the object's lineage, that is, the task specification needed to recreate the object. If all copies of the object are lost, the owner resubmits the task that created the object, and the objects that this task depends on are recursively reconstructed in turn. Lineage reconstruction therefore requires that the owner holds the lineage of the object and of its entire dependency tree. If the lineage has been evicted, the object cannot be reconstructed and an error is raised:
ray.exceptions.ObjectReconstructionFailedLineageEvictedError: Failed to retrieve object 18b2ad3c688fb947ffffffffffffffffffffffff0100000001000000. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init()`.
E
E The object cannot be reconstructed because its lineage has been evicted to reduce memory pressure. To prevent this error, set the environment variable RAY_max_lineage_bytes=<bytes> (default 1GB) during `ray start`.
../../../anaconda/envs/py3.7/lib/python3.7/site-packages/ray/worker.py:1811: ObjectReconstructionFailedLineageEvictedError
If the lineage has not been evicted, Ray starts to resubmit the tasks that created the lost objects in order to recover them:
[2022-04-25 23:32:37,446 E 381614 381695] core_worker.cc:510: :info_message: Attempting to recover 18 lost objects by resubmitting their tasks. To disable object reconstruction, set @ray.remote(max_tries=0).
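The two outcomes above (recursive resubmission versus a lineage-evicted error) can be illustrated with a toy in-memory model. This is a sketch under simplifying assumptions, not Ray's implementation: ToyOwner, LineageEvictedError, and all method names are hypothetical, and "objects" are plain integers.

```python
from typing import Callable, Dict, List, Tuple


class LineageEvictedError(Exception):
    """Toy analogue of Ray's lineage-evicted error."""


class ToyOwner:
    """Hypothetical owner that keeps object values and their lineage."""

    def __init__(self) -> None:
        self.store: Dict[str, int] = {}  # object id -> value
        # object id -> (function that creates it, ids of its inputs)
        self.lineage: Dict[str, Tuple[Callable[..., int], Tuple[str, ...]]] = {}
        self.resubmitted: List[str] = []  # order in which tasks were rerun

    def submit(self, obj_id: str, func: Callable[..., int], *deps: str) -> None:
        """Run a task and remember how to recreate its output."""
        self.lineage[obj_id] = (func, deps)
        self.store[obj_id] = func(*(self.get(d) for d in deps))

    def get(self, obj_id: str) -> int:
        """Return the object, reconstructing it recursively if it was lost."""
        if obj_id in self.store:
            return self.store[obj_id]
        if obj_id not in self.lineage:
            raise LineageEvictedError(obj_id)
        func, deps = self.lineage[obj_id]
        args = [self.get(d) for d in deps]  # rebuild lost inputs first
        self.resubmitted.append(obj_id)
        self.store[obj_id] = func(*args)
        return self.store[obj_id]


owner = ToyOwner()
owner.submit("a", lambda: 2)
owner.submit("b", lambda a: a * 10, "a")
owner.submit("c", lambda b: b + 1, "b")

owner.store.clear()           # simulate losing every copy of every object
print(owner.get("c"))         # rebuilt by rerunning a, then b, then c
print(owner.resubmitted)

del owner.lineage["a"]        # simulate lineage eviction for "a"
owner.store.clear()
try:
    owner.get("c")
except LineageEvictedError as exc:
    print("cannot reconstruct, lineage evicted for", exc)
```

The second half shows why a single evicted entry deep in the chain is enough to make every downstream object unrecoverable.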
Therefore, the key to supporting lineage reconstruction in Mars is to manage lineage and avoid losing it. Currently, the lineage occupied by a subtask graph without shuffle is generally less than 100 MB, while the lineage occupied by a subtask graph with shuffle grows rapidly from hundreds of MB to several GB as the number of chunks increases:
map chunk count | subtask count | subtask graph serialization (duration, size) | Ray mapper task spec size | Ray reducer task spec size | rough lineage size |
---|---|---|---|---|---|
3000 | 4500 | 10 s, 22448580 bytes (22 MB) | 13015975 bytes (12.5 MB) | 434057232 bytes (414 MB) | 427 MB |
6000 | 9000 | 12 s, 45064250 bytes (43 MB) | 26032975 bytes (25 MB) | 1696331232 bytes (1.6 GB) | 1.6 GB |
9000 | 13500 | 18 s, 67681107 bytes (65 MB) | 39049975 bytes (37.2 MB) | 3816821232 bytes (3.6 GB) | 3.6 GB |
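
As a rough check of these numbers, the snippet below adds the mapper and reducer task spec sizes from the table and compares the sum with Ray's default lineage limit. It assumes that the "1GB" default of RAY_max_lineage_bytes means 1024**3 bytes and that the rough lineage size is simply the sum of the two task spec sizes; both are assumptions made for this illustration.

```python
MAX_LINEAGE_BYTES = 1024 ** 3  # assumption: the "1GB" default is 1 GiB

# (map chunk count, mapper task spec bytes, reducer task spec bytes) from the table
rows = [
    (3000, 13015975, 434057232),
    (6000, 26032975, 1696331232),
    (9000, 39049975, 3816821232),
]

# Rough lineage size per row: mapper specs plus reducer specs.
totals = {chunks: mapper + reducer for chunks, mapper, reducer in rows}

for chunks, total in totals.items():
    over = total > MAX_LINEAGE_BYTES
    print(f"{chunks} map chunks: {total / 1024 ** 2:.0f} MiB, exceeds default limit: {over}")

# Growth of the reducer task spec size relative to the 3000-chunk case.
base = rows[0][2]
ratios = [reducer / base for _, _, reducer in rows]
print([round(r, 2) for r in ratios])
```

Only the 3000-chunk case fits under the default limit. The reducer task spec size grows by roughly 4x and 9x when the chunk count doubles and triples, which suggests it scales about quadratically with the number of map chunks.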
Since Mars builds fine-grained task graphs and Ray tracks lineage per task, the lineage overhead is much higher than with a coarse-grained lineage such as Spark's. Even if Ray supports "collapsing" shared metadata in the future (e.g., keeping one metadata entry for all N outputs of a task), the chunk graphs of most subtasks are different, so the corresponding task specs are different too, and Mars cannot use this optimization to reduce lineage storage cost.
Therefore, to avoid losing lineage and to reduce the lineage overhead on the supervisor, we should manage lineage in a distributed manner, for example by submitting some SubtaskGraphs to separate Ray actors for execution.
To sum up, Mars failover based on lineage reconstruction can be designed as a distributed supervisor architecture, which also reduces the pressure on the single supervisor.
Specifically:
- SubtaskGraphSubmitter is responsible for submitting all subtasks of the specified SubtaskGraph to the cluster for execution using the Ray task API.
  - We can also cut a SubtaskGraph into multiple subgraphs and send them to different SubtaskGraphSubmitter processes for submission, to avoid scenarios where a single SubtaskGraph is too large.
  - SubtaskGraphSubmitter needs to be created on a separate node through a placement group (PG), so that it does not share a node with computing processes and the probability of a SubtaskGraphSubmitter actor failure is reduced.
- LineageManager is responsible for accounting for the cost of lineage and then determining whether to submit a SubtaskGraph in the supervisor or in a SubtaskGraphSubmitter actor. The main considerations are:
  - SubtaskGraph serialization overhead and end-to-end delay.
  - The resource overhead of the Ray actor itself. It can be created with num_cpus=0, because it only schedules tasks and holds lineage, and does not perform computation. Currently Ray starts evicting lineage when the memory occupied by lineage exceeds RAY_max_lineage_bytes (default 1GB).
  - It also provides a switch that allows the remote SubtaskGraphSubmitter to be turned off.
- SubtaskGraphSubmitter itself is also managed by the lifecycle service as a reference, which ensures that it can be recycled when the corresponding objects are no longer needed.
For simplicity, a set of heuristic rules can be implemented to determine whether to submit a SubtaskGraph locally or remotely:
- If lineage reconstruction is turned off, all SubtaskGraphs are submitted in the supervisor.
- If the lineage occupied by the current SubtaskGraph is greater than 1/2 of the threshold, submit it to the remote actor for scheduling.
- If the current total lineage has exceeded 80% of the threshold, all subsequent SubtaskGraphs are submitted to the remote actor for scheduling.
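These heuristics could look roughly like the function below. It is a sketch, not the proposed LineageManager: the function name, the parameter names, and the reading of "threshold" as the RAY_max_lineage_bytes limit (taken here as 1024**3 bytes) are assumptions. The allow_remote flag models the switch that turns the remote SubtaskGraphSubmitter off.

```python
def choose_submitter(graph_lineage_bytes: int,
                     current_lineage_bytes: int,
                     max_lineage_bytes: int = 1024 ** 3,
                     lineage_reconstruction: bool = True,
                     allow_remote: bool = True) -> str:
    """Return "supervisor" or "remote" for the next SubtaskGraph (hypothetical helper)."""
    # Rule 1: without lineage reconstruction (or with the remote switch off),
    # everything is submitted from the supervisor.
    if not lineage_reconstruction or not allow_remote:
        return "supervisor"
    # Rule 2: a single graph that needs more than half of the limit goes remote.
    if graph_lineage_bytes > max_lineage_bytes / 2:
        return "remote"
    # Rule 3: once the supervisor holds more than 80% of the limit,
    # all subsequent graphs go remote.
    if current_lineage_bytes > 0.8 * max_lineage_bytes:
        return "remote"
    return "supervisor"


MB = 1024 ** 2
print(choose_submitter(50 * MB, 100 * MB))    # small graph, plenty of room
print(choose_submitter(600 * MB, 100 * MB))   # graph alone exceeds half the limit
print(choose_submitter(50 * MB, 900 * MB))    # supervisor already above 80%
print(choose_submitter(600 * MB, 900 * MB, lineage_reconstruction=False))
```

The per-graph lineage size passed in would have to be estimated before submission, for example from the serialized task spec sizes.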
At the same time, Mars also needs to optimize the subtask to reduce the size of the task specification that needs to be stored, so that it can store the lineages of larger computing tasks:
- Prune subtask's chunk graph and input parameters
- Optimize serialized result size
Cut the dependency chain through checkpoint
Lineage reconstruction has two shortcomings, memory overhead and excessive reconstruction:
- A large number of fine-grained lineages take up a lot of memory. If the dependency chain is very long, the lineage may cause OOM or be evicted.
- When a node fails and objects are lost, reconstruction may start from a very early point in the lineage, which causes a large number of subtasks to rerun and slows down task execution. For most narrow-dependency subtasks, lineage reconstruction generally only needs to reconstruct a few subtasks. However, if the upstream is a **shuffle subtask**, since shuffle is **all-to-all communication**, reconstruction needs to rerun a large number of upstream subtasks or incurs too much data transfer.
Therefore, Mars needs to implement a checkpoint mechanism that cuts off the dependency chain, to remove lineage metadata overhead and avoid large-scale subtask reconstruction.
In traditional computing engines, checkpointing is accomplished by storing the computing results in a distributed file system. The reliability provided by the distributed file system ensures that reconstruction never has to go back past the checkpointed stage.
In Mars on Ray, we can extend Ray's object store to achieve this capability. When submitting a remote task, we can specify the reliability level that Ray must ensure for its output objects. When no node holds a checkpointed object, Ray should automatically load the object data from an external system into the object store. This ensures that even if all replicas of the object in the object store have been lost, the subtask does not need to be re-executed. In this way, if Mars finds appropriate cut points in the lineage, it can release the lineage upstream of the checkpointed objects, since lineage reconstruction is guaranteed to stop at the cut point. The cut point rules can be as follows:
- Checkpoint when submitting a wide-dependency task such as a shuffle subtask. Specify multiple object replicas to ensure that lineage reconstruction stops at those subtasks and avoids the heavy cost of recomputing wide dependencies.
- Extend the Mars tileable execute API to allow users to specify the number of replicas for result chunks. When the number of replicas is greater than 1, all lineage of the tileable can be cleaned up, and lineage reconstruction stops there. This is the key to failover for iterative computations.
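The effect of a cut point can be illustrated with a small graph walk that counts how many subtasks must rerun to rebuild a lost object. This is a toy sketch: subtasks_to_rerun and the graph below are hypothetical, and a checkpointed object is modeled simply as one that is always available.

```python
from typing import Dict, List, Set


def subtasks_to_rerun(target: str,
                      deps: Dict[str, List[str]],
                      available: Set[str]) -> Set[str]:
    """Return the subtasks that must be re-executed to rebuild `target`.

    The walk goes upstream and stops at every object that is still
    available, for example because it was checkpointed.
    """
    rerun: Set[str] = set()
    stack = [target]
    while stack:
        node = stack.pop()
        if node in available or node in rerun:
            continue
        rerun.add(node)
        stack.extend(deps.get(node, []))
    return rerun


# A tiny shuffle: every reducer depends on every mapper (all-to-all).
deps = {
    "m0": [], "m1": [], "m2": [],
    "r0": ["m0", "m1", "m2"],
    "r1": ["m0", "m1", "m2"],
    "out": ["r0"],
}

# All in-memory copies lost, no checkpoint: the whole upstream chain reruns.
print(sorted(subtasks_to_rerun("out", deps, available=set())))

# Output of the shuffle subtask r0 was checkpointed: only "out" reruns.
print(sorted(subtasks_to_rerun("out", deps, available={"r0"})))
```

With the checkpoint in place, the lineage of m0, m1, m2, and r0 is never consulted when rebuilding "out", which is why it can be released.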
Limitation
- Lineage reconstruction does not support reconstruction of objects created by ray.put, but all objects in the future-based Mars on Ray are results of remote task execution, so this problem does not arise.
- Lineage reconstruction results in higher memory usage because the supervisor/SubtaskGraphSubmitter needs to store all subtasks that can be re-executed on failure.
- At present, Ray has only a basic cache for lineage, so lineage is easily evicted under the large-scale, complex computations of frameworks built on Ray, and the subsequent failover then fails. In the future, we need to extend Ray's lineage management capabilities to allow the upper-level framework to control the lineage life cycle in a more fine-grained manner.
- Currently, as long as the lineage for an object exists, the object won't be garbage collected, which may eventually use up disk space. We may need to decouple the lineage life cycle from the object.
Development Plan
- [x] #3029
- [ ] Implement SubtaskGraphSubmitter, encapsulate SubtaskGraph submission logic
- [ ] Implement LineageManager to support the submission of lineages in local and remote SubtaskGraphSubmitter based on rules
- [ ] Make SubtaskGraphSubmitter manageable by tileable lifecycle
- [ ] Extend the HA capability of the Ray object store, support specifying the number of object replicas and the object reliability level when submitting a remote task
- [ ] Implement the Mars checkpoint mechanism
- [ ] Subtask submission information pruning
- [ ] Subtask submission information serialization size optimization
- [ ] Decouple the lineage life cycle from ray object.
Reference
- https://docs.ray.io/en/latest/ray-core/troubleshooting.html#understanding-objectlosterrors
- https://docs.ray.io/en/master/ray-core/actors/fault-tolerance.html
- https://docs.ray.io/en/master/ray-core/actors/patterns/fault-tolerance-actor-checkpointing.html
- https://docs.ray.io/en/master/ray-core/tasks/fault-tolerance.html
- Ownership: A Distributed Futures System for Fine-Grained Tasks