add optional op concurrency awareness into the queued run coordinator

Open prha opened this issue 1 year ago • 4 comments

Summary & Motivation

We want to prevent situations where concurrency-blocked runs are spinning and spamming the event log while waiting to make progress. This change makes it so that a narrow set of runs (those where all the root nodes are concurrency blocked) does not get dequeued.

The op concurrency counter keeps track of:

  1. all the slots and pending steps for every concurrency key required by any queued run
  2. all in-progress runs with STARTING run status and the concurrency keys for all their root nodes
  3. all the root node concurrency keys for the currently queued runs about to be launched

Philosophically, any queued run that is able to make progress on any step will get dequeued (absent any other constraints).
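
Roughly, the state the counter tracks might look something like the sketch below; the class and field names here are illustrative, not the actual implementation:

```python
from typing import Dict, Mapping, NamedTuple, Set


class ConcurrencyKeyInfo(NamedTuple):
    concurrency_key: str
    slot_count: int          # configured limit for this key
    pending_step_count: int  # steps already claiming / waiting on slots


class OpConcurrencyCounterState(NamedTuple):
    # 1. slot / pending-step info for every key required by any queued run
    key_info: Mapping[str, ConcurrencyKeyInfo]
    # 2. root-node concurrency keys of in-progress runs still in STARTING
    starting_run_root_keys: Mapping[str, Set[str]]  # run_id -> keys
    # 3. root-node concurrency keys of queued runs about to be launched
    launched_root_key_counts: Dict[str, int]        # key -> count

    def has_available_slot(self, key: str) -> bool:
        """A queued run could make progress on a root step using this key if
        the configured slots are not already accounted for."""
        info = self.key_info[key]
        occupied = (
            info.pending_step_count
            + sum(1 for keys in self.starting_run_root_keys.values() if key in keys)
            + self.launched_root_key_counts.get(key, 0)
        )
        return occupied < info.slot_count
```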

Additionally, the run coordinator can take an optional config for an offset, so that more runs (or fewer) can get dequeued than there are available slots.

Similarly, a timeout in seconds can be set to treat runs in the STARTED state as having claims on all of their root nodes' concurrency slots, to avoid aggressive dequeuing in the window between the run worker starting and the first step attempting to claim a slot. That window is generally only about a second, so this is probably not a big issue to fine-tune.
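
To illustrate how the offset and the STARTED-state timeout could factor into the dequeue decision, here is a rough sketch; the function and parameter names are assumptions for illustration, not the shipped config options:

```python
import time
from typing import Mapping, Set


def should_block_run(
    root_keys: Set[str],
    available_slots_by_key: Mapping[str, int],
    slot_buffer: int,
    started_run_times: Mapping[str, float],        # run_id -> time the run hit STARTED
    started_run_root_keys: Mapping[str, Set[str]],  # run_id -> root-node keys
    started_timeout_seconds: float,
) -> bool:
    """Return True if every root node of a queued run is concurrency blocked."""
    now = time.time()
    for key in root_keys:
        claimed = 0
        # Runs that recently entered STARTED are treated as holding claims on
        # their root-node keys until the timeout expires, so we don't dequeue
        # into the window before their first steps claim real slots.
        for run_id, started_at in started_run_times.items():
            if now - started_at < started_timeout_seconds and key in started_run_root_keys.get(run_id, set()):
                claimed += 1
        # The buffer lets more (positive) or fewer (negative) runs dequeue
        # than there are strictly available slots.
        if available_slots_by_key.get(key, 0) - claimed + slot_buffer > 0:
            return False  # at least one root step could make progress
    return True
```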

One notable change is that all created runs will get tagged with the concurrency keys of their root nodes at run creation time. This requires the job_snapshot that is created with the run to reflect any asset selection / subselection. I have found that this is not always done accurately in tests, but I think all the execution paths do this correctly.

How I Tested These Changes

BK

prha avatar Jan 11 '24 23:01 prha

@gibsondan made the excellent observation that as soon as a slot opens up, all of the runs blocked by that key will suddenly dequeue. We'll probably have to do something to track all the in-progress runs and the slots that they might occupy. Depending on implementation, this might mean that we have underutilized slots?

prha avatar Jan 12 '24 00:01 prha

it doesn't really matter whether we do the bulk of the calculation at enqueue or dequeue time... the granularity is the annoying thing here.

The solution I've been playing around with is to have every run marked with the concurrency keys that it will potentially claim during the run, and then have the run coordinator page through all the in-progress runs and calculate which slots have yet to be claimed, based on the status of the individual steps.

That's reconstructing all the executor logic, but in a centralized run coordinator, for every in-progress run. This is not something I'm keen on doing.

prha avatar Jan 17 '24 22:01 prha

Further going down that line of thinking, I could imagine a v2 of this feature where:

  • Runs are allowed to consist of multiple workers, each doing a subset of steps, instead of assuming a single run worker for the entire length of the run (obviously a very big change)
  • The only place global concurrency limits are ever checked or cared about is this centralized place

gibsondan avatar Jan 17 '24 22:01 gibsondan

Okay, I updated this with the following changes: we tag each run upon submission with a map of step key => concurrency key and a list of all the root concurrency keys. The entire map is needed to determine whether in-progress runs would take up a slot or not. The root concurrency keys are used to figure out which keys to check for queued runs.
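
For illustration, the per-run concurrency metadata might look roughly like this; the key names and shapes are hypothetical, and the actual serialization may differ:

```python
# Hypothetical example of the per-run concurrency metadata described above.
run_concurrency_info = {
    # full map of step key => concurrency key, used to decide whether an
    # in-progress run is (still) occupying slots
    "step_concurrency_keys": {
        "build_users_table": "redshift",
        "build_events_table": "redshift",
        "notify_slack": "slack_api",
    },
    # root-node concurrency keys, used to decide which keys to check before
    # dequeuing a queued run
    "root_concurrency_keys": ["redshift"],
}
```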

Because we do not break down in-progress runs into completed vs. pending steps, we treat each run as somewhat monolithic. This means that we might have less than full utilization of all the slots, depending on the shape of the graph. This is in effect hoisting all op tags into run tags and using the instance-configured limits as tag concurrency limits.

Doing finer-grain accounting would require either:

  1. fetching step status for all in-progress runs, which can be quite expensive
  2. retaining some record of completed steps for in-progress runs in the concurrency pending-steps table, just for accounting purposes. These would need to get cleaned up by the free-slots daemon that handles orphaned slots for canceled / failed runs.

prha avatar Jan 18 '24 22:01 prha

I should state that the biggest risk here is overwhelming the DB with queries to fetch concurrency information on every iteration of the run coordinator (every 1 second). Any timeouts here would prevent runs from getting dequeued.

prha avatar Feb 22 '24 17:02 prha

@gibsondan also, I changed the entry on the dagster run to be a new named tuple called root_op_concurrency... the tuple gives us a layer of abstraction so we can change the fields we need to extract to do this run-level accounting.
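
Something along these lines, as a rough sketch; the field names are illustrative guesses to show the abstraction, not the actual definition:

```python
from typing import Mapping, NamedTuple


class RootOpConcurrency(NamedTuple):
    # count of root nodes per concurrency key, so the coordinator can tell
    # how many slots a run would want to claim up front
    root_key_counts: Mapping[str, int]
    # whether any root node has no concurrency key at all, in which case the
    # run can always make some progress and should not be blocked
    has_unconstrained_root_nodes: bool
```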

prha avatar Feb 22 '24 18:02 prha