dagster
allow partitioned schedules to use custom partition selectors
Summary & Motivation
We have a use case (data warehouse with a weekly maintenance window) where we sometimes skip a partition, and then after the maintenance window schedule two partitions at the next tick instead of just one. We were able to accomplish this with a custom partition selector function. However, there's currently no way to pass a custom partition selector to the schedule build function. This was temporarily solved by simply copying that function into our code base, but we're probably not the only people who would benefit from a custom partition selector. I'm sure others can come up with some crazy use cases too. :)
This PR adds a new partition_selector argument to build_schedule_from_partitioned_job, with a default value of the existing latest_window_partition_selector. If users make no changes, they'll see no behavior changes. They will now have the ability to specify custom selector functions for their more complex use cases.
How I Tested These Changes
We're running a custom partition selector in production right now. I haven't tested via this mechanism yet, but I see no reason why it wouldn't work. I'm happy to add whatever amount of testing you all would like. My assumption was that existing tests passing would count as proof that we haven't altered any behavior inadvertently, and that we're successfully passing the partition selector as a default argument.
Hey @adam-bloom first of all, we very much appreciate you taking the time to put this together. Second of all, partition selectors are part of the legacy APIs that we recently removed, and we're not too keen on adding them back, because we think we can make it just as easy to accomplish the same thing without introducing that concept.
Here's a PR that refactors the implementation of build_schedule_from_partitioned_job in a way that aims to make it easy to write your own custom version: https://github.com/dagster-io/dagster/pull/9318/files.
With it, you'd be able to write something like this if you want to implement custom partition selection:
@schedule(
    cron_schedule=job.partitions_def.get_cron_schedule(
        minute_of_hour, hour_of_day, day_of_week, day_of_month
    ),
    job=job,
    execution_timezone=job.partitions_def.timezone,
)
def my_schedule(context):
    partitions_def = job.partitions_def
    # all partition keys that exist as of this tick's scheduled time
    partition_keys = partitions_def.get_partition_keys(context.scheduled_execution_time)
    if len(partition_keys) == 0:
        # this function is a generator, so the skip has to be yielded, not returned
        yield SkipReason()
        return
    # modify this with arbitrary partition selection logic:
    partition_key = partition_keys[-1]
    yield job.run_request_for_partition(partition_key=partition_key, run_key=partition_key)
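For the maintenance-window case from the PR description, the selection step might look roughly like the sketch below. It is plain Python with no Dagster calls, and everything in it is an assumption made for illustration: the window (Sundays, 02:00 to 03:00), the hourly tick spacing, and the names in_maintenance_window and select_partition_keys are all hypothetical. It returns no keys during the window and the last two keys on the first tick after it, so the skipped partition gets picked up.

```python
from datetime import datetime, time, timedelta
from typing import List

# Hypothetical maintenance window: Sundays from 02:00 up to (not including) 03:00.
MAINTENANCE_WEEKDAY = 6  # datetime.weekday(): Monday is 0, Sunday is 6
MAINTENANCE_START = time(2, 0)
MAINTENANCE_END = time(3, 0)
# Assumed spacing between schedule ticks (hourly partitions).
TICK_INTERVAL = timedelta(hours=1)


def in_maintenance_window(tick: datetime) -> bool:
    """Return True if the tick falls inside the assumed maintenance window."""
    return (
        tick.weekday() == MAINTENANCE_WEEKDAY
        and MAINTENANCE_START <= tick.time() < MAINTENANCE_END
    )


def select_partition_keys(partition_keys: List[str], tick: datetime) -> List[str]:
    """Pick the partition keys to launch for this tick.

    - During the window: nothing (the tick is skipped).
    - On the first tick after the window: the last two keys, to catch up.
    - Otherwise: only the latest key.
    """
    if not partition_keys or in_maintenance_window(tick):
        return []
    if in_maintenance_window(tick - TICK_INTERVAL):
        return partition_keys[-2:]
    return partition_keys[-1:]
```

Inside the schedule body, an empty result would map to yielding a SkipReason, and each returned key to one yielded run request. A longer window would need to count how many ticks were skipped rather than assuming exactly one.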
Thoughts?
Hey @sryza - that looks like a totally reasonable approach. This was on my list to look into with the 1.0 upgrade anyways (I was concerned it would break!) - hopefully later this week. I'll give what you proposed a try. If that works, I'll go ahead and close this (and I'm pretty optimistic it will).
The approach suggested by @sryza is working, so I'll go ahead and close this. Thanks for the quick feedback and suggestions - this is great functionality to have in our pipeline!
Glad it worked out for you @adam-bloom !