astronomer-cosmos
How to store all test results
I'm looking to store all test results (both successes and failures, so https://docs.getdbt.com/reference/resource-configs/store_failures won't be sufficient). I'm considering using a dbt package such as https://github.com/brooklyn-data/dbt_artifacts, which uses an on-run-end hook to upload all the results at the end of a run. As far as I'm aware, Airflow doesn't provide an equivalent of dbt's on-run-end hook that always runs a task at the end of a run. So is there any way that I can materialize the results of all the tests using Cosmos? If not currently, would it be possible to add somehow?
For added context we're also looking to integrate these test results into Atlan.
🤖
Hi @DanMawdsleyBA! I'm Dosu, a friendly bot here to assist you with your questions and issues related to Cosmos while we wait for a human maintainer. Don't worry, although I'm a bot, I'm here to make your experience smoother and easier. Looking forward to helping you out!
Yes, it is possible to materialize and store all test results, both successes and failures, using Cosmos. You can achieve this by using the `on_warning_callback` parameter in the `create_test_task_metadata` function. This callback function is called on warnings with additional context variables `test_names` and `test_results`, both of type `List`.
You can define a callback function that logs all test results and stores them in a desired location. This location can then be integrated with Atlan.
Here is an example of how you can define the callback function:
```python
import logging

logger = logging.getLogger(__name__)

def log_test_results(context):
    test_names = context["test_names"]
    test_results = context["test_results"]
    for test_name, test_result in zip(test_names, test_results):
        # Log the test result
        logger.info(f"Test {test_name} result: {test_result}")
        # Store the test result in a desired location
        # (a database, a file, etc.) -- store_test_result is user-defined
        store_test_result(test_name, test_result)
```
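The `store_test_result` helper above is not part of Cosmos and must be supplied by you. As one possible sketch, assuming you simply want to append each result to a local JSON Lines file (swap this for a database insert or an object-storage upload as needed):

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path

# Hypothetical helper -- not part of Cosmos. Appends one JSON record
# per test result to a local JSON Lines file.
RESULTS_FILE = Path(tempfile.gettempdir()) / "dbt_test_results.jsonl"

def store_test_result(test_name, test_result):
    record = {
        "test_name": test_name,
        "test_result": test_result,
        "recorded_at": datetime.now(timezone.utc).isoformat(),
    }
    with RESULTS_FILE.open("a") as fh:
        fh.write(json.dumps(record) + "\n")
```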
You can then pass this function as the `on_warning_callback` parameter when calling `create_test_task_metadata`:
```python
test_meta = create_test_task_metadata(
    "test",
    execution_mode,
    test_indirect_selection,
    task_args=task_args,
    node=node,
    on_warning_callback=log_test_results,
)
```
This will ensure that all test results are logged and stored, and can then be integrated into Atlan.
The `create_test_task_metadata` function in the Cosmos codebase does support the `on_warning_callback` parameter for logging and storing all test results. This is evident in the `DbtTestKubernetesOperator` class, where the `on_warning_callback` parameter is accepted in the `__init__` method. If `on_warning_callback` is provided, it is stored as an instance variable and used in the `_handle_warnings` method to process warnings from the logs.
In the `_handle_warnings` method, if there are warnings in the logs, the `on_warning_callback` is called with an updated context that includes the test names and results.
You can find more details in the cosmos/airflow/graph.py and cosmos/operators/kubernetes.py files in the Cosmos repository.
I hope this helps! If you have any other questions, feel free to ask.
-Dosu
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.
Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.
If you want to continue the conversation, start your reply with @dosu-bot.
@tatiana / any thoughts on the above?
It might also be desirable to have a task that runs at the end and syncs the run_results.json with an S3 bucket (perhaps something similar to the docs operator), as Atlan can interpret it directly. This task would have to run regardless of whether previous tasks have failed, in order to capture failing test results.
Although my concern with the above would be: if an Airflow deployment gets redeployed, would those run_results.json files be overwritten/deleted?
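One way to address the overwrite concern is to namespace the uploaded file by DAG run, so each run writes to its own key rather than a fixed path. A minimal sketch, where the bucket prefix and key layout are assumptions and not anything Cosmos defines:

```python
# Sketch: build a per-run object-storage key for run_results.json so
# redeployments or later runs never overwrite earlier artifacts.
# The "dbt-artifacts/" prefix layout is an assumption, not Cosmos behaviour.
def build_artifact_key(dag_id: str, run_id: str, task_id: str,
                       filename: str = "run_results.json") -> str:
    # e.g. dbt-artifacts/my_dag/manual__2024-01-01/run_tests/run_results.json
    return f"dbt-artifacts/{dag_id}/{run_id}/{task_id}/{filename}"
```

In an Airflow task, `dag_id`, `run_id`, and `task_id` are all available from the task context, and an `S3Hook` (or the GCS/Azure equivalent) could upload `target/run_results.json` to that key from a final task configured with `trigger_rule="all_done"`, so it runs even when upstream tasks fail.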
Hi @DanMawdsleyBA, this is a very valid use case - and it would be great if Cosmos could support it.
At the moment, Cosmos 1.3 only offers a built-in "post command" customisation via the callback feature, which only works on failed tests.
The other possibility you'd have is to customise how you'd like dbt test to behave in Cosmos, using this feature.
How are you running your tests in Cosmos? Are you using `TestBehavior.AFTER_ALL` or `TestBehavior.AFTER_EACH` (the default)?
From an Atlan perspective, would it be okay if those test result files were sent on a per-task basis, considering you're running with `TestBehavior.AFTER_EACH`?
Have you tried using https://github.com/brooklyn-data/dbt_artifacts with Cosmos? From what I understood from the documentation, it would be triggered as part of the dbt command execution itself, so I'd expect it to work.
Another feature we could consider implementing is to expose in Cosmos the ability to upload the artefacts generated in `target/` to object storage (S3, GCS, Azure, etc.). What do you think? One challenge with this approach may be how we'd like to differentiate between DAG runs/task runs.
> Another feature we could consider implementing is to expose in Cosmos the ability to upload the artefacts generated in `target/` to object storage (S3, GCS, Azure, etc.). What do you think?
That would be great functionality, and it could be useful in other scenarios too. For example, when running a bigger dbt project as a single task (to avoid the overhead of starting new pods) and one of the models fails, we could use the `dbt retry` command to rerun only the failed models. To do that, we would need to have `run_results.json`. Plus, having the compiled code available would help with debugging.
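For reference, `run_results.json` records a status per executed node, so identifying what failed (whether to feed a retry or a report to Atlan) is a small parsing exercise. A sketch, assuming the standard dbt artifact layout of `results[].unique_id` and `results[].status`:

```python
import json

def failed_nodes(run_results_path: str) -> list[str]:
    """Return unique_ids of nodes whose status is 'fail' or 'error'."""
    with open(run_results_path) as fh:
        run_results = json.load(fh)
    return [
        r["unique_id"]
        for r in run_results.get("results", [])
        if r.get("status") in ("fail", "error")
    ]
```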
@tatiana I will add one thing to the problem.
In case tests fail,
https://github.com/astronomer/astronomer-cosmos/blob/7c4d7d6e0c680a4847414ed4eb525b7baa27c8c1/cosmos/operators/local.py#L342
this line will actually throw an exception before the callback is executed, so `run_results.json` is lost and in many cases there's no way to report on failed tests.
@ms32035, yes. FYI, I raised the same issue here: https://github.com/astronomer/astronomer-cosmos/issues/867
> @tatiana / any thoughts on the above?
> It might also be desirable to have a task that runs at the end and syncs the run_results.json with an S3 bucket (perhaps something similar to the docs operator), as Atlan can interpret it directly. This task would have to run regardless of whether previous tasks have failed, in order to capture failing test results.
> Although my concern with the above would be: if an Airflow deployment gets redeployed, would those run_results.json files be overwritten/deleted?
That'd be amazing!