Add support for async callables in PythonOperator

Open dabla opened this issue 3 weeks ago • 6 comments

This PR relates to the discussion I started on the devlist and allows you to natively execute async code in PythonOperators.

There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3

Below is an example showing how it can be used with async hooks:

from airflow.sdk import task

@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed

    # SFTPClientPool is assumed to come from the SFTP hook improvements (separate PR)
    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    # sftp_conn and xml_encoding are assumed to be defined at module level
    workers = cpu_count() or 1  # cpu_count() may return None
    print("number of files:", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=workers) as pool:
        # tenacity's retry decorator also supports coroutine functions
        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                print("downloading:", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                    buffer.seek(0)
                return buffer

        for batch in chunked(files, workers * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]

            # Wait for this batch to finish before starting the next
            for completed in asyncio.as_completed(tasks):
                result = await completed
                # Do something with result or accumulate it and return it as an XCom
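The batching pattern in the example above (start one task per file in a chunk, then await the whole chunk with `asyncio.as_completed` before starting the next one) can be tried without Airflow or SFTP. A minimal sketch, where `fake_download` is a hypothetical stand-in for the SFTP read and the batch size is an arbitrary assumption:

```python
import asyncio
from itertools import islice


def chunked(iterable, size):
    # Stdlib replacement for more_itertools.chunked: yields lists of at most `size` items
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


async def fake_download(name: str) -> bytes:
    # Hypothetical stand-in for the SFTP download; only simulates I/O latency
    await asyncio.sleep(0.01)
    return name.encode("utf-8")


async def download_all(files, batch_size=4):
    results = []
    for batch in chunked(files, batch_size):
        tasks = [asyncio.create_task(fake_download(f)) for f in batch]
        # as_completed yields awaitables in completion order; the inner loop
        # only ends once every task of this batch is done
        for completed in asyncio.as_completed(tasks):
            results.append(await completed)
    return results


if __name__ == "__main__":
    files = [f"file_{i}.xml" for i in range(10)]
    print(len(asyncio.run(download_all(files))))
```

Results come back in completion order, not input order, so sort or key them if order matters.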

dabla, Dec 05 '25 10:12

Why do we need a separate operator class? Can this not just be a part of PythonOperator?

uranusjr, Dec 08 '25 07:12

Good remark, I will check. I found it cleaner to split, but maybe it's indeed not necessary.

dabla, Dec 08 '25 08:12

It’s indeed cleaner implementation-wise, but I’d prefer PythonOperator to just work with async functions instead of needing a separate class. One class instead of two is easier, since PythonVirtualenvOperator etc. would then also work.

uranusjr, Dec 08 '25 08:12

Thanks TP for your remark, you're indeed right. I didn't really think about it, since you don't notice it when using the @task decorator, but it would indeed be annoying to have to use a different operator for async functions. Good catch!

I will fix this. It does of course mean that PythonOperator will extend BaseAsyncOperator instead of BaseOperator by default, but I suppose that's not an issue. The is_async property will need to be overridden in PythonOperator so it can check whether the python_callable is async.
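The idea of overriding `is_async` so that it inspects `python_callable` can be illustrated with the standard library alone. This is a minimal sketch under stated assumptions: `SketchPythonOperator` and its `execute` are hypothetical simplifications, not the PR's code or Airflow's `BaseAsyncOperator` API. It only shows how a coroutine function can be detected with `inspect.iscoroutinefunction` and driven to completion from a synchronous entry point with `asyncio.run`.

```python
import asyncio
import inspect


class SketchPythonOperator:
    """Hypothetical, heavily simplified stand-in for PythonOperator."""

    def __init__(self, python_callable, op_args=None, op_kwargs=None):
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    @property
    def is_async(self) -> bool:
        # True for `async def` functions (and, since Python 3.8, partials wrapping them)
        return inspect.iscoroutinefunction(self.python_callable)

    def execute(self, context=None):
        if self.is_async:
            # Run the coroutine to completion on a fresh event loop
            return asyncio.run(self.python_callable(*self.op_args, **self.op_kwargs))
        # Plain callables keep the existing synchronous behaviour
        return self.python_callable(*self.op_args, **self.op_kwargs)
```

`asyncio.run` raises `RuntimeError` when an event loop is already running in the same thread, which is one reason a real implementation may manage the loop differently.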

dabla, Dec 08 '25 09:12

One thing I have to take into account: Airflow 2.

So I think we will have a basic PythonOperator implementation without async support for Airflow 2, and one with async support for, say, Airflow 3.2, as BaseAsyncOperator does not exist in earlier Airflow versions.
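One way to express "async support only from a given Airflow release" is a version flag evaluated once at import time, similar in spirit to the `AIRFLOW_V_3_0_PLUS`-style flags found in provider `version_compat` modules. A minimal sketch, assuming the version string is passed in (in a provider it would come from the installed Airflow); `supports_async_callables` is a hypothetical helper, and 3.2 is used only because it is the example release mentioned above:

```python
from __future__ import annotations

import re


def base_version_tuple(version: str) -> tuple[int, int, int]:
    # Keep only the leading "X.Y[.Z]" numbers, ignoring suffixes like ".dev0" or "rc1"
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"Unrecognised version string: {version!r}")
    major, minor, patch = match.groups(default="0")
    return int(major), int(minor), int(patch)


def supports_async_callables(airflow_version: str) -> bool:
    # Hypothetical gate: async python_callable support only from 3.2 onwards
    return base_version_tuple(airflow_version) >= (3, 2, 0)
```

Comparing integer tuples rather than raw strings matters here: as strings, "3.10" sorts before "3.2".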

dabla, Dec 08 '25 09:12

@uranusjr @Lee-W I've refactored PythonOperator as suggested and already tested it locally; it works. So there is no more need for a dedicated AsyncPythonOperator, and it also made the decorator part simpler, so I could remove some code there. I also took the Airflow version into account, so PythonOperator works as before, without async support, if the Airflow version is too old.

dabla, Dec 08 '25 09:12

An additional PR will be opened with the improvements made to SFTPHookAsync. I removed those from this PR to make review easier and to avoid adding complexity not directly related to the async PythonOperator.

dabla, Dec 16 '25 16:12

This is really nice and clean, and it shows why we should handle async tasks this way - @dstandish -> this PR has now been extracted by @dabla to implement only the base changes we discussed in https://lists.apache.org/[email protected] - I think you should take a look to see how much nicer it is and how easily it will let us extend it further to other decorators.

I guess we still want a quick AIP approval on it, @dabla, assuming others will also give it a thumbs-up (but I really see how it might be useful).

This one will need a newsfragment and nice docs explaining the what and the why as well, of course :)

potiuk, Dec 17 '25 12:12

Added example DAGs using the async PythonOperator to the documentation.

dabla, Dec 19 '25 09:12

I think this is good. I feel like we should (temporarily) keep a copy of BaseAsyncOperator in the standard provider so the feature is available in older Airflow versions. That would make the user experience a bit better, as users would only need to upgrade the provider.

Or in the common compat library? I don't know, just thinking...

dabla, Dec 19 '25 10:12