prefect
Prevent holding a task worker limit while waiting for task inputs to resolve
We've seen scenarios where a task worker hangs because all of its capacity limiter slots are taken by tasks that are waiting for upstream dependencies. Those upstream dependencies can't run because there are no slots left for execution. To resolve these deadlocks, a task should not hold a capacity limiter slot while it waits for its upstream dependencies to resolve.
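The deadlock can be reproduced in miniature. The sketch below is a simplified model, not Prefect's task worker code. An `asyncio.Semaphore` stands in for the capacity limiter, an `asyncio.Event` stands in for the upstream result, and `simulate_current` is a hypothetical helper. With one slot, the downstream run holds that slot while it waits, so the upstream run can never start.

```python
import asyncio


async def simulate_current(limit: int, timeout: float = 0.2) -> bool:
    """Return True if both task runs finish, False if the worker deadlocks."""
    # Assumption: an asyncio.Semaphore models the task worker's capacity limiter.
    limiter = asyncio.Semaphore(limit)
    # Assumption: an Event models "the upstream result is available".
    upstream_done = asyncio.Event()

    async def upstream() -> None:
        async with limiter:
            upstream_done.set()

    async def downstream() -> None:
        # Current behavior: the slot is taken first, then the run waits
        # for its upstream dependency while still holding that slot.
        async with limiter:
            await upstream_done.wait()

    # The downstream run is picked up before the upstream run.
    down = asyncio.create_task(downstream())
    await asyncio.sleep(0)
    up = asyncio.create_task(upstream())
    try:
        await asyncio.wait_for(asyncio.gather(down, up), timeout)
        return True
    except asyncio.TimeoutError:
        # Nothing made progress before the timeout: the worker is stuck.
        return False


print(asyncio.run(simulate_current(limit=1)))  # False: deadlock
print(asyncio.run(simulate_current(limit=2)))  # True
```

Raising `limit` to 2 lets this particular pair finish, but a chain with more waiting runs than slots would hang again.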
To accomplish this, we can move dependency resolution into the task worker. The flow would look something like this:
- A task run comes in, and a capacity limiter slot is taken.
- The parameters and `wait_for` of the task run are read and inspected for `PrefectDistributedFuture`s.
- If a `PrefectDistributedFuture` exists in the task run parameters, release the capacity limiter slot and wait for the futures to resolve. Once they resolve, update the parameters with the resolved values, set `wait_for` to an empty list, and take a capacity limiter slot again.
- Submit the updated parameters and `wait_for` to the task engine for execution. The engine will not wait, since everything is already resolved.
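The steps above can be sketched as follows. This is a minimal model under stated assumptions, not Prefect's implementation: an `asyncio.Semaphore` again stands in for the capacity limiter, a plain `asyncio.Future` stands in for `PrefectDistributedFuture`, and `run_task_releasing_slot`, `engine_run`, and `simulate_proposed` are hypothetical names. With a single slot, the downstream run gives its slot back while waiting, so the upstream run can execute and the worker no longer hangs.

```python
import asyncio
from typing import Any, Callable


def engine_run(fn: Callable[..., Any], parameters: dict, wait_for: list) -> Any:
    # Hypothetical stand-in for the task engine: everything is already
    # resolved, so it executes immediately without waiting.
    assert not wait_for
    return fn(**parameters)


async def run_task_releasing_slot(limiter: asyncio.Semaphore, name: str,
                                  fn: Callable[..., Any], parameters: dict,
                                  wait_for: list, log: list) -> Any:
    # Step 1: a task run comes in and takes a slot.
    await limiter.acquire()
    holding = True
    try:
        # Step 2: inspect parameters and wait_for for unresolved futures
        # (asyncio.Future stands in for PrefectDistributedFuture here).
        pending = [v for v in parameters.values() if isinstance(v, asyncio.Future)]
        pending += [f for f in wait_for if isinstance(f, asyncio.Future)]
        if pending:
            # Step 3: give the slot back while waiting for upstream results.
            limiter.release()
            holding = False
            await asyncio.gather(*pending)
            # Resolved: substitute values, clear wait_for, retake a slot.
            parameters = {k: v.result() if isinstance(v, asyncio.Future) else v
                          for k, v in parameters.items()}
            wait_for = []
            await limiter.acquire()
            holding = True
        # Step 4: hand fully resolved inputs to the engine.
        result = engine_run(fn, parameters, wait_for)
        log.append(name)
        return result
    finally:
        if holding:
            limiter.release()


async def simulate_proposed(limit: int = 1, timeout: float = 1.0):
    limiter = asyncio.Semaphore(limit)
    log: list = []
    upstream_result = asyncio.get_running_loop().create_future()

    async def upstream_run() -> None:
        value = await run_task_releasing_slot(limiter, "upstream", lambda: 21, {}, [], log)
        upstream_result.set_result(value)

    # The downstream run is picked up first, exactly as in the deadlock case.
    down = asyncio.create_task(run_task_releasing_slot(
        limiter, "downstream", lambda x: x * 2, {"x": upstream_result}, [], log))
    await asyncio.sleep(0)
    up = asyncio.create_task(upstream_run())
    result, _ = await asyncio.wait_for(asyncio.gather(down, up), timeout)
    return result, log


print(asyncio.run(simulate_proposed(limit=1)))  # (42, ['upstream', 'downstream'])
```

The `holding` flag ensures the slot is released exactly once, even if waiting on an upstream future raises.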
Note: the current workaround for this issue is to increase the `limit` on the task worker.