
RuntimeError: Event loop is closed in multiprocessing environments (celery).

Open artur-augustyniak opened this issue 4 years ago • 7 comments

Hi,

First of all, sorry for not providing a PR, lack of spare time. The problem I faced is not strictly a vt-py issue, but I believe that protection against such a problem could also be added here. In short, I've got software where celery tasks using vt-py are processed among other tasks, and those other tasks use packages containing async code too. While trying to debug the error mentioned in the title, I came up with a PoC of the problem and a possible solution. I believe the code will be the best description.

Right now the only more or less clear solution is wrapping async code using threads.

```python
#!/usr/bin/env python3


import os
from multiprocessing import Pool
import asyncio


def other_celery_task(job_id):
    '''
        Here we've got a common pattern from other libs; notice the
        'idiomatic' (or pretending to be) view of the event loop as your
        own resource:
        obtain a new event_loop
        try:
            do your job
        finally:
            event_loop.close()
    '''
    cpid = os.getpid()
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    async def some_async_op_possibly_throwing_exception():
        print(
            "[*] async def some_async_op_possibly_throwing_exception() start id", job_id, cpid)
        # await asyncio.sleep(0)
        print(
            "[*] async def some_async_op_possibly_throwing_exception() done id", job_id, cpid)
        return 1
    try:
        return event_loop.run_until_complete(some_async_op_possibly_throwing_exception())
    except Exception:
        print("Oh no, fail, let's ignore it, forget about finally below")
    finally:
        event_loop.close()


def vt_client_related_celery_task(job_id):
    '''
        Here we've got the reconstructed flow from vt.Client
    '''
    cpid = os.getpid()

    async def vt_client_async_op():
        print("[*] async def vt_client_async_op() start id", job_id, cpid)
        # await asyncio.sleep(0)
        print("[*] async def vt_client_async_op() done id", job_id, cpid)
        return 1

    try:
        event_loop = asyncio.get_event_loop()
        print("event loop was in place id",
              job_id, cpid, event_loop.is_closed())

        '''
            Try to uncomment the 2 lines below. A closed loop is not a
            nonexistent loop, so the RuntimeError is never thrown here.
            When the next celery task with vt.Client arrives, we get
            RuntimeError: Event loop is closed
        '''
        # if event_loop.is_closed():
        #     raise RuntimeError("other task closed our loop?")
    except RuntimeError:
        # Generate an event loop if there isn't any.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        print("event loop regenerated id", job_id, cpid)

    return event_loop.run_until_complete(vt_client_async_op())


if __name__ == '__main__':
    '''
        Here we've got an oversimplification of the default celery worker
        setup, dealing also with other tasks.
        Pool size 2 is intentionally small; this problem can be
        non-deterministic.
    '''

    with Pool(2) as pool:

        vt_round_one = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(0, 9)])
        other_round = pool.map_async(other_celery_task, [9])
        vt_round_two = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(10, 20)])
        if 20 == sum(vt_round_one.get() + other_round.get() + vt_round_two.get()):
            print("all tasks executed")
        else:
            print("yay, fail")
```

BR Artur Augustyniak

artur-augustyniak avatar Apr 29 '21 17:04 artur-augustyniak

I don't quite understand the issue. Is it that the event loop used by vt-py is being closed somewhere else?

plusvic avatar Apr 30 '22 09:04 plusvic

Yes, other packages often interfere with the event loop by closing it; it is not a problem in vt-py itself. However, it seems to me that vt-py could defend itself against the event loop being closed by maintaining its own, e.g. in a dedicated thread.

artur-augustyniak avatar May 02 '22 06:05 artur-augustyniak

I'm not sure about that. When you are using vt-py in async mode (i.e. using exclusively the async functions from the library), the event loop is something external to the library; it should be the user who decides whether to create a new event loop or not. As a user of the library you may want to use it in a single-threaded environment where a single event loop is shared by multiple concurrent tasks.

plusvic avatar May 02 '22 08:05 plusvic

Sure, you're right, of course. I am not reporting an error in vt-py; it's a suggestion based on my experience. Unfortunately, many libraries have to implement such protective measures, because in environments like celery you have no chance as a user to control when the event loop for a given worker process is closed. As far as I remember, my suggestion was that vt-py might offer a "manage your own event loop" option, because it was something I had to do myself anyway. So apart from saying "hey, why not consider this", the issue is destined to be closed :)

artur-augustyniak avatar May 04 '22 06:05 artur-augustyniak

I'm ok with adding protective measures if that helps, but I don't know what type of protective measures we can add. Do you have some specific measure in mind?

plusvic avatar May 04 '22 07:05 plusvic

By the end of the week I'll dig out my patches, make them civilized, and open a pull request. From what I remember, I decided to either detect the state of the event loop or send the work to a dedicated thread so that the thread could manage its own event loop.

I don't remember the details at the moment, the topic is a bit old, but since my workplace didn't explode I guess it works ok ;)

artur-augustyniak avatar May 04 '22 07:05 artur-augustyniak

Ok, now I remember. The whole point is that when the event loop is closed but still exists, get_event_loop does not throw a RuntimeError. In an environment like celery, where you have several processes and you don't control which one your code runs in, and on top of that other libraries' code closes the event loop (because that's the Python idiom: treat the loop as your own resource), you're not able to meaningfully guarantee as a user that the loop will be in a good state.

Run the following test with vt-py master and then with my fork:


```python
import vt
import os
from multiprocessing import Pool
import asyncio


def other_celery_task(job_id):
    '''
        Here we've got a common pattern from other libs; notice the
        'idiomatic' (or pretending to be) view of the event loop as your
        own resource:
        obtain a new event_loop
        try:
            do your job
        finally:
            event_loop.close()
    '''
    cpid = os.getpid()
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    async def some_async_op_possibly_throwing_exception():
        print(
            "[*] async def some_async_op_possibly_throwing_exception() start id", job_id, cpid)
        # await asyncio.sleep(0)
        print(
            "[*] async def some_async_op_possibly_throwing_exception() done id", job_id, cpid)
        return 1
    try:
        return event_loop.run_until_complete(some_async_op_possibly_throwing_exception())
    except Exception:
        print("Oh no, fail, let's ignore it, forget about finally below")
    finally:
        event_loop.close()


def vt_client_related_celery_task(job_id):
    api_key = "API_KEY"
    with vt.Client(api_key) as client:
        vt_report = client.get_object("/files/47bb7f855cdf116c62499240089fa1b7a69585e8b7f639e192b9d038da4094cd")
        print(vt_report, "asd")
    return 1


if __name__ == '__main__':
    '''
        Here we've got an oversimplification of the default celery worker
        setup, dealing also with other tasks.
        Pool size 2 is intentionally small; this problem can be
        non-deterministic.

    '''

    with Pool(2) as pool:

        vt_round_one = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(0, 9)])
        other_round = pool.map_async(other_celery_task, [9])
        
        vt_round_two = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(10, 20)])
        if 20 == sum(vt_round_one.get() + other_round.get() + vt_round_two.get()):
            print("all tasks executed")
        else:
            print("yay, fail")
```

artur-augustyniak avatar May 06 '22 10:05 artur-augustyniak