astronomer-cosmos
[feature] Support operator_arguments injection at a node level
Context
Currently, we can use `operator_args` to pass task-level arguments to all tasks in a `DbtTaskGroup` or `DbtDAG`.
However, these arguments apply to every task in the DAG/TaskGroup. Sometimes we'd like more granular control over what is passed to each model's operator.
Use case(s)
- A few models that require a longer `retry_delay` than other models in the same DAG.
- A few models that require more `retries` than the average model.
- Setting `trigger_rule` to `ONE_SUCCESS` for a particular model.
Potential Solution
Currently, we can achieve this by doing the following:
- Define Airflow operator args in the model's YAML file under `config` (or `config.meta`):
```yaml
version: 2

models:
  - name: model_a
    config:
      alias: model_a
      operator_args:
        retry_delay: 500  # seconds
        retries: 10
```
- Leverage `node_converters`:

```python
from __future__ import annotations  # allows `X | Y` annotations on Python < 3.10

from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos.airflow.graph import generate_task_or_group
from cosmos.dbt.graph import DbtNode


def inject_operator_args(
    dag: DAG,
    task_group: TaskGroup,
    node: DbtNode,
    **kwargs,
) -> BaseOperator | TaskGroup | None:
    """
    Return a task or group after injecting operator args
    if present in a model/test config.
    """
    # Skip injection if `operator_args` is not set for this node.
    operator_args = node.config.get("operator_args")
    if operator_args:
        # Build a new dict so node-level args override the shared task_args
        # without mutating them for other nodes.
        kwargs["task_args"] = {**kwargs.get("task_args", {}), **operator_args}

    # Not wrapped in try/finally: a `return` inside `finally` would
    # silently swallow any unexpected exception.
    return generate_task_or_group(
        dag=dag,
        task_group=task_group,
        node=node,
        **kwargs,
    )
```
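The core of the converter is a plain dictionary merge in which node-level values take precedence over the DAG/TaskGroup-level `task_args`. The sketch below isolates that precedence rule without Airflow or Cosmos installed; `FakeNode` and `merge_task_args` are hypothetical names used only for illustration, standing in for `DbtNode` and the body of `inject_operator_args`.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeNode:
    """Hypothetical stand-in for cosmos.dbt.graph.DbtNode; only `config` is modelled."""

    unique_id: str
    config: dict[str, Any] = field(default_factory=dict)


def merge_task_args(task_args: dict[str, Any], node: FakeNode) -> dict[str, Any]:
    """Return a new dict of task args with the node's `operator_args` applied on top.

    Keys from the node's `operator_args` override the shared `task_args`;
    the shared dict itself is never mutated, so other nodes are unaffected.
    """
    operator_args = node.config.get("operator_args") or {}
    return {**task_args, **operator_args}


shared = {"retries": 2, "retry_delay": 60}
model_a = FakeNode(
    "model.my_project.model_a",
    {"operator_args": {"retry_delay": 500, "retries": 10}},
)
model_b = FakeNode("model.my_project.model_b")

print(merge_task_args(shared, model_a))  # {'retries': 10, 'retry_delay': 500}
print(merge_task_args(shared, model_b))  # {'retries': 2, 'retry_delay': 60}
print(shared)                            # {'retries': 2, 'retry_delay': 60}
```

Building a new dict rather than updating `task_args` in place matters because the same `task_args` is typically handed to the converter for every node. To wire the real converter in, recent Cosmos versions accept something like `RenderConfig(node_converters={DbtResourceType.MODEL: inject_operator_args})`; treat that as an assumption and check the node-converter docs for the Cosmos version you run.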
Conclusion
The solution above works. However, it feels like this should be default Cosmos behaviour.
That sounds like a great idea! Your proposed solution seems well thought out and could definitely add value to the project. We encourage you to go ahead and create a pull request with your changes. Your contribution would be much appreciated by the community. Thank you for your willingness to improve the project!
If Astronomer were to commit to an API that reaches further into the dbt schema (note that, with your suggestion, this would be the first time Cosmos directly dictates how dbt code is written), I actually think placing all Cosmos configuration inside `config.cosmos` rather than `config.operator_args` would be a safer bet. Something like this:
```yaml
version: 2

models:
  - name: model_a
    config:
      alias: model_a
      cosmos:
        operator_args:
          retry_delay: 500  # seconds
          retries: 10
```
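With everything under a `cosmos:` mapping, a converter would need to look one level deeper and tolerate models that define no `cosmos` key at all. A minimal sketch, assuming the node's config arrives as a plain dict shaped like the YAML above; `get_cosmos_operator_args` is a hypothetical helper, not part of Cosmos.

```python
from typing import Any


def get_cosmos_operator_args(config: dict[str, Any]) -> dict[str, Any]:
    """Return config["cosmos"]["operator_args"], or {} if either level is missing.

    `or {}` also covers a key that is present but empty (parsed as None from YAML).
    A copy is returned so callers cannot mutate the node's config.
    """
    cosmos_config = config.get("cosmos") or {}
    return dict(cosmos_config.get("operator_args") or {})


model_a_config = {
    "alias": "model_a",
    "cosmos": {"operator_args": {"retry_delay": 500, "retries": 10}},
}
print(get_cosmos_operator_args(model_a_config))        # {'retry_delay': 500, 'retries': 10}
print(get_cosmos_operator_args({"alias": "model_b"}))  # {}
```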
It is not unreasonable to imagine Cosmos eventually needing more keys than just `operator_args`, and keeping them tidy inside a `cosmos:` mapping is probably the safer option.
With that slight change in the API I would fully endorse this feature. It comes up super often that you need to do stuff like this. (I actually have a related but separate issue I was intending on opening in the near future relating to providing easier access to customization of how Cosmos runs...)
Hey @dwreeves,
Thanks for reviewing this issue quickly.
I agree entirely with your proposed change in API 😀
There's one caveat to adding a new key that isn't part of dbt's schema. From the dbt docs:

> Most configurations are "clobbered" when applied hierarchically. Whenever a more specific value is available, it will completely replace the less specific value. Note that a few configs have different merge behavior:

See https://docs.getdbt.com/reference/configs-and-properties#combining-configs for details.
A user may expect the values under the `cosmos` key to be merged, rather than overwritten entirely, if they define it in both `dbt_project.yml` and `model.yml`.
We could either accept clobbering as the default, consistent with how dbt-core treats keys it does not define, or make merging the default. WDYT?
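To make the two options concrete, the sketch below contrasts clobbering (the more specific value replaces the less specific one wholesale) with a recursive merge. Both functions are hypothetical illustrations of the semantics under discussion, not dbt or Cosmos code; `project_level` and `model_level` stand in for the config defined in `dbt_project.yml` and `model.yml`, and `pool` is just an example Airflow operator argument.

```python
from typing import Any


def resolve_clobber(project: dict[str, Any], model: dict[str, Any]) -> dict[str, Any]:
    """Clobber: each top-level key set at the model level replaces the project value entirely."""
    return {**project, **model}


def resolve_deep_merge(project: dict[str, Any], model: dict[str, Any]) -> dict[str, Any]:
    """Merge: nested mappings are combined recursively; leaf values from `model` win."""
    merged = dict(project)  # shallow copy so `project` is not mutated
    for key, value in model.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = resolve_deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


project_level = {"cosmos": {"operator_args": {"retries": 3, "pool": "dbt_pool"}}}
model_level = {"cosmos": {"operator_args": {"retries": 10}}}

print(resolve_clobber(project_level, model_level))
# {'cosmos': {'operator_args': {'retries': 10}}}  ('pool' is lost)
print(resolve_deep_merge(project_level, model_level))
# {'cosmos': {'operator_args': {'retries': 10, 'pool': 'dbt_pool'}}}
```

Clobbering is simpler to explain and matches the behaviour quoted above for most dbt configs; merging is friendlier when project-wide defaults (such as a shared `pool`) should survive a per-model override.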
Either way, we could document this behaviour in the Cosmos docs.
Should I try contributing this feature with your proposed API change?
I'm not a maintainer of this repo, so I won't be the right person to ask that. I was just putting in an API suggestion and a yes vote for the feature.
@linchun3 this is an excellent proposal - including the suggestion by @dwreeves. We'd love to have this feature! Please, feel free to work on this!