Deadlock in unidist when two mutually dependent tasks are submitted to the same worker.
Suppose we have two function calls A and B, such that B waits for the completion of A, as shown below:
A = func1(args)
B = func2(args)  # logic in B requires A to be completed and waits to make sure A is done
In the ideal case, if A and B are executed one after the other, everything works fine. But if A and B are submitted to the same worker (say, worker x), a deadlock can occur when B starts executing before A is ready to execute. Deadlock description: as B executes, the logic in B waits for A's completion; since A is also submitted to the same worker x, A can start only once B is done, causing a deadlock.
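The same pattern can be shown without unidist at all. Below is a minimal analogue using Python's standard single-thread pool (purely illustrative; the one-worker pool stands in for worker x and is not how unidist schedules internally):

from concurrent.futures import ThreadPoolExecutor
import threading

pool = ThreadPoolExecutor(max_workers=1)  # one worker, standing in for worker x
a_done = threading.Event()

def task_a():
    a_done.set()

def task_b():
    a_done.wait()  # B's logic waits for A's completion

B = pool.submit(task_b)  # B reaches the only worker first and blocks
A = pool.submit(task_a)  # A is queued behind B and can never start
# B.result() would now block forever: the same deadlock described above.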
The case can be demonstrated with the code below:
import time
import asyncio

import unidist

unidist.init()

@unidist.remote
class SignalActor:
    def __init__(self, event_count: int):
        self.events = [asyncio.Event() for _ in range(event_count)]

    def send(self, event_idx: int):
        self.events[event_idx].set()

    async def wait(self, event_idx: int):
        await self.events[event_idx].wait()

signals = SignalActor.remote(3)
unidist.get(signals.send.remote(0))

@unidist.remote
def func(idx, x):
    unidist.get(signals.wait.remote(idx))
    unidist.get(signals.send.remote(idx + 1))
    return idx

@unidist.remote
def func2():
    time.sleep(1)
    return 1

x = func2.remote()     # occupies one worker for a second
A = func.remote(0, x)  # A depends on x, so it is not ready to run immediately
y = func2.remote()     # occupies another worker
B = func.remote(1, 0)  # B has no pending dependency and starts right away;
                       # it then waits on signal 1, which only A sends
unidist.get(A)
unidist.get(B)
Execute with
UNIDIST_MPI_SPAWN=False mpiexec -n 5 python rep.py
using 3 unidist CPUs (-n 5) to make sure A and B are scheduled to the same worker.
This code works with UNIDIST_MPI_SPAWN=False mpiexec -n 6 python rep.py,
as A and B would then be scheduled to different workers. The same code also passes with Ray, since Ray can spawn additional workers when the existing ones are unavailable.
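For comparison, here is a sketch of the equivalent Ray program (a hand translation assuming the standard ray API, not code taken from this report). Ray avoids the hang because a task blocked inside ray.get releases its CPU, allowing Ray to start additional worker processes:

import time
import asyncio
import ray

ray.init(num_cpus=3)  # same 3-CPU budget as the failing unidist run

@ray.remote
class SignalActor:
    def __init__(self, event_count: int):
        self.events = [asyncio.Event() for _ in range(event_count)]

    def send(self, event_idx: int):
        self.events[event_idx].set()

    async def wait(self, event_idx: int):
        await self.events[event_idx].wait()

signals = SignalActor.remote(3)
ray.get(signals.send.remote(0))

@ray.remote
def func(idx, x):
    ray.get(signals.wait.remote(idx))
    ray.get(signals.send.remote(idx + 1))
    return idx

@ray.remote
def func2():
    time.sleep(1)
    return 1

x = func2.remote()
A = func.remote(0, x)
y = func2.remote()
B = func.remote(1, 0)
print(ray.get(A), ray.get(B))  # completes instead of hanging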
Proposed solution
unidist should be able to spawn additional workers, as Ray does, when all workers have been busy past a certain time threshold and tasks are still pending execution. This would also require moving the pending tasks onto the newly spawned workers, similar to work stealing in Dask.
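A rough sketch of what that could look like on the scheduling side is below. Every name in it (Scheduler, Worker, spawn_worker, IDLE_THRESHOLD_S) is hypothetical and does not correspond to existing unidist internals:

import time
from dataclasses import dataclass, field
from typing import List, Optional

IDLE_THRESHOLD_S = 5.0  # assumed threshold before deciding the pool is stuck

@dataclass
class Worker:
    queue: List = field(default_factory=list)  # tasks queued on this worker
    busy_since: Optional[float] = None         # start time of the current task

    def busy_for(self) -> float:
        return 0.0 if self.busy_since is None else time.time() - self.busy_since

    def steal_queued(self) -> List:
        stolen, self.queue = self.queue, []
        return stolen

class Scheduler:
    def __init__(self, workers: List[Worker]):
        self.workers = workers

    def spawn_worker(self) -> Worker:
        # In the MPI backend this could map to dynamic process spawning
        # (e.g. MPI_Comm_spawn); modeled here as creating a new handle.
        return Worker()

    def monitor_once(self) -> None:
        # If every worker has been stuck past the threshold while tasks are
        # still queued, spawn a fresh worker and steal the queued tasks onto
        # it, similar to work stealing in Dask.
        all_stuck = all(w.busy_for() > IDLE_THRESHOLD_S for w in self.workers)
        have_pending = any(w.queue for w in self.workers)
        if all_stuck and have_pending:
            fresh = self.spawn_worker()
            for w in self.workers:
                fresh.queue.extend(w.steal_queued())
            self.workers.append(fresh)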