
Implemented StreamedOperator


As proposed on the devlist, I've implemented a new operator called the "StreamedOperator". Of course I don't know whether this is a good name, but I'll explain what I wanted to achieve within Airflow.

At our company we have some DAGs that have to process a lot of paged results, which are returned as indexed XComs. If there aren't that many indexed XComs, it's easy to process them using the MappedOperator with the partial and expand functionality. But once you have more than about 1k indexed XComs, processing them becomes very hard (unless maybe you have a beefy Postgres database behind it, due to the high number of dynamic tasks being created).
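For context, this is the classic dynamic-task-mapping pattern referred to above, written with the TaskFlow API (the task names and the 1k-element list are illustrative, not from the PR):

from airflow.decorators import task

@task
def get_user_ids():
    # Stand-in for an upstream task returning ~1k paged results
    return [str(i) for i in range(1000)]

@task
def process(user_id: str):
    print(user_id)

# Inside a DAG context: one mapped task instance is created per element,
# so this produces ~1000 task instances that the scheduler, the metadata
# database and the UI all have to track.
process.expand(user_id=get_user_ids())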

We are using the KubernetesExecutor, which means that each task instance runs on a dedicated worker. You can of course use parallelism to process multiple tasks at once, but when there are too many dynamic tasks for one operator, this becomes more of a problem than a solution: processing is actually slower, and the UI also has trouble monitoring all those dynamic tasks (see the screenshot below, which shows the difference in execution time between expand and streamed). As you can see, the performance difference between the expand and stream solutions is huge.

[Screenshot: execution times comparing the expand and streamed solutions]

So to bypass those issues, I've thought of an operator which, instead of expanding all mapped arguments, "streams" them. The reason I call it stream (and implemented it as a StreamedOperator) is that the solution was inspired by the Java 8 Stream API, which allows you to process a list or an iterable lazily and, if desired, apply parallelism to it. It's not exactly the same here, but the idea behind it is. The advantage is that all mapped arguments, which would normally be translated into multiple task instances, are actually processed as one within a single operator, the StreamedOperator. You could see this solution as a kind of concurrent for loop over mapped parameters within one operator.

But as opposed to the MappedOperator, the StreamedOperator executes the partial operator within the same operator and task instance, using asynchronous code and a semaphore; the latter is used to limit the number of tasks running simultaneously. The limit can be set with the 'max_active_tis_per_dag' parameter; if it is not specified, the number of CPUs available on that worker is used. If you don't want parallelism, you can set it to 1 so that each task is executed sequentially. This can be handy if you don't want to "DDoS" a REST endpoint and so avoid being throttled. If a task fails, it behaves the same as with dynamic task mapping and retries it until its number of retries is exceeded. The retry_delay also works the same way, and of course only failed tasks are retried. You will notice that most of the code is actually re-used Airflow code; the async execution part and the complete evaluation of all values in the ExpandInput instance are new code.
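To make the concurrency model concrete, here is a minimal sketch of the semaphore-bounded execution described above (the function names are mine, not the PR's, and the real implementation re-uses Airflow internals rather than a bare callable):

import asyncio
import os

async def _run_one(semaphore, fn, value, retries=2, retry_delay=60):
    # The semaphore caps how many executions run at the same time.
    async with semaphore:
        for attempt in range(retries + 1):
            try:
                return await fn(value)
            except Exception:
                if attempt == retries:
                    raise  # retries exceeded, let the task fail
                await asyncio.sleep(retry_delay)  # mimics retry_delay

async def _stream(fn, values, max_active_tis_per_dag=None):
    # Default the parallelism to the worker's CPU count, as the PR does;
    # max_active_tis_per_dag=1 degrades to sequential execution.
    limit = max_active_tis_per_dag or os.cpu_count() or 1
    semaphore = asyncio.Semaphore(limit)
    return await asyncio.gather(*(_run_one(semaphore, fn, v) for v in values))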

Async operators, which use a trigger, are also executed that way, so all processing happens asynchronously within the same operator, and thus within the same task instance and worker. Of course this can be perceived as a somewhat hackish way of achieving this within Airflow, but it allowed me to easily patch our own Airflow installation and "easily" add this functionality. It could also be implemented as an alternative strategy within the expand method, with a parameter deciding which one to use, but I personally found a dedicated stream method more elegant.
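For deferrable operators, the idea is roughly to drain the operator's trigger in-process instead of handing it over to the triggerer. A minimal sketch of that idea (my own simplification, not the PR's exact code):

from airflow.triggers.base import BaseTrigger, TriggerEvent

async def run_trigger(trigger: BaseTrigger) -> TriggerEvent:
    # BaseTrigger.run() is an async generator yielding TriggerEvents;
    # here we simply await the first event within the same task instance.
    async for event in trigger.run():
        return event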

Of course the code could still use some refactoring, but I tried to implement it as cleanly as possible. This is still a draft, so I still need to add some unit tests, which shouldn't be that big of a challenge. It would also be a solution to the question asked here, without the need for custom code.

Here is a simple example of a DAG using the stream functionality:

from datetime import timedelta

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

DEFAULT_ARGS = {"retries": 2}  # simplified for the example

with DAG(
    "streamed_operator_performance_test",
    default_args=DEFAULT_ARGS,
    schedule_interval=timedelta(hours=24),
    max_active_runs=5,
    concurrency=5,
    catchup=False,
) as dag:
    distinct_users_ids_task = SQLExecuteQueryOperator(
        task_id="distinct_users_ids",
        conn_id="odbc_dev",
        sql="SELECT TOP 1000 ID FROM USERS",
        dag=dag,
    )

    user_registered_devices_task = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices",
        conn_id="msgraph_api",
        url="users/{userId}/registeredDevices",
        retry_delay=60,
        dag=dag,
    ).stream(path_parameters=distinct_users_ids_task.output.map(lambda u: {"userId": u[0]}))
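Here the 1000 user IDs returned by the SQL task are mapped to path parameters and processed concurrently inside a single task instance, instead of fanning out into ~1000 mapped task instances as expand would.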


— dabla, Sep 29 '24