Asset-level concurrency: skip materialization when another materialization is ongoing

Open dduong1603 opened this issue 2 years ago • 20 comments

What's the use case?

Dagster should skip the materialization of an asset if that asset is already being materialized by another process:

  • from a schedule
  • from an auto-materialize policy
  • or from a manual run by another user

And maybe Dagster should also skip materialization if the asset is still considered fresh by its freshness policy. I have a few auto-materialize policies that cover the same subset of upstream assets, or mix eager and lazy policies, and many assets end up materialized multiple times even though neither their own data versions nor their ancestors' have changed.

This should help save compute resources and, in the case of dbt, prevent runs from failing.

Ideas of implementation

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

dduong1603 avatar Jul 24 '23 15:07 dduong1603

Not sure if it will get you all the way to where you're trying to go, but https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#configuring-opasset-level-concurrency might be helpful

sryza avatar Jul 25 '23 01:07 sryza
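
For reference, the approach in that guide boils down to tagging assets with a shared concurrency key and capping how many ops with that key execute at once (the limit itself is configured on the instance, as described in the guide). A minimal sketch of the asset-side tagging, with an illustrative `dbt` key name:

```python
import dagster as dg

# Assets sharing the "dbt" concurrency key are throttled so that at most
# N of them execute at once, where N is the limit configured for that key.
@dg.asset(op_tags={"dagster/concurrency_key": "dbt"})
def asset_a():
    ...

@dg.asset(op_tags={"dagster/concurrency_key": "dbt"})
def asset_c():
    ...
```

Note that this limits how many tagged ops run simultaneously; it does not deduplicate materializations of the same asset, which is what the follow-up comments get at.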

Hmm, I'm not sure how a concurrency limit would help in this case (even with a global limit). For example, let's say I have 2 runs:

  • Run 1: materialization of asset A, B, and C
  • Run 2: materialization of asset A, C, and D

If these 2 runs are triggered at roughly the same time, assets A and C would be materialized twice. If I set up the pipeline well enough, I probably just waste some compute resources, but the data is still good. If I'm not careful, I may end up with some data duplication. And in the case of dbt, an entire run could fail depending on which one gets to run first.

The behavior I'm hoping for is that A and C will only run once (let's say from run 1). Then in run 2, if D depends on A and/or C, it will wait for A and/or C to be completed in run 1 before running.

Am I missing something from the concurrency doc?

dduong1603 avatar Jul 25 '23 03:07 dduong1603
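
Absent first-class support, one stopgap some teams reach for is to check, inside the asset body, whether another in-progress run already targets the same asset and bail out early. A rough sketch under those assumptions (this is not a built-in Dagster feature, and it neither suppresses the materialization record nor makes Run 2's D wait on Run 1's A/C, which is exactly why the first-class behavior is being requested):

```python
import dagster as dg


@dg.asset
def asset_a(context: dg.AssetExecutionContext):
    # Look for other started runs whose asset selection includes this asset.
    started_runs = context.instance.get_runs(
        filters=dg.RunsFilter(statuses=[dg.DagsterRunStatus.STARTED])
    )
    overlapping = [
        run
        for run in started_runs
        if run.run_id != context.run_id
        and run.asset_selection
        and context.asset_key in run.asset_selection
    ]
    if overlapping:
        # Skip the expensive work; note the step still succeeds and may
        # record a (possibly empty) materialization.
        context.log.info("asset_a is already materializing in another run; skipping")
        return None
    ...  # actual materialization logic
```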

Hey @dduong1603, I agree that this is a needed feature request. If this were available, Dagster would provide extreme value over dbt. Once you hit a certain scale, the overlap of individual dbt models (Dagster assets) across dbt jobs is inefficient, costly, and in the worst case a cause of incorrect and hard-to-debug data errors.

paulBurnsUpside avatar Jul 28 '23 15:07 paulBurnsUpside

@dduong1603 I have also experienced issues with concurrent materialization of the same dbt assets in parallel runs. In the current implementation of dbt_assets, the asset selection is passed over to dbt, and dbt builds its own DAG; once the assets are selected and the dbt run is issued, Dagster no longer controls the asset materialization. Run 2 can be limited to asset D only, but that creates a dependency on Run 1: Run 2 would have to wait for Run 1 to finish, or at least for the AssetMaterialization events for A and C. Maybe instead of sorting out dependencies between Run 1 and Run 2, it is better to find a way to combine them into a single dbt run. The auto-materialize policy is supposed to do that kind of grouping to minimize computation. Thoughts?

nixent avatar Aug 04 '23 18:08 nixent
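
For context, the dagster-dbt pattern being described looks roughly like this (the manifest path is illustrative): Dagster maps the selected assets to a dbt selection argument and hands them to a single dbt invocation, after which dbt walks its own DAG and Dagster only observes the resulting events.

```python
from pathlib import Path

import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target/manifest.json"))
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # The run's asset selection is translated into a dbt selection; dbt then
    # executes the models and streams events back to Dagster.
    yield from dbt.cli(["build"], context=context).stream()
```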

My concern with combining them into a single dbt run is that the 2 runs might not happen at the same time, or even within a short time frame: assets A and C could be long-running, and the other run with asset D might come some time much later. But if that's the case, I think it's even more important to skip the materialization of A and C 🤣

dduong1603 avatar Aug 04 '23 21:08 dduong1603

What's the process for getting this feature request prioritized?

paulBurnsUpside avatar Aug 07 '23 14:08 paulBurnsUpside

@paulBurnsUpside we're monitoring this request and including it in some of our discussions, but it's unlikely that we'll have the bandwidth to take it on within at least the next quarter.

sryza avatar Aug 09 '23 15:08 sryza

+1, this is useful for us. We have N upstream asset dependencies of an asset, each with its own sensor that can be triggered individually.

davidemerritt avatar Sep 18 '23 23:09 davidemerritt

I finally started converting some of my ops/graphs into assets and really expected this feature to be present. I totally agree that an asset materialization should not start if that asset is already materializing, or at least that we should have the option to specify this. I suppose there might be some cases where assets are parameterized and you might not mind two versions materializing at the same time.

gnilrets avatar Sep 22 '23 00:09 gnilrets

I just ran into this when creating an auto-materializing pipeline that runs very frequently (every minute) because it is used to provide devs with near-real-time log data. The problem was that on the first run there was a lot of catching up to do, so the first job took a while. Meanwhile, the triggering job kept running and thus kicking off new, overlapping jobs. Eventually the concurrency limits kicked in, but I still ended up with quite a bit of duplicated data.

In this case it was easy to handle since I was monitoring it, but we would run into this problem whenever the pipeline stops for some reason and has to catch up, so having the system enforce this would be great.

andnofence avatar Nov 03 '23 16:11 andnofence

@andnofence one idea that would help with this is an auto-materialize rule that avoids requesting the run if it's already queued. I filed an issue to track this: https://github.com/dagster-io/dagster/issues/17699.

sryza avatar Nov 03 '23 22:11 sryza

This is something we're facing as well - particularly as we have a union table which is being materialized whilst its upstream assets are still being updated (and hence fails)

+1 for the issue

AlanPedro avatar Jan 31 '24 13:01 AlanPedro

+1 this is an issue for us.

olivergate avatar Feb 05 '24 09:02 olivergate

+1

reidfalconer avatar Feb 05 '24 09:02 reidfalconer

+1

misa-nogly-aleksic avatar Feb 05 '24 11:02 misa-nogly-aleksic

+1

ion-elgreco avatar Feb 08 '24 23:02 ion-elgreco

+1

stufan avatar Mar 04 '24 04:03 stufan

+1 to push this feature

andrea-montes-yello avatar Mar 20 '24 20:03 andrea-montes-yello

+1

quickcoffee avatar Mar 21 '24 15:03 quickcoffee

+1 -- would be really helpful to have this feature

kristianeschenburg avatar May 21 '24 19:05 kristianeschenburg

+1

NitramXDX avatar Jun 04 '24 16:06 NitramXDX

+1

xxkennyxu avatar Jun 07 '24 21:06 xxkennyxu

+1

axellpadilla avatar Oct 29 '24 02:10 axellpadilla

@OwenKephart this is now all possible with automation conditions, right?

I guess it can be closed

ion-elgreco avatar Jan 16 '25 23:01 ion-elgreco

@ion-elgreco yes, thanks!

AutomationCondition.in_progress() handles this situation.

OwenKephart avatar Jan 17 '25 00:01 OwenKephart
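
For anyone arriving here later, a minimal sketch of the resolution described above, assuming a recent Dagster version with declarative automation (the asset name is illustrative, and newer releases of `AutomationCondition.eager()` may already incorporate an in-progress check):

```python
import dagster as dg

# Materialize eagerly, but never request a new materialization while one
# for this asset is already in progress.
not_while_running = dg.AutomationCondition.eager() & ~dg.AutomationCondition.in_progress()


@dg.asset(automation_condition=not_while_running)
def my_asset():
    ...
```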