astronomer-cosmos
How to test sources in Astronomer Cosmos DAG
Hi, I'm experimenting with Astronomer Cosmos. I read through the documentation and didn't find an option to include source tests in the DAG when using the 'after_each' test behaviour. https://astronomer.github.io/astronomer-cosmos/configuration/testing-behavior.html
Is there an option for that?
My dag
from datetime import datetime
import os

from airflow.datasets import Dataset
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig

profile_config = ProfileConfig(
    profile_name="test",
    target_name="dev",
    profiles_yml_filepath=os.environ['AIRFLOW_HOME'] + "/profiles.yml",
)

my_cosmos_dag = DbtDag(
    project_config=ProjectConfig(
        os.environ['AIRFLOW_HOME'] + "/dbt_project",
    ),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        dbt_executable_path="dbt",
    ),
    # normal dag parameters
    start_date=datetime.now(),
    catchup=False,
    dag_id="sample_dag",
    schedule=[Dataset("airbyte://example_source")],
)

Hi @maver1ck , thanks for reaching out!
At the moment, Cosmos doesn't expose this, but it would be a simple change, similar to #474
Would you like to contribute?
The change would be around this part of the code: https://github.com/astronomer/astronomer-cosmos/blob/435a6996cf404bc2a9b9a23c7957e72786fab8ec/cosmos/airflow/graph.py#L171
The critical part of this improvement would be adding tests to ensure we continue rendering the Airflow DAG as intended.
Hi @maver1ck , I wanted to follow up on this issue - since we made some progress.
As of Cosmos 1.2.1, dbt Source nodes are not rendered as Airflow tasks by default.
Since Cosmos 1.2.0, we've introduced support for customizing how the library converts any dbt node into Airflow: https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html#customizing-how-nodes-are-rendered-experimental
This means users can already customise the behaviour for Source nodes, using something like: https://github.com/astronomer/astronomer-cosmos/blob/11ce2d718483f1d4eb1bfdac659417fb66a1492e/dev/dags/example_cosmos_sources.py#L63 With a converter like that, you could also run their tests.
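For readers who don't want to open the link, here is a minimal sketch of that approach, modelled on the linked example DAG. It assumes Airflow 2.x and Cosmos 1.2 or later; the EmptyOperator is only a placeholder, and swapping it for an operator that actually runs the source tests or a freshness check is left as an assumption rather than something Cosmos provides out of the box.

```python
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from cosmos import RenderConfig
from cosmos.constants import DbtResourceType
from cosmos.dbt.graph import DbtNode


def convert_source(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
    """Return the Airflow operator that should represent a dbt source node."""
    # Placeholder task: replace it with an operator of your choice if the
    # source node should trigger real work (this part is an assumption).
    return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")


# Tell Cosmos to call convert_source for every dbt node of type "source".
render_config = RenderConfig(
    node_converters={DbtResourceType("source"): convert_source},
)
```

The resulting object would then be passed to the DAG from the original question as DbtDag(..., render_config=render_config).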
The open questions are:
1. Do we want Cosmos to have a default behaviour for dbt Source nodes and automatically generate tasks for nodes of this type?
2. If the answer to (1) is yes, what do we want the behaviour for those nodes to be? Should we run dbt source freshness?
Please feel free to reply to these in the dedicated ticket: https://github.com/astronomer/astronomer-cosmos/issues/630
If source nodes have a standard behaviour in Cosmos, then the current behaviour for models (create a TaskGroup with run and test) will immediately apply to sources, because:
1. We created a list of testable DbtResourceTypes: https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/constants.py#L87
2. These are used by Cosmos to decide if it should create an individual task or a task group: https://github.com/astronomer/astronomer-cosmos/blob/11ce2d718483f1d4eb1bfdac659417fb66a1492e/cosmos/airflow/graph.py#L164
3. This currently doesn't work for DbtResourceType.SOURCE just because we don't have a default behaviour for that resource type: https://github.com/astronomer/astronomer-cosmos/blob/11ce2d718483f1d4eb1bfdac659417fb66a1492e/cosmos/airflow/graph.py#L101
4. Once we extend Cosmos to have some default behaviour for Source nodes, adapt the task metadata method used in (3), and do something similar to https://github.com/astronomer/astronomer-cosmos/blob/11ce2d718483f1d4eb1bfdac659417fb66a1492e/cosmos/operators/local.py#L454, it will automatically render Source nodes and their tests, as expected.
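The reasoning in the points above can be illustrated with a small standalone model. This is not Cosmos code: ResourceType, TESTABLE_TYPES, DEFAULT_COMMANDS, Node and plan_tasks are hypothetical names made up for this sketch, and the "freshness" command for sources is just one of the options under discussion.

```python
from dataclasses import dataclass
from enum import Enum


class ResourceType(Enum):
    """Hypothetical stand-in for Cosmos' DbtResourceType."""
    MODEL = "model"
    SNAPSHOT = "snapshot"
    SEED = "seed"
    SOURCE = "source"


# Resource types whose tests may be rendered next to them (assumed list).
TESTABLE_TYPES = {
    ResourceType.MODEL,
    ResourceType.SNAPSHOT,
    ResourceType.SEED,
    ResourceType.SOURCE,
}

# Default command per resource type. SOURCE is missing on purpose: without a
# default behaviour there is nothing to render for it.
DEFAULT_COMMANDS = {
    ResourceType.MODEL: "run",
    ResourceType.SNAPSHOT: "snapshot",
    ResourceType.SEED: "seed",
}


@dataclass
class Node:
    name: str
    resource_type: ResourceType
    has_test: bool = False


def plan_tasks(node, test_after_each=True, default_commands=DEFAULT_COMMANDS):
    """Return the task ids that would be rendered for a single node."""
    command = default_commands.get(node.resource_type)
    if command is None:
        # No default behaviour for this resource type: render nothing.
        return []
    main_task = f"{node.name}_{command}"
    if test_after_each and node.has_test and node.resource_type in TESTABLE_TYPES:
        # Equivalent of a task group holding the main task and its test.
        return [main_task, f"{node.name}_test"]
    return [main_task]
```

In this model a tested source node yields no tasks with the defaults, but passing default_commands={**DEFAULT_COMMANDS, ResourceType.SOURCE: "freshness"} makes the same node yield both a main task and a test task, which mirrors why adding a default behaviour for sources would be enough for their tests to appear.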
Following as I'm interested in running dbt source freshness - thank you for a great project 🙏
Following this issue too, I'm also interested in having source tests rendered as nodes for added task visibility.
+1 interested in this feature. Thanks!