celery-pool-asyncio icon indicating copy to clipboard operation
celery-pool-asyncio copied to clipboard

Feature: support celery5 and python38

Open ShaheedHaque opened this issue 4 years ago • 6 comments

This is NOT ready to be merged.

The eventual goal of this PR is to address #17, #23 and #29. At this time, the bare minimum changes have been applied to be able to launch tox, and have it hang like so:

$ tox -r
GLOB sdist-make: /main/srhaque/kdedev/celery-pool-asyncio/setup.py
py38 recreate: /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38
py38 installdeps: -r/main/srhaque/kdedev/celery-pool-asyncio/requirements_dev.txt
py38 inst: /main/srhaque/kdedev/celery-pool-asyncio/.tox/.tmp/package/1/celery-pool-asyncio-0.2.0.zip
py38 installed: alabaster==0.7.12,...,zipp==3.4.1
py38 run-test-pre: PYTHONHASHSEED='3791355490'
py38 run-test: commands[0] | pip install -U pip
Requirement already satisfied: pip in ./.tox/py38/lib/python3.8/site-packages (20.3.3)
Collecting pip
  Using cached pip-21.0.1-py3-none-any.whl (1.5 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 20.3.3
    Uninstalling pip-20.3.3:
      Successfully uninstalled pip-20.3.3
Successfully installed pip-21.0.1
py38 run-test: commands[1] | py.test --basetemp=/main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/tmp
================================================================================ test session starts =================================================================================
platform linux -- Python 3.8.5, pytest-6.2.3, py-1.10.0, pluggy-0.13.1 -- /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/bin/python
cachedir: .tox/py38/.pytest_cache
rootdir: /main/srhaque/kdedev/celery-pool-asyncio, configfile: setup.cfg
plugins: cov-2.11.1
collected 1 item                                                                                                                                                                     

tests/test_celery_pool_asyncio.py::test_create_task ^CERROR: got KeyboardInterrupt signal

The proximate reason for the hang is that https://github.com/celery/celery/blob/4f2213a427861cf42b778ef499f29b179d8c40ed/celery/contrib/testing/worker.py#L46 says:

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.

and though the test does do this, somehow the worker thread never starts.

ShaheedHaque avatar Apr 10 '21 13:04 ShaheedHaque

Some small progress. Now, the test fails as follows:

$ tox
...
py38 run-test: commands[1] | py.test --basetemp=/main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/tmp
========================================================== test session starts ===========================================================
platform linux -- Python 3.8.10, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 -- /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/bin/python
cachedir: .tox/py38/.pytest_cache
rootdir: /main/srhaque/kdedev/celery-pool-asyncio, configfile: setup.cfg
plugins: celery-0.0.0, cov-2.12.1
collected 1 item                                                                                                                         

tests/test_celery_pool_asyncio.py::test_create_task FAILED

================================================================ FAILURES ================================================================
____________________________________________________________ test_create_task ____________________________________________________________

    def test_create_task():
        from celery.contrib.testing.app import TestApp
        app = TestApp(config={
            'broker_url': 'filesystem:// %s' % dir_messages,
            'broker_transport_options': {
                'data_folder_in': '%s' % dir_out,
                'data_folder_out': '%s' % dir_out,
                'data_folder_processed': '%s' % dir_processed,
            },
            'result_persistent': True,
            'worker_pool': 'celery_pool_asyncio:TaskPool',
        })
        wrapped = app.task(tack_function)
        app.register_task(wrapped)
        msg = 'hello, world!'
        loop = asyncio.get_event_loop()
        if False:
            #
            # This works.
            #
            task = wrapped(msg)
            reply = loop.run_until_complete(task)
        else:
            #
            # This times out.
            #
            task = wrapped.delay(msg)
            reply = loop.run_until_complete(task)
>           reply = reply.get(timeout=10)

tests/test_celery_pool_asyncio.py:62: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.tox/py38/lib/python3.8/site-packages/celery/result.py:223: in get
    return self.backend.wait_for_pending(
.tox/py38/lib/python3.8/site-packages/celery/backends/base.py:703: in wait_for_pending
    meta = self.wait_for(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <celery.backends.cache.CacheBackend object at 0x7f0c217eff40>, task_id = '1dc86723-d462-4306-aea0-ab78cd9082e0', timeout = 10
interval = 0.5, no_ack = True, on_interval = <promise@0x7f0c22073b80>

    def wait_for(self, task_id,
                 timeout=None, interval=0.5, no_ack=True, on_interval=None):
        """Wait for task and return its result.
    
        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.
    
        Raises:
            celery.exceptions.TimeoutError:
                If `timeout` is not :const:`None`, and the operation
                takes longer than `timeout` seconds.
        """
        self._ensure_not_eager()
    
        time_elapsed = 0.0
    
        while 1:
            meta = self.get_task_meta(task_id)
            if meta['status'] in states.READY_STATES:
                return meta
            if on_interval:
                on_interval()
            # avoid hammering the CPU checking status.
            time.sleep(interval)
            time_elapsed += interval
            if timeout and time_elapsed >= timeout:
>               raise TimeoutError('The operation timed out.')
E               celery.exceptions.TimeoutError: The operation timed out.

.tox/py38/lib/python3.8/site-packages/celery/backends/base.py:739: TimeoutError

As per the commit comment, the key issue is between the working and non-working paths in the test is the interaction between the call to .delay() and loop.run_until_complete(task). I've not yet been able to understand how .delay() or .apply_async() actually work, not least in the test environment without an actual broker: that's probably the key to this.

ShaheedHaque avatar Aug 30 '21 10:08 ShaheedHaque

@ShaheedHaque @kai3341 I figured out what the issue with the test(s) is. I've got a fork of @ShaheedHaque's work and have been working off of that.

The short version is that the issue is coming from the way that the test suite is (not) using the celery_session_app and celery_session_worker fixtures respectively. At the time of this writing (~7:40 PM EST 10/23/2022), the I've got it all working such that the test suite does appear to at least be calling the dummy task_function in response to task.delay(message) being called. However it currently hangs there indefinitely (as opposed to raising a timeout error like it previously was).

I'm 90% sure it's hanging because of the way that the Celery fixtures use threads along with the fact that I've added breakpoints to various functions to help be debug the issue. The short version is that I don't think that I'm to far off from having everything working properly, but I'll keep this thread updated and open a PR when I've got it all 100%.

the-wondersmith avatar Oct 23 '22 23:10 the-wondersmith

Great progress, sounds promising! Let me know if I can help.

ShaheedHaque avatar Oct 24 '22 00:10 ShaheedHaque

@ShaheedHaque Hoo boy. Ok, so, I think I've got a handle on what the actual issue is but I'm... very not sure what to do about it. Any chance you have Discord or anything more real-time than this thread? I'd love to go over it with you, and I'd bet we could get it knocked out in an afternoon.

the-wondersmith avatar Oct 24 '22 13:10 the-wondersmith

I've not looked at the code in some months, and will be rather stale as a result. That said, I'd be happy to do (say) a Skype session so we can share screens etc. I'm in London, and hence UK timezone. Send me an email at <username>@gmail.com and we can arrange the details.

ShaheedHaque avatar Oct 24 '22 18:10 ShaheedHaque

@ShaheedHaque I sent off an email this morning, should show up as the@<username>.dev. Looking forward to getting this working 😁

the-wondersmith avatar Oct 25 '22 20:10 the-wondersmith