astro-sdk
Add the ability to set task dependencies without a data dependency
**Please describe the feature you'd like to see**
Currently, with the Astro SDK there is no easy way to chain tasks together without a data dependency. Let's take an example where someone wants to wait until a table is truncated before adding in new data. At present, the only way to do that is with the traditional bitshift operators, like so:
```python
@aql.run_raw_sql(conn_id='sqlite_conn')
def truncate_foo():
    return "TRUNCATE TABLE foo;"


@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def example_sqlite_astro_build_comment():
    truncated_table = truncate_foo()
    loaded_data = aql.load_file(File("https://my/file.csv"), output_table=Table("foo", conn_id="my_conn"))
    truncated_table >> loaded_data


dag_obj = example_sqlite_astro_build_comment()
```
This, of course, breaks the promise that we are building a more Pythonic, functional library. Currently, even if we were to TRY to use functional composition like this:
```python
@aql.run_raw_sql(conn_id='sqlite_conn')
def truncate_foo():
    return "TRUNCATE TABLE foo;"


@aql.run_raw_sql(conn_id='sqlite_conn')
def truncate_bar(table: Table):
    return "TRUNCATE TABLE bar;"


@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def example_sqlite_astro_build_comment():
    truncated_table = truncate_foo()
    truncated_bar = truncate_bar(truncated_table)
    loaded_data = aql.load_file(File("https://my/file.csv"), output_table=Table("foo", conn_id="my_conn"))
    truncated_bar >> loaded_data
```
We would get an error, because the `run_raw_sql` function does not place anything in XCom, so there is nothing to render.
This makes for a bad story both when passing dependencies functionally and when trying to tie astro-sdk tasks to traditional Airflow operators:
```python
@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def example_sqlite_astro_build_comment():
    b = BashOperator(...)
    truncated_table = truncate_foo()
    b >> truncated_table
```
**Describe the solution you'd like**
I see a two-part solution to this issue.
The first part of this solution is to add an `upstream_tasks` protected kwarg to all operators. This would allow arbitrary task outputs to be part of any downstream function. We could potentially even ensure that the XComs are not actually rendered, to prevent multiple dataframes landing on a single worker.
```python
@aql.run_raw_sql(conn_id='sqlite_conn')
def truncate_foo():
    return "TRUNCATE TABLE foo;"


@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def example_sqlite_astro_build_comment():
    truncated_table = truncate_foo()
    loaded_data = aql.load_file(
        File("https://my/file.csv"),
        output_table=Table("foo", conn_id="my_conn"),
        upstream_tasks=[truncated_table],
    )


dag_obj = example_sqlite_astro_build_comment()
```
This fix would also simplify the story of tying traditional Airflow tasks to Astro SDK tasks:
```python
@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def example_sqlite_astro_build_comment():
    b = BashOperator(...)
    truncated_table = truncate_foo(upstream_tasks=[b])
```
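For illustration, a minimal sketch of how such a kwarg could be wired up — the helper name and the `XComArg` handling here are assumptions, not the actual astro-sdk internals:

```python
# Hypothetical sketch, not the actual astro-sdk implementation: pop the
# `upstream_tasks` kwarg before the operator renders its arguments, then
# register each entry as a plain upstream dependency so its XCom is never
# pulled onto the worker.
from airflow.models.xcom_arg import XComArg


def apply_upstream_tasks(task, upstream_tasks):
    """Set explicit upstream dependencies without creating a data dependency."""
    for upstream in upstream_tasks or []:
        # Accept both classic operators and TaskFlow outputs (XComArg).
        op = upstream.operator if isinstance(upstream, XComArg) else upstream
        task.set_upstream(op)
```

Popping the kwarg before argument rendering keeps the upstream outputs out of the downstream task's templated fields, which is what would avoid pulling multiple dataframes onto a single worker.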
The second part of the fix is that `run_raw_sql` should no longer return `None`. It needs to put SOMETHING into the XCom table so that we can reference it in downstream tasks. This can essentially be a no-op or a metadata object.
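As a sketch of what that placeholder could look like — the class name and fields are hypothetical, not an existing astro-sdk type:

```python
# Hypothetical sketch: a lightweight metadata object that run_raw_sql could
# return instead of None. Any non-None return value is pushed to XCom, which
# gives downstream tasks something to reference and render.
from dataclasses import dataclass


@dataclass
class RawSQLResult:
    conn_id: str
    sql: str
```

Returning such an object from the operator's `execute` instead of `None` would give downstream tasks a pure ordering handle without shipping any real data.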
**Are there any alternatives to this feature?**
The current alternative is to use traditional bitshift operators.
**Acceptance Criteria**
- [ ] All checks and tests in the CI should pass
- [ ] Unit tests (90% code coverage or more, once available)
- [ ] Integration tests (if the feature relates to a new database or external service)
- [ ] Example DAG
- [ ] Docstrings in reStructuredText for each of the methods, classes, functions, and module-level attributes (including an example DAG on how it should be used)
- [ ] Exception handling in case of errors
- [ ] Logging (are we exposing useful information to the user? e.g. source and destination)
- [ ] Improve the documentation (README, Sphinx, and any other relevant docs)
- [ ] How to use Guide for the feature (example)
- [ ] Add documentation on using the `upstream_tasks` kwarg
- [ ] Add documentation about how to use Astro SDK tasks with traditional Airflow tasks
@kaxil should we add this to the milestone 1.1.0?
```python
@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def example_sqlite_astro_build_comment():
    b = BashOperator(...)
    truncated_table = truncate_foo()
    b >> truncated_table
```
What's the problem with this example?
> The second part of the fix is that `run_raw_sql` should no longer return `None`. It needs to put SOMETHING into the XCom table so that we can reference it in downstream tasks. This can essentially be a no-op or a metadata object.
I think this is only true if you want to pass the "value" to another TaskFlow function, which you aren't doing in any of your proposed examples; i.e., if you wanted to do this:
```python
@aql.run_raw_sql(conn_id='sqlite_conn')
def truncate_foo():
    return "TRUNCATE TABLE foo;"


@aql.run_raw_sql(conn_id='sqlite_conn')
def faux_select(_):
    return "SELECT 1;"


@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def example_sqlite_astro_build_comment():
    truncated_table = truncate_foo()
    faux_select(truncated_table)
```
(Note that to make it work at all, you have to have an argument called `_` in this example to pass the XComArg in at parse time.)
We need to rebase it on the main branch. Need @tatiana's input on this. ETA for merge: today.