# (RFC) New AssetGraphStep API for factories

New pattern for factories: the `AssetGraphStep`
Writing asset and asset check factories is important to a lot of Dagster users and to the Dagster core team. Unfortunately it's not a great experience right now. There are a number of problems; a few of them:
- It relies on dynamically creating decorated functions, which is an awkward pattern. Higher-order functions are not that normative in Python. The best we have done is the `dbt_assets` pattern, which composes a decorator itself.
- Even in the best case there are composability problems. If you want to change the function signature provided to the user, it is very challenging, as you lose introspection-based features (e.g. inferring required resource keys from parameters) that Dagster users expect.
- Decorated functions have unclear typing in Python, and our `multi_asset` is particularly challenging here. You cannot reliably type it, so typically it is left untyped, and it allows a huge set of return values, which is effectively undocumented.
This proposes a new class-based API for building executable definitions in Dagster, designed for the factory use case. Class-based APIs are a more natural fit for pluggability in Python.

Let's walk through the example YAML DSL in `examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/assets_dsl.py`.
The before code is:
```python
import shutil
from typing import Any, Dict, List

from dagster import (
    AssetExecutionContext,
    AssetKey,
    AssetsDefinition,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)

# load_yaml is a small helper defined elsewhere in the example project.


def from_asset_entries(asset_entries: Dict[str, Any]) -> List[AssetsDefinition]:
    assets_defs = []
    group_name = asset_entries.get("group_name")
    for asset_entry in asset_entries["assets"]:
        asset_key_str = asset_entry["asset_key"]
        dep_entries = asset_entry.get("deps", [])
        description = asset_entry.get("description")
        asset_key = AssetKey.from_user_string(asset_key_str)
        deps = [AssetKey.from_user_string(dep_entry) for dep_entry in dep_entries]
        sql = asset_entry["sql"]  # this is required

        @asset(key=asset_key, deps=deps, description=description, group_name=group_name)
        def _assets_def(
            context: AssetExecutionContext,
            pipes_subprocess_client: PipesSubprocessClient,
        ):
            # instead of querying a dummy client, do your real data processing here
            python_executable = shutil.which("python")
            assert python_executable is not None
            pipes_subprocess_client.run(
                command=[python_executable, file_relative_path(__file__, "sql_script.py"), sql],
                context=context,
            ).get_results()

        assets_defs.append(_assets_def)
    return assets_defs


def get_asset_dsl_example_defs() -> List[AssetsDefinition]:
    asset_entries = load_yaml("assets.yaml")
    return from_asset_entries(asset_entries)
```
We consume a dictionary loaded from a YAML file and create an `AssetsDefinition` object by dynamically defining a function decorated with `@asset` inside the factory function and returning it.

Here is the implementation using `AssetGraphStep`:
```python
import shutil
from typing import List, Optional

from dagster import (
    AssetKey,
    AssetSpec,
    AssetsDefinition,
    PipesSubprocessClient,
    file_relative_path,
)

# AssetGraphStep, AssetGraphExecutionContext, and EntitySetExecuteResult are the
# new APIs proposed in this PR.


class PureDSLAsset(AssetGraphStep):
    def __init__(self, asset_entry: dict, sql: str, group_name: Optional[str]):
        self.sql = sql
        super().__init__(
            specs=[
                AssetSpec(
                    group_name=group_name,
                    key=AssetKey.from_user_string(asset_entry["asset_key"]),
                    description=asset_entry.get("description"),
                    deps=[
                        AssetKey.from_user_string(dep_entry)
                        for dep_entry in asset_entry.get("deps", [])
                    ],
                )
            ],
            compute_kind="python",
        )

    def execute(
        self, context: AssetGraphExecutionContext, pipes_subprocess_client: PipesSubprocessClient
    ) -> EntitySetExecuteResult:
        python_executable = shutil.which("python")
        assert python_executable is not None
        return pipes_subprocess_client.run(
            command=[python_executable, file_relative_path(__file__, "sql_script.py"), self.sql],
            context=context,
        ).get_results()


def from_asset_entries(asset_entries: dict) -> List[AssetsDefinition]:
    return [
        PureDSLAsset(
            asset_entry=asset_entry,
            sql=asset_entry["sql"],
            group_name=asset_entries.get("group_name"),
        ).to_assets_def()  # we would probably eventually alter Definitions to take these, eliminating the need for this direct call
        for asset_entry in asset_entries["assets"]
    ]


def get_asset_dsl_example_defs() -> List[AssetsDefinition]:
    asset_entries = load_yaml("assets.yaml")
    return from_asset_entries(asset_entries)
```
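For completeness, a minimal sketch of how the factory output would plug into a `Definitions` object today, assuming the proposed `to_assets_def()` conversion shown above:

```python
# Hypothetical usage of the factory above: the converted AssetsDefinitions
# plug into Definitions like any other assets.
from dagster import Definitions

defs = Definitions(assets=get_asset_dsl_example_defs())
```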
Some benefits of this approach:

- Direct invocation is obvious and requires no magic. `execute` is a directly invokable Python function (see the sketch after this list).
- The function signature of `execute` is more obvious and overridable. There is one return type.
  - Note: we could also eliminate resource parameter magic completely if we wanted here, and instead have an inner class called `Resources` that defines the required resources and their types.
- The class has a much simpler interface than `AssetsDefinition` or even `multi_asset`.
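To illustrate the direct-invocation point, here is a minimal unit-test-style sketch. It assumes the proposed `PureDSLAsset` class from above (not a shipped API), and it assumes the proposed context type can be stood in for by `build_asset_context()`:

```python
# Sketch only: PureDSLAsset / AssetGraphStep are proposed in this RFC.
from dagster import PipesSubprocessClient, build_asset_context

step = PureDSLAsset(
    asset_entry={"asset_key": "users", "sql": "SELECT * FROM users"},
    sql="SELECT * FROM users",
    group_name="analytics",
)

# execute is an ordinary method: pass the context and resources as plain arguments.
results = step.execute(
    context=build_asset_context(),  # assumption: the proposed context can wrap this
    pipes_subprocess_client=PipesSubprocessClient(),
)
```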
Here is the base class `__init__` that subclasses would call:
```python
class AssetGraphStep(ABC):
    def __init__(
        self,
        specs: Sequence[Union[AssetSpec, AssetCheckSpec]],
        compute_kind: Optional[str] = None,
        subsettable: bool = False,
        tags: Optional[dict] = None,
        friendly_name: Optional[str] = None,
        partitions_def: Optional[PartitionsDefinition] = None,
        # below are TODOs in PR
        backfill_policy: Optional[BackfillPolicy] = None,
        retry_policy: Optional[RetryPolicy] = None,
        config_schema: Optional[UserConfigSchema] = None,
    ): ...
```
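Because `specs` accepts both `AssetSpec` and `AssetCheckSpec`, a single subclass can declare an asset plus a check that execute in the same op-layer node. A minimal sketch, again assuming the proposed `AssetGraphStep`, `AssetGraphExecutionContext`, and `EntitySetExecuteResult` names:

```python
from dagster import AssetCheckResult, AssetCheckSpec, AssetKey, AssetSpec, MaterializeResult


class TableWithCheck(AssetGraphStep):  # proposed base class, not a shipped API
    def __init__(self) -> None:
        super().__init__(
            specs=[
                AssetSpec(key=AssetKey(["my_table"])),
                AssetCheckSpec(name="non_empty", asset=AssetKey(["my_table"])),
            ],
            compute_kind="python",
        )

    def execute(self, context: AssetGraphExecutionContext) -> EntitySetExecuteResult:
        # materialize the asset and report its check result from one compute step
        return [
            MaterializeResult(asset_key=AssetKey(["my_table"])),
            AssetCheckResult(
                passed=True, asset_key=AssetKey(["my_table"]), check_name="non_empty"
            ),
        ]
```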
Compare to `multi_asset`:
```python
def multi_asset(
    *,
    outs: Optional[Mapping[str, AssetOut]] = None,
    name: Optional[str] = None,
    ins: Optional[Mapping[str, AssetIn]] = None,
    deps: Optional[Iterable[CoercibleToAssetDep]] = None,
    description: Optional[str] = None,
    config_schema: Optional[UserConfigSchema] = None,
    required_resource_keys: Optional[Set[str]] = None,
    compute_kind: Optional[str] = None,
    internal_asset_deps: Optional[Mapping[str, Set[AssetKey]]] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    can_subset: bool = False,
    resource_defs: Optional[Mapping[str, object]] = None,
    group_name: Optional[str] = None,
    retry_policy: Optional[RetryPolicy] = None,
    code_version: Optional[str] = None,
    specs: Optional[Sequence[AssetSpec]] = None,
    check_specs: Optional[Sequence[AssetCheckSpec]] = None,
    # deprecated
    non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...
```
or `AssetsDefinition`:
```python
def __init__(
    self,
    *,
    keys_by_input_name: Mapping[str, AssetKey],
    keys_by_output_name: Mapping[str, AssetKey],
    node_def: NodeDefinition,
    partitions_def: Optional[PartitionsDefinition] = None,
    partition_mappings: Optional[Mapping[AssetKey, PartitionMapping]] = None,
    asset_deps: Optional[Mapping[AssetKey, AbstractSet[AssetKey]]] = None,
    selected_asset_keys: Optional[AbstractSet[AssetKey]] = None,
    can_subset: bool = False,
    resource_defs: Optional[Mapping[str, object]] = None,
    group_names_by_key: Optional[Mapping[AssetKey, str]] = None,
    metadata_by_key: Optional[Mapping[AssetKey, ArbitraryMetadataMapping]] = None,
    tags_by_key: Optional[Mapping[AssetKey, Mapping[str, str]]] = None,
    freshness_policies_by_key: Optional[Mapping[AssetKey, FreshnessPolicy]] = None,
    auto_materialize_policies_by_key: Optional[Mapping[AssetKey, AutoMaterializePolicy]] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    descriptions_by_key: Optional[Mapping[AssetKey, str]] = None,
    check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None,
    selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
    is_subset: bool = False,
    owners_by_key: Optional[Mapping[AssetKey, Sequence[Union[str, AssetOwner]]]] = None,
    # if adding new fields, make sure to handle them in the with_attributes, from_graph,
    # from_op, and get_attributes_dict methods
): ...
```
Any class can be used as a resource, with or without `ResourceParam`.
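For example, a sketch of what that could look like here; `WarehouseClient` is a made-up class for illustration, and the parameter-based resource inference is the proposed behavior, not a shipped API:

```python
class WarehouseClient:
    """A plain Python class; no Dagster base class needed to serve as a resource."""

    def run_sql(self, sql: str) -> None: ...


class WarehouseStep(AssetGraphStep):  # proposed base class from this RFC
    def execute(
        self, context: AssetGraphExecutionContext, warehouse: WarehouseClient
    ) -> EntitySetExecuteResult:
        # `warehouse` would be inferred as a required resource from the parameter,
        # with or without a ResourceParam annotation.
        warehouse.run_sql("SELECT 1")
        return []
```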
This would be framed to the user base as a more convenient API for building factories. However, I do think the natural end state is to replace `AssetsDefinition` as the core primitive.

In the next PR in this stack I will apply this pattern to a more realistic example which we just added: the freshness checks.
I'm a big fan of the direction this is going. I need more time to pull all my thoughts together, but two questions come to mind.
First, this RFC is solving two problems:

- It's introducing the hypothesized "one-abstraction-to-rule-them-all" as a replacement for `AssetsDefinition`.
- It's solving our bad factory API situation.

I'm certainly in favor of solving both of these problems, but I'm not sure that they should be solved with the same abstraction. I don't have a firm opinion here yet, but I'm hesitant to expose a new core primitive in a public API. One of the advantages of moving to a new core primitive would be a decoupling of the abstractions in our user-facing definitions layer from internals, but IIUC this solution would persist the coupling.
Second, I think there's some conceptual murkiness around what should qualify as an "entity". Do we want the core graph that users think of to be an asset graph or a computation graph? Before asset observations and checks, these concepts were somewhat conflated because assets had at most one associated computation (the materialization function). We are moving to a world where an asset can have potentially many computations associated with it (various checks, whatever else we add).
Calling individual asset-associated computations "executable entities" feels kind of wrong to me, because it implies that the core entities of Dagster are computations rather than data assets. We already have the concept of an "op" as a generic "executable entity" of sorts, and I think we should lean in to that in whatever we do for a "generic asset-associated computation factory" API.
Like I said, I need to think some more on it, but that's my gut reaction.
I think you should frame this solely in the "It's solving our bad factory API situation." context and merely think of it as informing future internal restructuring.
I'd rather do this via composition rather than inheritance. This class is extremely clean and simple right now because of that structure. However I'm open to persuasion on this front if the upside is compelling.
I agree that introducing the name "entity" is quite a big change and should be decoupled from this PR. However, @sryza, I did change the name to that based on your feedback on a previous name. I'm going to work on an alt name, probably around the word "subgraph".
Having fields on these classes introduces a new place where state can live, versus, say, putting it inside the metadata for the asset. Do we have guidelines for how users should make this choice?
This is straightforward: if they want the metadata persisted and displayed in the UI, use metadata; if not, use state. This is actually a big problem in our current APIs, and we get around it by stashing vanilla Python objects in metadata (like the translator with `dbt_assets`), which displays nonsense in the UI. This is much better, and a good piece of evidence that this is a superior structure.
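A sketch of that guideline in practice, using the proposed `AssetGraphStep` class (the translator object and table name are illustrative):

```python
class TranslatedAsset(AssetGraphStep):  # proposed base class from this RFC
    def __init__(self, translator: object, table: str):
        # Plain Python state: available to execute(), never rendered in the UI.
        self.translator = translator
        super().__init__(
            specs=[
                AssetSpec(
                    key=AssetKey([table]),
                    # Metadata: persisted and displayed in the UI.
                    metadata={"table": table},
                )
            ]
        )
```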
Introducing a concept that is parallel to `AssetsDefinition` but different in subtle ways, e.g. a future where a bunch of methods accept `Union[AssetsDefinition, ExecutableEntitySet]`, feels a bit dystopian. An inheritance relationship between the two would make this feel a little less scary.
Composition felt nice (and more conservative, actually), but I can make an attempt with an inheritance relationship. A concern I have with `AssetsDefinition` is its copying semantics, which are mind-bending and will greatly complicate an inheritance relationship.
I think we should be open to heavily deprecating existing parameters on `AssetsDefinition` and introducing new ones. `AssetsDefinition` at this point is inaccurately named and has massive internal problems that I think are effectively unfixable because of backwards-compatibility guarantees. With the introduction of `AssetNode`, we can hollow out its core, but molding its constructor will be a large amount of effort.
I'm still high-conviction that this is a much better path forward for factories. Given our heavy investment in that area, I think this is timely. The abstractions we use in the factory layer will bleed up through a higher-level system like Nope or dagster-yaml.
> However, @sryza, I did change the name to that based on your feedback on a previous name. I'm going to work on an alt name, probably around the word "subgraph".
Yeah, just to clarify my position on "entity": I think we'd ideally be able to find a name that's

- More general than "asset" (because "asset" doesn't include checks)
- More specific than "entity" (because "entity" includes everything)

In other conversations, I've used the word "entity" as a stand-in for "something more general than asset" because I couldn't think of something better.
> This is straightforward: if they want the metadata persisted and displayed in the UI, use metadata; if not, use state.
👍 that distinction between UI and not-UI makes sense to me.

Not at all a blocker here, but potentially worth considering a direction where these classes inherit from `NamespacedMetadataSet` subclasses, to support typed metadata.
> Composition felt nice (and more conservative, actually), but I can make an attempt with an inheritance relationship.
Yeah, definitely with you on inheritance being trickier and less conservative. I think maybe an important question to clarify here is whether this is:

- A helpful layer for defining factories
- A successor to `AssetsDefinition`

If the former, then I think the current composition pattern makes a lot of sense. I wonder if we can choose a name that communicates this better, like `Base<Entity>Definer` or something.

If the latter, then I think it makes this change overall higher-stakes, brings up the larger question of how these two interact, and makes inheritance potentially more attractive for the aforementioned `Union[AssetsDefinition, ExecutableEntitySet]` reason.
Btw, in the world of YAML/Nope, the abstraction I've been playing around with looks something like this:

```python
class DefinitionSetConfig(DagsterModel, ABC):
    @abstractmethod
    def build_definitions(self) -> Definitions:
        ...
```
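For instance, a hypothetical subclass wiring this up to the YAML example from the PR body might look like the following (assuming `DefinitionSetConfig` as sketched, plus the `load_yaml`/`from_asset_entries` helpers from above):

```python
class YamlAssetsConfig(DefinitionSetConfig):
    yaml_path: str  # DagsterModel is pydantic-based, so this is a typed field

    def build_definitions(self) -> Definitions:
        asset_entries = load_yaml(self.yaml_path)
        return Definitions(assets=from_asset_entries(asset_entries))
```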
I updated this with a new set of names. I did not update the PR summary, so please ignore it at this point.
Also want to emphasize that as it stands this is an RFC and not intended for commit. This is to drive discussion.
Given the feedback on the direction of intermixing this with the `AssetsDefinition` class hierarchy, I have gone in the direction of introducing a more fundamental abstraction in the system. I have not done the work of inheriting from or changing `AssetsDefinition`, as my goal here is to drive a naming discussion. I am also not attempting, in this PR, to introduce a new term like "entity."
New proposed names:

I am proposing a new noun, an executable. An executable represents a set of definitions in the asset graph layer that are executed within the same node in the op layer. The class therefore is called `AssetGraphExecutable`.
Alternatives I considered:

- `ExecutableSubgraph`. Not chosen because "subgraph" does not imply execution within the same op or graph.
- `ExecutableAssetGraphFragment`. "Fragment" didn't seem quite right to me.
- `ExecutableDefinitionSet`. A bit of a mouthful.
- `ExecutableDefinitionGroup`. Overloads "group".
Some pros of `AssetGraphExecutable`:

- If we refactor the framework internals around this, a lot of code like this:
```python
def _ensure_resources_dont_conflict(
    asset_graph: AssetGraph,
    resource_defs: Mapping[str, ResourceDefinition],
) -> None:
    """Ensures that resources between assets, source assets, and provided resource dictionary do not conflict."""
    resource_defs_from_assets = {}
    for asset in asset_graph.assets_defs:
        for resource_key, resource_def in asset.resource_defs.items():
            if resource_key not in resource_defs_from_assets:
                resource_defs_from_assets[resource_key] = resource_def
            if resource_defs_from_assets[resource_key] != resource_def:
                raise DagsterInvalidDefinitionError(
                    f"Conflicting versions of resource with key '{resource_key}' "
                    "were provided to different assets. When constructing a "
                    "job, all resource definitions provided to assets must "
                    "match by reference equality for a given key."
                )
```
turns into:
```python
def _ensure_resources_dont_conflict(
    asset_graph: AssetGraph,
    resource_defs: Mapping[str, ResourceDefinition],
) -> None:
    """Ensures that resources between assets, source assets, and provided resource dictionary do not conflict."""
    resource_defs_from_executables = {}
    for executable in asset_graph.executables:
        for resource_key, resource_def in executable.resource_defs.items():
            if resource_key not in resource_defs_from_executables:
                resource_defs_from_executables[resource_key] = resource_def
            if resource_defs_from_executables[resource_key] != resource_def:
                raise DagsterInvalidDefinitionError(
                    f"Conflicting versions of resource with key '{resource_key}' "
                    "were provided to different executables. When constructing a "
                    "job, all resource definitions provided to executables must "
                    "match by reference equality for a given key."
                )
```
I think that flows and is nice. We could also use the term "Executable" in conversation and it would flow.
- This gives us the opportunity to introduce `AssetGraphExecutionContext`, which can wrap up and consolidate a bunch of different context objects. It is the object that exists whenever you are doing any computation that is operating within the context of the asset graph. It defaults to thinking in terms of many assets, checks, and other future concepts.
- Similarly, `AssetGraphExecutionResult = Iterable[Union[MaterializeResult, AssetCheckResult, ObserveResult]]` also seems pretty good to me (sketched below).
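A sketch of those names in use; `MaterializeResult`, `AssetCheckResult`, and `ObserveResult` are existing Dagster types, while the alias, context, and base class are the names proposed here:

```python
from typing import Iterable, Union

from dagster import AssetCheckResult, AssetKey, MaterializeResult, ObserveResult

AssetGraphExecutionResult = Iterable[Union[MaterializeResult, AssetCheckResult, ObserveResult]]


class MyExecutable(AssetGraphExecutable):  # proposed base class
    def execute(self, context: AssetGraphExecutionContext) -> AssetGraphExecutionResult:
        # one node in the op layer can emit a mix of result types
        yield ObserveResult(asset_key=AssetKey(["source_table"]))
        yield MaterializeResult(asset_key=AssetKey(["my_table"]))
```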
Some cons:

- It's a new noun.
- It is a more "core" change, rather than an additive layer to help factory creation. (This might be a pro? Just calling it out.)
- "Executable" means other things in different contexts (e.g. operating systems).
To emphasize: this is a naming proposal for if we went in the direction of an `AssetsDefinition` successor, which I think is premature. I would rather, in the short term, flesh out this generalized pattern in the context of asset factory creation, and use that to inform a future iteration that would be a more profound change.
> I am proposing a new noun, an executable. An executable represents a set of definitions in the asset graph layer that are executed within the same node in the op layer. The class therefore is called `AssetGraphExecutable`.
Could we use graph terminology more here? I think that might help us with naming. It might be useful to explicitly think in terms of the "asset graph" and the "op graph", rather than the "layer" terminology, which is a bit vague. What we are trying to represent is some directed mapping from the asset graph to the op graph. Once we have the mapping, and we remove the dependencies within the asset graph and op graph, the mapping that we are representing is a bipartite graph.
> An executable represents a set of definitions in the asset graph layer

"A set of definitions in the asset graph layer" is a subgraph of the asset graph.
> that are executed within the same node in the op layer

The asset subgraph maps to an op subgraph. Constraining by "same node" feels weird, as that does not allow us to consider the beast of all beasts: the `@graph_asset` and `@graph_multi_asset` case. But here, I am assuming that "node" corresponds to a single "op". If "node" is equivalent to a "subgraph of ops" (i.e. `NodeInvocation`), then I guess "same node" makes sense, but it feels weird to name that "node".
Yeah "node" here was a NodeDefinition
specifically.
Subgraph definitely another top contender. However we I went to write this code:
```python
def _ensure_resources_dont_conflict(
    asset_graph: AssetGraph,
    resource_defs: Mapping[str, ResourceDefinition],
) -> None:
    """Ensures that resources between assets, source assets, and provided resource dictionary do not conflict."""
    resource_defs_from_subgraphs = {}
    for subgraph in asset_graph.subgraphs:
        for resource_key, resource_def in subgraph.resource_defs.items():
            if resource_key not in resource_defs_from_subgraphs:
                resource_defs_from_subgraphs[resource_key] = resource_def
            if resource_defs_from_subgraphs[resource_key] != resource_def:
                raise DagsterInvalidDefinitionError(
                    f"Conflicting versions of resource with key '{resource_key}' "
                    "were provided to different subgraphs. When constructing a "
                    "job, all resource definitions provided to subgraphs must "
                    "match by reference equality for a given key."
                )
```
I balked at it and went for "executable".
> However, when I went to write this code:
I think we should strongly consider taking `resource_defs` off of `AssetsDefinition`. The reason we initially added this was that there was no way to attach resources to a repository / `Definitions` object. However, these resource defs are not truly scoped to the `AssetsDefinition`. They're required to be the same across all assets within the repository.

The `resource_defs` parameter on `@asset` and `@multi_asset` is experimental for this reason. Not marking the `resource_defs` parameter as experimental on `AssetsDefinition` was an unfortunate accidental omission, but I think we can credibly say it has always been experimental.
> They're required to be the same across all assets within the repository.
This is not accurate.
```python
from typing import Any

from dagster import ConfigurableResource, asset
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.resource_annotation import ResourceParam
from dagster._core.execution.context.init import InitResourceContext


class ResourceA: ...


class ConfigurableResourceA(ConfigurableResource):
    def create_resource(self, context: InitResourceContext) -> Any:
        return ResourceA()


class ResourceB: ...


class ConfigurableResourceB(ConfigurableResource):
    def create_resource(self, context: InitResourceContext) -> Any:
        return ResourceB()


@asset
def asset_requires_resource_a(resource_a: ResourceParam[ResourceA]): ...


@asset
def asset_requires_resource_b(resource_b: ResourceParam[ResourceB]): ...


defs = Definitions(
    resources={
        "resource_a": ConfigurableResourceA(),
        "resource_b": ConfigurableResourceB(),
    },
    assets=[
        asset_requires_resource_a,
        asset_requires_resource_b,
    ],
)

print(list(defs.get_assets_def("asset_requires_resource_a").resource_defs.keys()))
print(list(defs.get_assets_def("asset_requires_resource_b").resource_defs.keys()))
```
```
(dagster-internal.3.11.5-2024-04-26) ➜  dagster git:(master) ✗ python resources.py
['io_manager', 'resource_a']
['io_manager', 'resource_b']
```
> This is not accurate.
What I mean is that different assets within a repository can't have different resource def values for the same resource key. The corollary of this is that providing `resource_defs` at the `AssetsDefinition` level doesn't give you any more degrees of freedom than providing them at the repository level.
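A small demonstration of that constraint with today's APIs (`with_resources` attaches resource defs at the `AssetsDefinition` level; the conflict check quoted earlier fires when the definitions are loaded):

```python
from dagster import Definitions, ResourceDefinition, asset, with_resources

value_a = ResourceDefinition.hardcoded_resource("a")
value_b = ResourceDefinition.hardcoded_resource("b")


@asset(required_resource_keys={"shared"})
def asset_one(context): ...


@asset(required_resource_keys={"shared"})
def asset_two(context): ...


# Binding different values to the same key on different AssetsDefinitions fails
# the reference-equality check when the repository/job is constructed, raising
# DagsterInvalidDefinitionError.
defs = Definitions(
    assets=[
        *with_resources([asset_one], {"shared": value_a}),
        *with_resources([asset_two], {"shared": value_b}),
    ]
)
```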