Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497)

Open schattian opened this issue 3 years ago • 32 comments

related: #23618 fixes leaking loop in https://github.com/apache/airflow/commit/67474cccb612fad39a2afbeee16014909447d242. That also involved changing get_event_loop to be new_event_loop so it is closable (otherwise, parallel usage of the function could try to reuse a closed loop).

Finally, it fixes a previously broken (modified) test. Notice that one call to container_is_running leaked into a test that itself only made 3 calls.

Unfortunately, I can't run the full CI test setup on my machine (OOM).
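
A minimal sketch of the get_event_loop to new_event_loop change described above (illustrative only, not the PR's code; `run_in_fresh_loop` and `fetch_value` are hypothetical names). The idea is that each call owns its loop, so closing it cannot leave a closed loop behind for another caller to pick up:

```python
import asyncio


async def fetch_value(x):
    # Hypothetical coroutine standing in for the real async work.
    await asyncio.sleep(0)
    return x * 2


def run_in_fresh_loop(coro):
    # Create a dedicated loop instead of reusing a shared one,
    # so it is always safe to close it when this call finishes.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```

Calling `run_in_fresh_loop(fetch_value(2))` several times in a row works because no call depends on a loop that an earlier call already closed.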

schattian avatar May 11 '22 22:05 schattian

I've now added an explicit shutdown of the executor. However, it's strange: loop.close should be shutting down the default executor: https://github.dev/python/cpython/blob/f882d33778ee2625ab32d90e28edb6878fb8af93/Lib/asyncio/base_events.py#L678-L683 If that doesn't work, it is probably a bug somewhere else or a typo I'm not seeing...

schattian avatar May 12 '22 12:05 schattian

Looking at this again - clearly an oversight on my side. The thread continues running the sync function (logs_stream, blocking the executor). Cancel is only useful while the task is pending. I will add a workaround for this later today.
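
A small sketch of why cancelling does not help here, using only the standard library (the names `demo_cancel` and `blocking_read` are hypothetical, and this is not the PR's code). A future that is already running in a worker thread cannot be cancelled; only one that is still queued can:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel():
    started = threading.Event()
    release = threading.Event()

    def blocking_read():
        # Stand-in for a blocking, synchronous log stream read.
        started.set()
        release.wait(timeout=5)
        return "done"

    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(blocking_read)
        started.wait(timeout=5)               # the single worker is now busy
        pending = pool.submit(blocking_read)  # queued behind the running task

        cancelled_running = running.cancel()  # already running: refused
        cancelled_pending = pending.cancel()  # still pending: cancelled

        release.set()                         # let the running task finish
        result = running.result(timeout=5)

    return cancelled_running, cancelled_pending, result
```

The running call keeps its thread until the blocking function returns on its own, which is exactly the problem when the stream never returns.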

schattian avatar May 12 '22 13:05 schattian

Yeah. Using asyncio is kinda tricky for a standalone operator. It seems that we are trying to start an asyncio loop to run asyncio operations and then kill the thread after closing the operator. This sounds rather complex, as we do not really achieve the "async" promises.

Just a thought - maybe we should somehow plug it into the "deferrable" operators and the triggerer? I am not sure if getting the logs using the triggerer and triggers would be efficient. Or maybe we should abandon the idea of async operations here, as they do not really give us anything and only cause troublesome testing and difficult debugging in the future?

@andrewgodwin - maybe you can share your thoughts on that ?

potiuk avatar May 12 '22 20:05 potiuk

@potiuk I had one thing in mind to try to reduce the complexity of this: releasing the connection (response.release_conn). Based on https://github.dev/urllib3/urllib3/blob/37a67739ba9a7835886555744e9dabe3009e538a/src/urllib3/response.py#L708-L712 it seems this could work.

Much harder to test though, as right now the test is just a simple generator.

schattian avatar May 12 '22 22:05 schattian

@potiuk I replaced the asyncio-based approach (asyncio with an executor) with multiprocessing.Process instead. I also added one more test case for when the API hangs and there are no logs. IMHO this is the best way to avoid overcomplicating this, wdyt?

schattian avatar May 16 '22 21:05 schattian

Ok, now i dont see any relation between the failing tests and the pr.

schattian avatar May 18 '22 15:05 schattian

@potiuk could you please take a look? would be nice to have, personally I experience this bug regularly (once a week) on gke. Or is it waiting for a specific reviewer?

schattian avatar May 23 '22 21:05 schattian

(And Sorry for long delays in review @schattian - we had Airflow Summit last week which kept many of the reviewers busy).

potiuk avatar May 31 '22 10:05 potiuk

The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.

github-actions[bot] avatar May 31 '22 10:05 github-actions[bot]

> (And Sorry for long delays in review @schattian - we had Airflow Summit last week which kept many of the reviewers busy).

oh, sry, didnt know that. no worries! and thanks for the rev

schattian avatar May 31 '22 15:05 schattian

@dstandish @ashb @ephraimbuddy @jedcunningham - I think we need some of your expert knowledge here

potiuk avatar Jun 04 '22 21:06 potiuk

@schattian any reason you are using multiprocessing instead of a thread?

dstandish avatar Jun 06 '22 18:06 dstandish

> @schattian any reason you are using multiprocessing instead of a thread?

@dstandish Yep. The thread could hang indefinitely, as the kube logs api stream could do, so I need to terminate the process.

However, i am not very knowledgeable on python's api for this kind of thing, so open to suggestions.

schattian avatar Jun 07 '22 07:06 schattian

> > @schattian any reason you are using multiprocessing instead of a thread?
>
> @dstandish Yep. The thread could hang indefinitely, as the kube logs api stream could do, so I need to terminate the process.
>
> However, i am not very knowledgeable on python's api for this kind of thing, so open to suggestions.

OK. I will wait for merging before releasing the new wave of providers then - I think it might need some more deliberation (We will include it in the next wave then).

potiuk avatar Jun 07 '22 11:06 potiuk

> Yep. The thread could hang indefinitely, as the kube logs api stream could do, so I need to terminate the process.
>
> However, i am not very knowledgeable on python's api for this kind of thing, so open to suggestions.

ok yeah, thing i learned: killing threads is not well-supported. interestingly, something similar is done for ECS, and threads are used there, but they don't deal with indefinite hanging problem (and therefore don't have to forcibly terminate it).

one alternative i'll just mention is we could handle this by doing our own "following" instead of relying on the stream. i.e. we do our own loop and consume the latest logs, then sleep a second and do the same. this might allow for a cleaner solution. but, multiprocessing seems ok. i'll just mention a few things.

one is I think you need to kill the logging process when on_kill is called.

the other is, if you get an error in await_container_completion, your subprocess won't be killed. so, you probably should modify so that if you get an unexpected error after process has started, that the process will be killed.

also, i would try to make note somewhere that this is due to a problem with kubernetes, and what we are doing here is a hack that should be removed once the issue has been resolved upstream (and propagated sufficiently, which of course could be a long time from now). the upstream issue is documented here https://github.com/kubernetes/kubernetes/issues/59902.
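
For reference, the "do our own following" alternative mentioned above might look roughly like this. It is only a sketch under assumptions: `fetch_since`, `is_running` and `emit` are hypothetical callables (not Kubernetes client APIs), and log lines are assumed to arrive as `(timestamp, message)` pairs with increasing timestamps:

```python
import time


def follow_by_polling(fetch_since, is_running, emit, interval_s=1.0, sleep=time.sleep):
    """Poll for new log lines instead of holding a long-lived stream open."""
    last_ts = None
    while True:
        # Check the state first so one final fetch happens after the container stops.
        still_running = is_running()
        for ts, msg in fetch_since(last_ts):
            # Skip lines already emitted in a previous poll.
            # Caveat: distinct lines sharing one timestamp would be dropped here.
            if last_ts is not None and ts <= last_ts:
                continue
            emit(msg)
            last_ts = ts
        if not still_running:
            return last_ts
        sleep(interval_s)
```

The timestamp check is one way to limit repeated lines between polls; whether it is precise enough depends on the timestamp resolution of the log source.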

dstandish avatar Jun 13 '22 18:06 dstandish

also, i'd make "private" the two methods you've added

dstandish avatar Jun 13 '22 18:06 dstandish

separate question for you @schattian. suppose the stream hangs, and the pod is still running. if we terminate the process and restart logs consumption, will it get logs from the new location? or will it still look at the original location and therefore any new logging is unreachable and it will immediately hang?

dstandish avatar Jun 13 '22 18:06 dstandish

> one alternative i'll just mention is we could handle this by doing our own "following" instead of relying on the stream. i.e. we do our own loop and consume the latest logs, then sleep a second and do the same.

You mean calling the kubectl logs API X times? I think that's worse than a follow, mostly because you'll have some lines printed twice (and it could be expensive for some DAGs).

> one is I think you need to kill the logging process when on_kill is called.

Sorry, i dont think i understand what's the point here

> the other is, if you get an error in await_container_completion, your subprocess won't be killed. so, you probably should modify so that if you get an unexpected error after process has started, that the process will be killed.

Hmm but in that case, aren't we going to propagate a SIGTERM? (ie killing the subprocess implicitly)? I was just relying on that, but if that's not how it works / could change, i will modify that.

> also, i'd make "private" the two methods you've added

👍🏻

> separate question for you @schattian. suppose the stream hangs, and the pod is still running. if we terminate the process and restart logs consumption, will it get logs from the new location? or will it still look at the original location and therefore any new logging is unreachable and it will immediately hang?

That is correct. A call to the kubectl logs API after the stream has hung will return the newest logs. But there are a few problems with that:

  1. The pod could be deleted, and in that case you'll receive no logs.
  2. We determine that the stream hangs as "if the container is not running and stream consumption didnt finish", as there is not a better way to do it (please let me know if you have one). In that case

I mentioned that in the first PR: https://github.com/apache/airflow/pull/23618#discussion_r869096030 I think it adds complexity that's not worth it, as this bug doesn't occur that often.

schattian avatar Jun 16 '22 08:06 schattian

> > one is I think you need to kill the logging process when on_kill is called.
>
> Sorry, i dont think i understand what's the point here

in operators, if we kill the task (e.g. from UI) shouldn't we also kill the logging subprocess? if we don't do that, will it continue running e.g. on celery worker?
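
One way to make sure the logging process never outlives the task, whether the task is killed or an error is raised while waiting for the container, is to tie its lifetime to a context manager. This is only a sketch: `terminate_on_exit` is a hypothetical helper, and `proc` is assumed to expose `is_alive()`, `terminate()` and `join()` the way `multiprocessing.Process` does:

```python
from contextlib import contextmanager


@contextmanager
def terminate_on_exit(proc):
    """Guarantee that proc is stopped when the block exits, even on error."""
    try:
        yield proc
    finally:
        # Runs on normal exit and when an exception propagates out of the block.
        if proc.is_alive():
            proc.terminate()
        proc.join()
```

Any exception raised inside the `with terminate_on_exit(proc):` block still propagates to the caller, but only after the child process has been stopped.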

dstandish avatar Jun 16 '22 15:06 dstandish

> in operators, if we kill the task (e.g. from UI) shouldn't we also kill the logging subprocess? if we don't do that, will it continue running e.g. on celery worker?

Ah, okay, let me check it. I will push the above mentioned changes (make new funcs private)

schattian avatar Jun 20 '22 13:06 schattian

@dstandish i've forgot, but what happens in such case (and same for whenever the pod is deleted for some other external issue that could cause, for example, pod not found) is that the kubectl stream api returns immediately, so this is not an issue (see eg cncf kube pod operator on_kill def).

I've just made the functions private

schattian avatar Jun 20 '22 20:06 schattian

Hm, are calls to methods wrapped by a private function still mocked? I don't see any other way the tests could time out with the new changes. They are not flaky; they only started failing, consistently, after the addition.

schattian avatar Jun 21 '22 08:06 schattian

@schattian ALWAYS rebase your changes when you attempt to see if the tests are passing. This way you avoid wasting your and the committers' time on trying to solve problems that are likely already solved.

You are 102 commits behind. Things ARE not going to work.

potiuk avatar Jun 21 '22 09:06 potiuk

@potiuk uhm i dont see how it helps. the ci code was the same as before except the new additions, and tests fail for those.

I've updated it btw, lets see

schattian avatar Jun 21 '22 09:06 schattian

> @potiuk uhm i dont see how it helps. the ci code was the same as before except the new additions, and tests fail for those.
>
> I've updated it btw, lets see

I think this is pretty obvious - if there was an issue with some tests at the moment you branched off no matter how hard you try it will stay there if you don't rebase - if it was fixed in one of the 102 commits you will get it fixed. Why do you think it would work otherwise?

potiuk avatar Jun 21 '22 11:06 potiuk

Now - it seems you do have a real issue - likely introduced by your changes:

You need to (as usual) start breeze in your version (build image) - reproduce it by running tests in the same environment as in CI and fix.

potiuk avatar Jun 21 '22 11:06 potiuk

> I think this is pretty obvious - if there was an issue with some tests at the moment you branched off no matter how hard you try it will stay there if you don't rebase - if it was fixed in one of the 102 commits you will get it fixed.

BTW. It's not guaranteed to work. It just rules out one of the possible causes of failure.

potiuk avatar Jun 21 '22 11:06 potiuk

yeah thats what i was thinking, added by making the functions private. What i think happens is that the methods that are looking to be mocked are inside a private function. I dont see how to change that behaviour other than making the functions non-private again.

schattian avatar Jun 21 '22 11:06 schattian

> yeah thats what i was thinking, added by making the functions private. What i think happens is that the methods that are looking to be mocked are inside a private function. I dont see how to change that behaviour other than making the functions non-private again

wellp sorry yeah if they can't be private they can't be private. i just want to minimize surface area of promises we are making. when we have "public" methods we can't change them without deprecation.

re

> @dstandish i've forgot, but what happens in such case (and same for whenever the pod is deleted for some other external issue that could cause, for example, pod not found) is that the kubectl stream api returns immediately, so this is not an issue (see eg cncf kube pod operator on_kill def).

can you clarify... when what happens?

dstandish avatar Jun 21 '22 17:06 dstandish

> can you clarify... when what happens?

The function consuming the logs (the process) will return immediately so it'll terminate.

That's because the stream api will return.

schattian avatar Jun 21 '22 18:06 schattian