Asset-level concurrency: skip materialization when another materialization is ongoing
What's the use case?
Dagster should skip the materialization of an asset if that asset is being materialized by another process:
- from schedule
- or auto-materialize policy
- or manual run by another user
And maybe Dagster should also skip materialization if the asset is still considered fresh by its freshness policy. I have a few auto-materialize policies that run the same subset of upstream assets, or a mix of eager and lazy policies, and many assets are materialized multiple times even though neither their own data versions nor their ancestors' data versions have changed.
This should help save compute resources and, in the case of dbt, prevent run failures.
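To make the setup concrete, here is a minimal sketch of the kind of layout described above (asset names and the specific policies are illustrative): downstream assets with a mix of eager and lazy auto-materialize policies that depend on the same upstream asset, which is how redundant materializations can get requested.

```python
from dagster import AutoMaterializePolicy, FreshnessPolicy, asset

# Illustrative only: two downstream assets with different auto-materialize
# policies sharing a single upstream asset.

@asset
def shared_upstream():
    ...

@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def downstream_eager(shared_upstream):
    ...

@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
)
def downstream_lazy(shared_upstream):
    ...
```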
Ideas of implementation
No response
Additional information
No response
Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
Not sure if it will get you all the way to where you're trying to go, but https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#configuring-opasset-level-concurrency might be helpful
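Roughly, the approach in that guide looks like the sketch below (the concurrency key name is made up): assets tagged with the same `dagster/concurrency_key` draw from a shared slot pool, and the pool's limit is configured on the instance (e.g. in the UI under Deployment > Concurrency).

```python
from dagster import asset

# Sketch of op/asset-level concurrency from the linked guide.
# "warehouse" is an illustrative key; its slot limit is set on the instance.
@asset(op_tags={"dagster/concurrency_key": "warehouse"})
def asset_a():
    ...
```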
Hmm, I'm not sure how a concurrency limit would help in this case (even with a global limit). For example, let's say I have 2 runs:
- Run 1: materialization of asset A, B, and C
- Run 2: materialization of asset A, C, and D
If these 2 runs are triggered at roughly the same time, assets A and C would be materialized twice. If I set up the pipeline well enough, I probably just waste some compute resources, but the data is still good. If I'm not careful, I may end up with some data duplication. And in the case of dbt, an entire run could fail depending on which one gets to run first.
The behavior I'm hoping for is that A and C only run once (let's say in run 1). Then in run 2, if D depends on A and/or C, it waits for A and/or C to complete in run 1 before running.
Am I missing something from the concurrency doc?
Hey @dduong1603, I agree that this is a needed feature request. If this were available, Dagster would provide extreme value over dbt. Once you hit a certain size, the overlapping of individual dbt models (Dagster assets) across dbt jobs is inefficient, costly, and in the worst case a cause of incorrect and hard-to-debug data errors.
@dduong1603 I have also experienced issues with concurrent materialization of the same dbt assets in parallel runs. In the current implementation of dbt_assets, the asset selection is passed over to dbt, and dbt builds its own DAG; once the assets are selected and the dbt run is issued, Dagster doesn't control asset materialization. Run 2 can be limited to asset D only, but that creates a dependency on Run 1: Run 2 would have to wait for Run 1, or at least for the AssetMaterialization events for A and C. Maybe instead of sorting out dependencies between Run 1 and Run 2, it is better to find a way to combine them into a single dbt run. The auto-materialize policy is supposed to do that kind of grouping to minimize computation. Thoughts?
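For context, the handoff looks roughly like this sketch (manifest path and function names are illustrative): Dagster translates the selected assets into a dbt selection argument, dbt then plans and runs its own DAG, and Dagster only observes the events streamed back.

```python
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

# Hypothetical manifest path, for illustration only.
DBT_MANIFEST_PATH = Path("target/manifest.json")

@dbt_assets(manifest=DBT_MANIFEST_PATH)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The selected Dagster assets become a dbt selection; from here dbt
    # controls execution and Dagster only streams back materialization events.
    yield from dbt.cli(["build"], context=context).stream()
```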
My concern with this is that the 2 runs might not happen at the same time, or even within a short time frame; for example, assets A and C could be long-running, and the other run with asset D could come much later. But if that's the case, I think it's even more important to skip the materialization of A and C 🤣
What's the process for getting this feature request prioritized?
@paulBurnsUpside we're monitoring this request and including it in some of our discussions, but it's unlikely that we'll have the bandwidth to take it on within at least the next quarter.
+1 this is useful for us. We have N upstream asset dependencies of an asset, each with their own sensor that can individually be triggered.
I finally started converting some of my ops/graphs into assets and really expected this feature to be present. I totally agree that an asset materialization should not start if that asset is already materializing. Or, at least, give us the option to specify this. I suppose there might be some cases where assets are parameterized and you might not mind if two versions materialize at the same time.
I just ran into this when creating an auto-materializing pipeline that runs very frequently (every minute) because it is used to provide devs with near-real-time log data. The problem was that on the first run there was a lot of catching up to do, so the first job took a while. Meanwhile the triggering job kept running and kept kicking off new overlapping runs. Eventually the concurrency limits kick in, but I still ended up with quite a bit of duplicated data.
In this case it was easy to handle since I was monitoring it, but we would run into this problem whenever the pipeline stops for some reason and has to catch up so having the system enforce this would be great.
@andnofence one idea that would help with this is an auto-materialize rule that avoids requesting the run if it's already queued. I filed an issue to track this: https://github.com/dagster-io/dagster/issues/17699.
This is something we're facing as well - particularly as we have a union table which is being materialized whilst its upstream assets are still being updated (and hence fails)
+1 for the issue
+1 this is an issue for us.
+1
+1
+1
+1
+1 to push this feature
+1
+1 -- would be really helpful to have this feature
+1
+1
+1
@OwenKephart this is now all possible with automation conditions, right?
I guess it can be closed
@ion-elgreco yes, thanks!
AutomationCondition.in_progress() handles this situation.
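For anyone finding this later, here is a minimal sketch of that approach (the asset name is illustrative): composing a base condition with `~AutomationCondition.in_progress()` keeps an asset from being requested while another materialization of it is still running.

```python
import dagster as dg

# Request the asset eagerly, but never while a materialization of it is
# already in progress. (Asset name is illustrative.)
not_while_running = (
    dg.AutomationCondition.eager()
    & ~dg.AutomationCondition.in_progress()
)

@dg.asset(automation_condition=not_while_running)
def my_asset():
    ...
```

Depending on the Dagster version, `eager()` may already include this check; composing it explicitly just makes the intent visible.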