Add support for async callables in PythonOperator
This PR relates to the discussion I started on the dev list and allows you to natively execute async code in PythonOperator.
There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3
Below is an example showing how it can be used with async hooks:
@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed
    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    # sftp_conn and xml_encoding are assumed to be defined elsewhere in the DAG file
    print("number of files:", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:

        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                print("downloading:", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                buffer.seek(0)
                return buffer

        for batch in chunked(files, cpu_count() * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]
            # Wait for this batch to finish before starting the next
            for next_done in asyncio.as_completed(tasks):
                result = await next_done
                # Do something with result or accumulate it and return it as an XCom
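To try the batching pattern from the example without an SFTP server, here is a minimal stdlib-only sketch. `fake_download` is a hypothetical stand-in for `download_file` (it only sleeps), `chunked` is a small stdlib replacement for `more_itertools.chunked`, and the retry and connection pool are left out.

```python
import asyncio
from itertools import islice


def chunked(iterable, size):
    # Stdlib replacement for more_itertools.chunked: yields lists of up to `size` items
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


async def fake_download(file):
    # Hypothetical stand-in for download_file: simulates I/O latency
    await asyncio.sleep(0.01)
    return f"contents of {file}"


async def load_files(files, batch_size=2):
    results = []
    for batch in chunked(files, batch_size):
        tasks = [asyncio.create_task(fake_download(f)) for f in batch]
        # Results within a batch arrive in completion order, not submission order;
        # the next batch only starts after every task in this one has finished
        for next_done in asyncio.as_completed(tasks):
            results.append(await next_done)
    return results


if __name__ == "__main__":
    print(asyncio.run(load_files(["a.xml", "b.xml", "c.xml"])))
```

Batching caps the number of in-flight downloads at `batch_size`, at the cost of waiting for the slowest file in each batch before the next one starts.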
Why do we need a separate operator class? Can this not just be a part of PythonOperator?
Good remark, I will check. I found it cleaner to split them, but maybe it's indeed not necessary.
It’s indeed cleaner implementation-wise, but I’d prefer PythonOperator to just work with async functions instead of needing a separate class. Having one class instead of two would be easier, since PythonVirtualenvOperator etc. would then also work.
Thanks TP for your remark, you're right. I didn't really think about it, as you don't see it when using the @task decorator, but it would indeed be annoying if you had to use a different operator class for async functions. Good catch!
I will fix this. It would of course mean that PythonOperator extends BaseAsyncOperator instead of BaseOperator by default, but I suppose that's not an issue. The is_async property will need to be overridden in PythonOperator so it can check whether the python_callable is async or not.
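A minimal sketch of the `is_async` override described above, assuming the operator detects async callables with `inspect.iscoroutinefunction` and drives them with `asyncio.run`. The classes `BaseOperatorStub`, `BaseAsyncOperatorStub` and `PythonOperatorSketch` are hypothetical, simplified stand-ins, not Airflow's real API, and how the actual implementation runs the coroutine may differ.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


class BaseOperatorStub:
    """Hypothetical stand-in for BaseOperator."""

    def execute(self, context: dict) -> Any:
        raise NotImplementedError


class BaseAsyncOperatorStub(BaseOperatorStub):
    """Hypothetical stand-in for BaseAsyncOperator; the real API may differ."""

    @property
    def is_async(self) -> bool:
        return True


class PythonOperatorSketch(BaseAsyncOperatorStub):
    def __init__(self, python_callable: Callable[..., Any],
                 op_args: Optional[list] = None, op_kwargs: Optional[dict] = None):
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    @property
    def is_async(self) -> bool:
        # Override: only async when the wrapped callable is declared with `async def`
        return inspect.iscoroutinefunction(self.python_callable)

    def execute(self, context: dict) -> Any:
        if self.is_async:
            # Assumption: run the coroutine to completion on a fresh event loop
            return asyncio.run(self.python_callable(*self.op_args, **self.op_kwargs))
        return self.python_callable(*self.op_args, **self.op_kwargs)


# Usage: the same operator class wraps both kinds of callables
async def fetch_greeting(name: str) -> str:
    await asyncio.sleep(0)  # stands in for real async I/O
    return f"hello {name}"


def shout(name: str) -> str:
    return name.upper()


async_op = PythonOperatorSketch(fetch_greeting, op_args=["airflow"])
sync_op = PythonOperatorSketch(shout, op_args=["airflow"])
```

Deciding per instance rather than per class is what lets a single PythonOperator serve both sync and async callables.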
One thing I have to take into account: Airflow 2.
So I think we will keep the basic PythonOperator implementation without async support for Airflow 2, and provide the one with async support for, e.g., Airflow 3.2, as BaseAsyncOperator does not exist in earlier Airflow versions.
@uranusjr @Lee-W I've refactored PythonOperator as suggested and already tested it locally; it works. So there is no longer a need for a dedicated AsyncPythonOperator. It also made the decorator part simpler, so I could remove some code there. I also took the Airflow version into account, so that PythonOperator works as before, without async support, if the Airflow version is too old.
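One way to express the version gate is sketched below. The minimum version `(3, 2)` is an assumption taken from the example in this thread, and the stub classes are hypothetical; providers typically keep similar flags (such as `AIRFLOW_V_3_0_PLUS`) in a version-compat module, usually built on `packaging.version` rather than the regex used here to stay stdlib-only.

```python
import re
from typing import Tuple

# Assumption: 3.2 is the first release shipping BaseAsyncOperator
MIN_ASYNC_AIRFLOW = (3, 2)


def base_version_tuple(version: str) -> Tuple[int, ...]:
    # Keep only the leading release numbers, so "3.2.0rc1" -> (3, 2, 0)
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unparseable version: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def supports_async_callables(airflow_version: str) -> bool:
    # Tuple comparison: (3, 2, 0) >= (3, 2) is True, (3, 1, 9) >= (3, 2) is False
    return base_version_tuple(airflow_version) >= MIN_ASYNC_AIRFLOW


class BaseOperatorStub:
    """Hypothetical stand-in for BaseOperator."""


class BaseAsyncOperatorStub(BaseOperatorStub):
    """Hypothetical stand-in for BaseAsyncOperator."""


def python_operator_base(airflow_version: str) -> type:
    # On older cores, PythonOperator keeps its original sync-only base class
    return BaseAsyncOperatorStub if supports_async_callables(airflow_version) else BaseOperatorStub
```

In a real module the choice would be made once at import time, so the class hierarchy is fixed for the lifetime of the process.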
An additional PR will be opened with the improvements to SFTPHookAsync. I removed those from this PR to make review easier and to avoid adding complexity not directly related to the async PythonOperator.
This is really nice and clean, and it shows why we should handle async tasks this way. @dstandish: @dabla has now narrowed this PR down to implement only the base changes we discussed in https://lists.apache.org/[email protected]. I think you should take a look to see how much nicer it would be and how easily it will let us extend this to other decorators.
I guess we still want to get a quick AIP approval on it, @dabla, assuming others will also give it a thumbs-up (but I really see how it might be useful).
This one will of course also need a newsfragment and good docs explaining the what and the why :)
Added example DAGs using the async PythonOperator to the documentation.
I think this is good. I feel like we should keep a copy of BaseAsyncOperator (temporarily) in the standard provider so the feature is available in older Airflow versions. That would make the user experience a bit better: users would simply need to upgrade the provider.
Or in the common compat library? Idk, just thinking...
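Whether the copy lives in the standard provider or in a compat package, it could be wired in with a fallback import, sketched below. `VendoredBaseAsyncOperator` and `resolve_base_async_operator` are hypothetical names, and the core module path you would pass in is an assumption, not a confirmed Airflow location.

```python
import importlib


class VendoredBaseAsyncOperator:
    """Hypothetical copy of BaseAsyncOperator bundled with the provider (or a compat package)."""

    @property
    def is_async(self) -> bool:
        return True


def resolve_base_async_operator(core_module: str, fallback: type) -> type:
    # Prefer the class shipped by Airflow core; fall back to the bundled copy
    # when the module or the attribute does not exist (older Airflow versions)
    try:
        module = importlib.import_module(core_module)
        return getattr(module, "BaseAsyncOperator")
    except (ImportError, AttributeError):
        return fallback
```

In a provider module this would typically be a plain `try`/`except ImportError` around the import at module top; the function form just makes the fallback behaviour easy to test. Once the minimum supported Airflow version includes BaseAsyncOperator, the vendored copy can be dropped.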