dagster
dagster copied to clipboard
`CheckSpec`s with `blocking` in `multi_asset`s stop the rest of the function from executing
Dagster version
1.6.11
What's the issue?
When defining a multi_asset with AssetCheckSpec(..., blocking=True)s, having one of the checks fail within the execution function will raise an error and stop all other assets in the multi_asset from executing.
What did you expect to happen?
Downstream assets from the failed check to not execute, without preventing other assets in the multi_asset and their downstream from executing.
How to reproduce?
@multi_asset(
outs={
"asset1": AssetOut(key=AssetKey(["asset1"])),
"asset2": AssetOut(key=AssetKey(["asset2"])),
},
check_specs=[
AssetCheckSpec(name="test1", asset=AssetKey("asset1"), blocking=True),
AssetCheckSpec(name="test2", asset=AssetKey("asset2"), blocking=True),
],
)
def my_func(context: AssetExecutionContext):
yield Output(value="foo", output_name="asset1")
yield AssetCheckResult(asset_key=AssetKey(["asset1"]), check_name="test1", passed=False)
# following won't execute
yield Output(value="foo", output_name="asset2")
yield AssetCheckResult(asset_key=AssetKey(["asset2"]), check_name="test2", passed=True)
Deployment type
None
Deployment details
No response
Additional information
No response
Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
This is a limitation with blocking checks that we'd like to resolve. The behavior works with a singular @asset_check but runs in to this with @multi_asset and @multi_asset_check.
I came across this discussion after opening the definition of the AssetCheckSpec class and reading this comment:
...
(
"blocking", # intentionally not public, see https://github.com/dagster-io/dagster/issues/20659
bool,
),
...
Unfortunately, initially I assumed that the blocking property is public because it's part of the AssetCheckSpec signature.
I thought we could use AssetChecks to conditionally execute downstream branches, but that seems to be not possible (for now). Is there any workaround for this problem?
Just encountered this by getting toposort.CircularDependencyError error when loading a code-location with asset depending on dbt_assets, all checked in multi_asset_check (Dagster 1.10.17):
toposort.CircularDependencyError: Circular dependencies exist among these items: XXX
File "...site-packages\dagster\_core\definitions\graph_definition.py", line 301, in _get_nodes_in_topological_order
order = toposort_flatten(backward_edges)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...site-packages\dagster\_core\utils.py", line 76, in toposort_flatten
return [item for level in toposort(data) for item in level]
^^^^^^^^^^^^^^
File "...site-packages\dagster\_core\utils.py", line 72, in toposort
return [sorted(list(level), key=sort_key) for level in toposort_.toposort(data)]
^^^^^^^^^^^^^^^^^^^^^^^^
File "...site-packages\toposort.py", line 83, in toposort
raise CircularDependencyError(data)
A fix for this issue is scheduled to go out in 1.10.20 later this week.