
[DS][x/n] ScheduledSince condition instead of ScheduledSince delta

OwenKephart opened this issue 9 months ago · 1 comment

Summary & Motivation

I did a lot of internal back-and-forth on this, and where I landed was that we may well want to add a "ScheduledSinceDelta" at some point in the future, but that we should wait for concrete use cases before adding it (and when we do, we'll probably want a "ScheduledSinceCron" as well). At that point, we can think about how best to factor that implementation: more similar to the current one, or to the implementation I just removed (which was significantly more complicated).

As implemented, this condition (mostly) restores the current-world AMP eager behavior, which only "tries" a single time to fill in a missing partition or to respond to a parent update. This eliminates the need for an additional failed-run check (with a "cooldown"), as we get to set the "parent updated or missing" clause to False as soon as we request a run, rather than having to wait until it actually becomes False (which almost certainly won't happen by the next tick, and might never happen).

This also means we could remove the "in progress" check, which was added for similar reasons (i.e. once you request an asset because it was missing, it probably won't be filled in by the next tick, so you wait until the in-progress run finishes). However, I think that check is reasonably useful regardless: we've seen a fair number of complaints about the existing eager behavior kicking off multiple concurrent runs, so we can just filter those out by default, and if someone wants different behavior they can remove that condition.
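To make the composition concrete, here is a minimal toy sketch (not the real Dagster API; `Condition`, `missing`, `parent_updated`, and `in_progress` are illustrative stand-ins) of how boolean operators could combine these clauses into the eager behavior described above:

```python
from dataclasses import dataclass
from typing import Callable, Dict

# Toy sketch, NOT Dagster's actual implementation: a condition is a
# predicate over some evaluation state, composable with &, |, and ~.
@dataclass
class Condition:
    fn: Callable[[Dict[str, bool]], bool]

    def evaluate(self, state: Dict[str, bool]) -> bool:
        return self.fn(state)

    def __and__(self, other: "Condition") -> "Condition":
        return Condition(lambda s: self.fn(s) and other.fn(s))

    def __or__(self, other: "Condition") -> "Condition":
        return Condition(lambda s: self.fn(s) or other.fn(s))

    def __invert__(self) -> "Condition":
        return Condition(lambda s: not self.fn(s))

# Hypothetical leaf conditions for illustration.
missing = Condition(lambda s: s["missing"])
parent_updated = Condition(lambda s: s["parent_updated"])
in_progress = Condition(lambda s: s["in_progress"])

# Eager behavior as described: respond to a missing partition or a parent
# update, but filter out assets with a run already in flight.
eager = (missing | parent_updated) & ~in_progress

eager.evaluate({"missing": True, "parent_updated": False, "in_progress": False})  # True
eager.evaluate({"missing": True, "parent_updated": False, "in_progress": True})   # False
```

A user who wants the multiple-concurrent-runs behavior back would simply drop the `& ~in_progress` term from the composition.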

The eventual goal is for this to become the backbone of a sensor-analog replacement, i.e. you will be able to do something like the following:


```python
class NewFileOnS3(SchedulingCondition):
    def evaluate(self, context: SchedulingContext) -> SchedulingResult:
        if self.new_file_since_previous_tick(context):
            true_slice = context.candidate_slice
        else:
            true_slice = context.create_empty_slice()
        return SchedulingResult.create(true_slice)

my_schedule_selection = AssetSelection.something()

# materialize if you haven't been requested since the new file
# landed on s3
my_scheduling_condition = ~SchedulingCondition.requested_since(NewFileOnS3())

defs = Definitions(...).with_asset_specs_mapped(
    lambda spec: spec.with_attributes(
        scheduling=my_scheduling_condition | spec.scheduling
    ),
    asset_selection=my_schedule_selection,
)
```

TBD exactly how much of that we abstract away. This also relies on the fact that we will be emitting Executions from within DS, as this means we can reliably kick off a run that targets all assets with this policy on the tick that NewFileOnS3 becomes true (so we don't have to do any complicated listening for parent updates at the partition boundaries).
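The "requested_since" latch described above can be illustrated with a self-contained simulation (toy code, not Dagster; the tick loop and state tracking are hypothetical): the condition fires exactly once per new file, on the tick the file lands, and stays False until another file appears:

```python
from typing import List

def simulate(ticks: List[bool]) -> List[int]:
    """For each tick, True means a new file landed. Returns the tick
    indices on which a run would be requested under the
    ~requested_since(NewFileOnS3()) semantics sketched above."""
    requested_at = None  # tick index of the most recent run request
    last_file_at = None  # tick index of the most recent new file
    requests = []
    for i, new_file in enumerate(ticks):
        if new_file:
            last_file_at = i
        # Condition: a file has landed, and no request was made since it did.
        if last_file_at is not None and (
            requested_at is None or requested_at < last_file_at
        ):
            requested_at = i
            requests.append(i)
    return requests

# Files land on ticks 1 and 4; exactly one request fires per file,
# with no retries or cooldowns needed in between.
print(simulate([False, True, False, False, True, False]))  # [1, 4]
```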

How I Tested These Changes

OwenKephart · May 17 '24 21:05