PythonOperator does not support Snowflake dataset URNs
Describe the bug The PythonOperator does not support Snowflake dataset URNs when only outlets are set in the task configuration.
To Reproduce Steps to reproduce the behavior:
1. This is my airflow dag:

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

try:
    from airflow.operators.bash import BashOperator
except ModuleNotFoundError:
    from airflow.operators.bash_operator import BashOperator

from datahub_provider.entities import Dataset

# python op
from airflow.operators.python_operator import PythonOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "execution_timeout": timedelta(minutes=5),
}

def my_func():
    print('python done!')

with DAG(
    "datahub_test01",
    default_args=default_args,
    description="An example DAG demonstrating the usage of DataHub's Airflow lineage backend.",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=["example_tag"],
    catchup=False,
) as dag:
    task_python = PythonOperator(
        task_id='first_python_task',
        python_callable=my_func,
        dag=dag,
        outlets={"datasets": [Dataset("snowflake", "mydb.schema.tableF")]},
    )

    task_python
2. After I run my dag, the airflow log is:

[2022-06-29 11:05:57,717] {logging_mixin.py:109} INFO - Exception:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_plugin.py", line 336, in custom_on_success_callback
    datahub_on_success_callback(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_plugin.py", line 220, in datahub_on_success_callback
    datajob.outlets.append(outlet.urn)
AttributeError: 'dict' object has no attribute 'urn'
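For context, here is a minimal sketch of what the traceback suggests is happening (the Dataset stub, task_outlets, and datajob_outlets names below are hypothetical, not the actual plugin source; the real Dataset class lives in datahub_provider.entities). Assuming Airflow stores a non-list outlets value as a single element of task.outlets, the plugin's success callback receives the {"datasets": [...]} dict itself as one outlet and fails when it reads .urn from it:

# Hypothetical stand-in for datahub_provider.entities.Dataset, which
# exposes a DataHub dataset URN via a .urn attribute.
class Dataset:
    def __init__(self, platform: str, name: str):
        self.urn = f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},PROD)"

# Dict-style outlets as in the DAG above, wrapped in a list the way
# Airflow is assumed to store non-list outlets values.
task_outlets = [{"datasets": [Dataset("snowflake", "mydb.schema.tableF")]}]

# Simplified version of the callback logic at line 220 in the traceback:
# it expects each outlet to be a Dataset with a .urn attribute.
datajob_outlets = []
for outlet in task_outlets:
    datajob_outlets.append(outlet.urn)  # AttributeError: 'dict' object has no attribute 'urn'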
Expected behavior Lineage should be recorded for the task; instead, the metadata in DataHub is empty.
Screenshots None
Desktop (please complete the following information):
- OS: Mac
- Browser: Chrome
- Version: 22
Additional context None
Same here with PythonOperator and outlets={"datasets": [Dataset("dbt", f"{database_name}.{schema_name}.{model_name}")]}
This issue is stale because it has been open for 15 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io
@gvillafanetapia @AprilJoy We need to update our docs because the correct way to set outlets is the following: https://github.com/datahub-project/datahub/blob/ae577ddff24d6ac0db84fe6844590cd154c81f59/metadata-ingestion/examples/airflow/circuit_breaker/long_tail_companion/01-operation/ecommerce/01_snowflake_load.py#L35
Basically you don't need the datasets object and it should look like this:

outlets=[Dataset("snowflake", "mydb.schema.tableF")],
)
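Applied to the DAG from the report, the task definition would look like this (a sketch of the corrected usage with the same names as above, not taken verbatim from the linked example):

task_python = PythonOperator(
    task_id='first_python_task',
    python_callable=my_func,
    dag=dag,
    # Pass Dataset entities directly as a list instead of wrapping them
    # in a {"datasets": [...]} dict.
    outlets=[Dataset("snowflake", "mydb.schema.tableF")],
)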
I updated the examples in this PR -> https://github.com/datahub-project/datahub/pull/5696/files
The issue was caused by an incorrect example in the docs. The example has been fixed in the above PR. Closing.