airflow icon indicating copy to clipboard operation
airflow copied to clipboard

Fix: Reuse ProcessPoolExecutor in CeleryExecutor

Open luoyuliuyin opened this issue 1 year ago • 29 comments

PR of https://github.com/apache/airflow/issues/39482

When the scheduler send task to celery, if there is only 1 task in the current cycle, the task will be sent to the main thread; if there are multiple tasks, a thread pool will be created based on the number of CPU cores, and then all tasks will be consumed by the thread pool. There are some problems with the current implementation. The scheduler creates a thread pool every time it schedules, which will bring a very large performance overhead. In fact, the thread pool can be reused. image

When I tested, sometimes it would take almost 4 seconds to consume 32 tasks. image

If the thread pool is reused, it only takes 10 milliseconds. image image

luoyuliuyin avatar May 08 '24 11:05 luoyuliuyin

This is a very risky change if you have not thoroughly tested various configurations and I think it's for the worse on production systems. When you are using it on MacOS, yes it will be a bit slow because default multiprocessing context on MacOS is spawn https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods but on linux it is fork and this implementation is optimized for production case where Airflow is run on Linux.

Have you considered impact on production for this change?

The overhead for production (Linux and system where fork is supported) is minimal, because fork model is used for the processes. Forking a process that does very little is very inexpensive in this model, because the heap memory is in "copy on write" mode, which means that forking is nearly instanteneous and until some memory is modiified, no extra memory is used.

On the other hand, the change you propose will impact memory used in a long term as the long running processes might accumulate memory when run for a long time and run different callables. So it might cause memory leak, not mentioning that in the original model the forked processes are only needed for a very short time - to send tasks to celery and for example they are not consuming any resources when there are no tasks to run (because they are stopped right after they complete,

Have you tested it beyond MacOS to see what kind of performance gains you have and what impact on memory usage and resource usage at scale?

potiuk avatar May 08 '24 14:05 potiuk

On the other hand, the change you propose will impact memory used in a long term as the long running processes might accumulate memory when run for a long time and run different callables. So it might cause memory leak, not mentioning that in the original model the forked processes are only needed for a very short time - to send tasks to celery and for example they are not consuming any resources when there are no tasks to run (because they are stopped right after they complete,

This pool executor is only used to execute short-running tasks, I don't think it will impact the memory usage, and I believe that this kind of objects (process/thread pool executor) should be created once and reused.

If it's really a risky change, we can add a new configuration to switch between the behaviors and change the default value later when the new behavior is tested in production.

hussein-awala avatar May 08 '24 15:05 hussein-awala

Regarding risk considerations, in our company I have already released this change to production. Our production environment is centos8 and it has been running well until now. Our airflow scale has 30,000 DAGs, and the average daily task scheduling number is 1 million.

Here are some benefits:

  1. The maximum scheduling delay between two nodes is reduced from 10 seconds to less than 1 second.
  2. System throughput increased by more than 50%
  3. The machine load has dropped a lot image image

luoyuliuyin avatar May 08 '24 15:05 luoyuliuyin

Here are some benefits:

The maximum scheduling delay between two nodes is reduced from 10 seconds to less than 1 second. System throughput increased by more than 50% The machine load has dropped a lot

I assume it's only on MacOS? Do you see the same gains on Linux?

Maybe this shoud be MacOS only change ?

potiuk avatar May 08 '24 16:05 potiuk

And I am not totally against it - I am just surprised to see such numbers on Linux. It's been running like that for years, so it would be interesting to see if indeed this is is only the "spawn" vs "fork" issue or maybe there are other factors.

potiuk avatar May 08 '24 16:05 potiuk

@hussein-awala

This pool executor is only used to execute short-running tasks, I don't think it will impact the memory usage, and I believe that this kind of objects (process/thread pool executor) should be created once and reused.

If it's really a risky change, we can add a new configuration to switch between the behaviors and change the default value later when the new behavior is tested in production.

yes I think it's not significant overhead, I just wonder if there are no cases here which will accumulate over time. Actually in Python 3.11 there is a new feature in ProcessPoolExecutor that would be even better max_tasks_per_child that would be BEST approach - because then we could be able to run the process pool only once and let it recycle the processes after say 1000 tasks scheduled - and that would be rather safe change if we only do it in Python 3.11 and have a reasonable default for max_tasks_per_child.

potiuk avatar May 08 '24 16:05 potiuk

Also cc: @ashb @kaxil => maybe there is another reason why we are doing it this way for Celery Executor. I think this change has the potential to change resource usage of Airflow using Celery, so it needs to be carefully considered and I think it needs few more pairs of eyes.

potiuk avatar May 08 '24 16:05 potiuk

Here are some benefits:

The maximum scheduling delay between two nodes is reduced from 10 seconds to less than 1 second. System throughput increased by more than 50% The machine load has dropped a lot

I assume it's only on MacOS? Do you see the same gains on Linux?

Maybe this shoud be MacOS only change ?

The above data is my running results in the Linux environment. If ProcessPoolExecutor is created every time, no problem will be seen at the beginning, but over time the scheduling will become slower and slower, especially under high load conditions. If ProcessPoolExecutor can be reused, these problems will not occur.

luoyuliuyin avatar May 08 '24 16:05 luoyuliuyin

The above data is my running results in the Linux environment. If ProcessPoolExecutor is created every time, no problem will be seen at the beginning, but over time the scheduling will become slower and slower, especially under high load conditions. If ProcessPoolExecutor can be reused, these problems will not occur.

That is very interesting why. I would not expect it - do you have some more data showing trace of it and what gets slower?

potiuk avatar May 08 '24 17:05 potiuk

And just to add a little bit of context - why I am surprised to see it .... It's not that I am stubborn. I just looked deeper:

The current context manager behaviour has been introduced in 2020: https://github.com/apache/airflow/pull/7542

And previously the implementation looked like that:

        # Recreate the process pool each sync in case processes in the pool die
        self._sync_pool = Pool(processes=num_processes)

So at least we know why we are recreating the pool in the first place with every sync - to handle the case where the processes die. And indeed the Process Pool does not have "self-healing" option, so if the processes will die, they will not be recreated and eventually Celery executor will stop sending tasks.

This is why I consider those changes as "risky" - because there might be some good reasons why we are doing it in the first place. And the current proposal does not handle the case to be honest.

So if we change the behaviour here, we need to handle this case as well and be sure that what we are doing is not causing more problems that might hit us back.

And we need to know if we really need to do it. I would be really surprises that no-one noticed the slow-down you mentioned :)

potiuk avatar May 08 '24 18:05 potiuk

What Celery broker are you using?

ashb avatar May 08 '24 18:05 ashb

I suspect if we want to roll out this change we should do it behind a config flag so that it is a) not on by default, and b) easy to fall back to the old behavior if it causes problems for some people

ashb avatar May 08 '24 18:05 ashb

What happens if you kill -9 one (or all) of the ProcessPool processes and then try to schedule/enqueue new tasks?

ashb avatar May 08 '24 18:05 ashb

I am a little skeptical that the overhead for process pool creation is meaningful. Because i think the dominant performance consideration is the blocking IO sending the task to redis and waiting for response.

In order for the process pool creation to be dominant, you'd have to be sending very few tasks i'd think, in which case it would be fast anyway. No?

E.g. in your example above, you claim that it sometimes took 4 seconds to send 32 tasks. But, you did not establish that the bottleneck was process pool creation. How do you know it wasn't mostly just waiting for the broker to respond? Are you claiming that it takes 4 seconds to create the process pool? Maybe you would be able to get at this by adding a log message immediately after entering the context?

dstandish avatar May 08 '24 19:05 dstandish

broker I use rabbitmq, AIRFLOW__CELERY__BROKER_URL=amqp://admin:[email protected]:5672

luoyuliuyin avatar May 08 '24 19:05 luoyuliuyin

The above data is my running results in the Linux environment. If ProcessPoolExecutor is created every time, no problem will be seen at the beginning, but over time the scheduling will become slower and slower, especially under high load conditions. If ProcessPoolExecutor can be reused, these problems will not occur.

That is very interesting why. I would not expect it - do you have some more data showing trace of it and what gets slower?

Not yet, I can only locate here through the logs

luoyuliuyin avatar May 08 '24 20:05 luoyuliuyin

What happens if you kill -9 one (or all) of the ProcessPool processes and then try to schedule/enqueue new tasks?

I'll test it later

luoyuliuyin avatar May 08 '24 20:05 luoyuliuyin

I am a little skeptical that the overhead for process pool creation is meaningful. Because i think the dominant performance consideration is the blocking IO sending the task to redis and waiting for response.

In order for the process pool creation to be dominant, you'd have to be sending very few tasks i'd think, in which case it would be fast anyway. No?

E.g. in your example above, you claim that it sometimes took 4 seconds to send 32 tasks. But, you did not establish that the bottleneck was process pool creation. How do you know it wasn't mostly just waiting for the broker to respond? Are you claiming that it takes 4 seconds to create the process pool? Maybe you would be able to get at this by adding a log message immediately after entering the context?

I'm testing it through the logs. It takes a while. It usually takes about 2 days to see that the scheduling process slows down significantly.

luoyuliuyin avatar May 08 '24 20:05 luoyuliuyin

I'm testing it through the logs. It takes a while. It usually takes about 2 days to see that the scheduling process slows down significantly.

Understood. Did you see my suggestion about adding a message inside the context? If it's true that the pool is created at that point (i'm not sure) then that would show you how long it takes to actually create the pool and that would separate it from the sending. Maybe it's something else that is slow after 2 days?

dstandish avatar May 08 '24 20:05 dstandish

@vatsrahul1001 is going to run some performance benchmarks with the changes in this PR too in upcoming days.

kaxil avatar May 09 '24 11:05 kaxil

We conducted a throughput benchmarking test, executing 15k DAG's comprised of simple BashOperator tasks. We measured the success rate of tasks completed within a 2-minute interval and observed a 3.7% increase in throughput with this PR. @luoyuliuyin, would like to know how we can replicate the scenario under which your throughput increased by 50%.

vatsrahul1001 avatar May 09 '24 18:05 vatsrahul1001

We conducted a throughput benchmarking test, executing 15k DAG's comprised of simple BashOperator tasks. We measured the success rate of tasks completed within a 2-minute interval and observed a 3.7% increase in throughput with this PR. @luoyuliuyin, would like to know how we can replicate the scenario under which your throughput increased by 50%.

When I did performance testing, I also used a dag composed of a simple BashOperator. You may not see the difference when the load is relatively low. When I tested, the load of the airflow scheduler was very high. There were 40,000 scheduled dags (every 2 hours) and 10,000 manually triggered dags. The number of schedulers was 30, and the number of workers was 100

luoyuliuyin avatar May 09 '24 21:05 luoyuliuyin

We conducted a throughput benchmarking test, executing 15k DAG's comprised of simple BashOperator tasks. We measured the success rate of tasks completed within a 2-minute interval and observed a 3.7% increase in throughput with this PR. @luoyuliuyin, would like to know how we can replicate the scenario under which your throughput increased by 50%.

When I did performance testing, I also used a dag composed of a simple BashOperator. You may not see the difference when the load is relatively low. When I tested, the load of the airflow scheduler was very high. There were 40,000 scheduled dags (every 2 hours) and 10,000 manually triggered dags. The number of schedulers was 30, and the number of workers was 100

At the beginning, there may be no obvious difference in the task throughput of the two implementation solutions. After a long time (such as 1 day), the throughput of the existing implementation solution will gradually decrease. What I compare is the stable result.

luoyuliuyin avatar May 09 '24 21:05 luoyuliuyin

What happens if you kill -9 one (or all) of the ProcessPool processes and then try to schedule/enqueue new tasks?

After using kill -9 to kill all child processes created by ProcessPoolExecutor, the scheduler process will exit abnormally. Therefore, how to restore or roll back to the previous implementation needs to be considered. image image

By the way, the reason why I can run it stably in production is because my startup script has the ability to automatically restart, and the scheduler will restart after exiting abnormally.

luoyuliuyin avatar May 10 '24 02:05 luoyuliuyin

At the beginning, there may be no obvious difference in the task throughput of the two implementation solutions. After a long time (such as 1 day), the throughput of the existing implementation solution will gradually decrease. What I compare is the stable result.

I cannot think of a single reason (on the cliend side) why it would happen and why changing to a long running pool would change it. The way pool context manager works is that it will wait until all processes started in the pool complete their tasks and close the pool including all the processes freeing all the resources. So there is no rason why "airflow" would slow down.

However maybe that is a problem of your firewall/networking/rabbitmq wrong behaviour. The main difference between long running pool of processes and processes started temporarily to send the tasks, is that the long running processes might. (depending on implementation of the way client API works) reuse an open connection to the broker to send the tasks rather than open a new one. But if rabbitmq server is implemented properly, then it should have no effect of getting longer and longer over time, because after closing the processes sending the tasks, rabbitmq should free all the resources on the server side.

So maybe the problem is a that your rabbitmq leaks resources when processes sending tasks to it are closing down? Do you have some monitoring / can you please provide some data to back-up the statement that rabbitmq is actually leaking resources in this case? I think if airflow gets slower and slower over time, you should be able to see some resources leaking on either side - memory, CPU being the most likely candidates and my hypothesis is that it's rabbitmq misbehaving (which might be for example a known bug in some old version of rabbitmq. Quick search reveals similar behaviour observed https://groups.google.com/g/rabbitmq-users/c/v630G6OCxuU in some old versions of rabbitmq (but I have not looked in details of that conversation, it's just likely that it might be something similar). Can you please take a close look at the rabbitmq side of yours and see if you can observe some resource leaks when you go back to the current solution and maybe upgrade rabbitmq to latest versions to exclude the possibility it is some old bug?

potiuk avatar May 10 '24 05:05 potiuk

I am a little skeptical that the overhead for process pool creation is meaningful. Because i think the dominant performance consideration is the blocking IO sending the task to redis and waiting for response.

In order for the process pool creation to be dominant, you'd have to be sending very few tasks i'd think, in which case it would be fast anyway. No?

E.g. in your example above, you claim that it sometimes took 4 seconds to send 32 tasks. But, you did not establish that the bottleneck was process pool creation. How do you know it wasn't mostly just waiting for the broker to respond? Are you claiming that it takes 4 seconds to create the process pool? Maybe you would be able to get at this by adding a log message immediately after entering the context?

I added some logs for comparison, the longest time it takes is the processing of the task, and then the closing of the ProcessPoolExecutor. once ProcessPoolExecutor: image image

reuse ProcessPoolExecutor: image image

luoyuliuyin avatar May 10 '24 12:05 luoyuliuyin

At the beginning, there may be no obvious difference in the task throughput of the two implementation solutions. After a long time (such as 1 day), the throughput of the existing implementation solution will gradually decrease. What I compare is the stable result.

I cannot think of a single reason (on the cliend side) why it would happen and why changing to a long running pool would change it. The way pool context manager works is that it will wait until all processes started in the pool complete their tasks and close the pool including all the processes freeing all the resources. So there is no rason why "airflow" would slow down.

However maybe that is a problem of your firewall/networking/rabbitmq wrong behaviour. The main difference between long running pool of processes and processes started temporarily to send the tasks, is that the long running processes might. (depending on implementation of the way client API works) reuse an open connection to the broker to send the tasks rather than open a new one. But if rabbitmq server is implemented properly, then it should have no effect of getting longer and longer over time, because after closing the processes sending the tasks, rabbitmq should free all the resources on the server side.

So maybe the problem is a that your rabbitmq leaks resources when processes sending tasks to it are closing down? Do you have some monitoring / can you please provide some data to back-up the statement that rabbitmq is actually leaking resources in this case? I think if airflow gets slower and slower over time, you should be able to see some resources leaking on either side - memory, CPU being the most likely candidates and my hypothesis is that it's rabbitmq misbehaving (which might be for example a known bug in some old version of rabbitmq. Quick search reveals similar behaviour observed https://groups.google.com/g/rabbitmq-users/c/v630G6OCxuU in some old versions of rabbitmq (but I have not looked in details of that conversation, it's just likely that it might be something similar). Can you please take a close look at the rabbitmq side of yours and see if you can observe some resource leaks when you go back to the current solution and maybe upgrade rabbitmq to latest versions to exclude the possibility it is some old bug?

Maybe as you said, the reason is the resource leak caused by rabbitmq. I will investigate this aspect first and synchronize it if there are other new clues.

luoyuliuyin avatar May 10 '24 12:05 luoyuliuyin

@luoyuliuyin have we verified if it's due to a resource leak caused by Rabbitmq?

vatsrahul1001 avatar May 15 '24 14:05 vatsrahul1001

@luoyuliuyin have we verified if it's due to a resource leak caused by Rabbitmq?

I'm not sure, I've upgraded rabbitmq to the latest version (3.13.2), but rabbitmq consumption is still getting slower and slower

luoyuliuyin avatar May 15 '24 15:05 luoyuliuyin

@luoyuliuyin have we verified if it's due to a resource leak caused by Rabbitmq?

I'm not sure, I've upgraded rabbitmq to the latest version (3.13.2), but rabbitmq consumption is still getting slower and slower

What happens when you restart rabbitmq rather than scheduler?

potiuk avatar May 27 '24 10:05 potiuk