celery-pool-asyncio
celery-pool-asyncio copied to clipboard
Feature: support celery5 and python38
This is NOT ready to be merged.
The eventual goal of this PR is to address #17, #23 and #29. At this time, the bare minimum changes have been applied to be able to launch tox, and have it hang like so:
$ tox -r
GLOB sdist-make: /main/srhaque/kdedev/celery-pool-asyncio/setup.py
py38 recreate: /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38
py38 installdeps: -r/main/srhaque/kdedev/celery-pool-asyncio/requirements_dev.txt
py38 inst: /main/srhaque/kdedev/celery-pool-asyncio/.tox/.tmp/package/1/celery-pool-asyncio-0.2.0.zip
py38 installed: alabaster==0.7.12,...,zipp==3.4.1
py38 run-test-pre: PYTHONHASHSEED='3791355490'
py38 run-test: commands[0] | pip install -U pip
Requirement already satisfied: pip in ./.tox/py38/lib/python3.8/site-packages (20.3.3)
Collecting pip
Using cached pip-21.0.1-py3-none-any.whl (1.5 MB)
Installing collected packages: pip
Attempting uninstall: pip
Found existing installation: pip 20.3.3
Uninstalling pip-20.3.3:
Successfully uninstalled pip-20.3.3
Successfully installed pip-21.0.1
py38 run-test: commands[1] | py.test --basetemp=/main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/tmp
================================================================================ test session starts =================================================================================
platform linux -- Python 3.8.5, pytest-6.2.3, py-1.10.0, pluggy-0.13.1 -- /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/bin/python
cachedir: .tox/py38/.pytest_cache
rootdir: /main/srhaque/kdedev/celery-pool-asyncio, configfile: setup.cfg
plugins: cov-2.11.1
collected 1 item
tests/test_celery_pool_asyncio.py::test_create_task ^CERROR: got KeyboardInterrupt signal
The proximate reason for the hang is that https://github.com/celery/celery/blob/4f2213a427861cf42b778ef499f29b179d8c40ed/celery/contrib/testing/worker.py#L46 says:
Warning:
Worker must be started within a thread for this to work,
or it will block forever.
and though the test does do this, somehow the worker thread never starts.
Some small progress. Now, the test fails as follows:
$ tox
...
py38 run-test: commands[1] | py.test --basetemp=/main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/tmp
========================================================== test session starts ===========================================================
platform linux -- Python 3.8.10, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 -- /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/bin/python
cachedir: .tox/py38/.pytest_cache
rootdir: /main/srhaque/kdedev/celery-pool-asyncio, configfile: setup.cfg
plugins: celery-0.0.0, cov-2.12.1
collected 1 item
tests/test_celery_pool_asyncio.py::test_create_task FAILED
================================================================ FAILURES ================================================================
____________________________________________________________ test_create_task ____________________________________________________________
def test_create_task():
from celery.contrib.testing.app import TestApp
app = TestApp(config={
'broker_url': 'filesystem:// %s' % dir_messages,
'broker_transport_options': {
'data_folder_in': '%s' % dir_out,
'data_folder_out': '%s' % dir_out,
'data_folder_processed': '%s' % dir_processed,
},
'result_persistent': True,
'worker_pool': 'celery_pool_asyncio:TaskPool',
})
wrapped = app.task(tack_function)
app.register_task(wrapped)
msg = 'hello, world!'
loop = asyncio.get_event_loop()
if False:
#
# This works.
#
task = wrapped(msg)
reply = loop.run_until_complete(task)
else:
#
# This times out.
#
task = wrapped.delay(msg)
reply = loop.run_until_complete(task)
> reply = reply.get(timeout=10)
tests/test_celery_pool_asyncio.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py38/lib/python3.8/site-packages/celery/result.py:223: in get
return self.backend.wait_for_pending(
.tox/py38/lib/python3.8/site-packages/celery/backends/base.py:703: in wait_for_pending
meta = self.wait_for(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <celery.backends.cache.CacheBackend object at 0x7f0c217eff40>, task_id = '1dc86723-d462-4306-aea0-ab78cd9082e0', timeout = 10
interval = 0.5, no_ack = True, on_interval = <promise@0x7f0c22073b80>
def wait_for(self, task_id,
timeout=None, interval=0.5, no_ack=True, on_interval=None):
"""Wait for task and return its result.
If the task raises an exception, this exception
will be re-raised by :func:`wait_for`.
Raises:
celery.exceptions.TimeoutError:
If `timeout` is not :const:`None`, and the operation
takes longer than `timeout` seconds.
"""
self._ensure_not_eager()
time_elapsed = 0.0
while 1:
meta = self.get_task_meta(task_id)
if meta['status'] in states.READY_STATES:
return meta
if on_interval:
on_interval()
# avoid hammering the CPU checking status.
time.sleep(interval)
time_elapsed += interval
if timeout and time_elapsed >= timeout:
> raise TimeoutError('The operation timed out.')
E celery.exceptions.TimeoutError: The operation timed out.
.tox/py38/lib/python3.8/site-packages/celery/backends/base.py:739: TimeoutError
As per the commit comment, the key issue is between the working and non-working paths in the test is the interaction between the call to .delay() and loop.run_until_complete(task). I've not yet been able to understand how .delay() or .apply_async() actually work, not least in the test environment without an actual broker: that's probably the key to this.
@ShaheedHaque @kai3341 I figured out what the issue with the test(s) is. I've got a fork of @ShaheedHaque's work and have been working off of that.
The short version is that the issue is coming from the way that the test suite is (not) using the celery_session_app and celery_session_worker fixtures respectively. At the time of this writing (~7:40 PM EST 10/23/2022), the I've got it all working such that the test suite does appear to at least be calling the dummy task_function in response to task.delay(message) being called. However it currently hangs there indefinitely (as opposed to raising a timeout error like it previously was).
I'm 90% sure it's hanging because of the way that the Celery fixtures use threads along with the fact that I've added breakpoints to various functions to help be debug the issue. The short version is that I don't think that I'm to far off from having everything working properly, but I'll keep this thread updated and open a PR when I've got it all 100%.
Great progress, sounds promising! Let me know if I can help.
@ShaheedHaque Hoo boy. Ok, so, I think I've got a handle on what the actual issue is but I'm... very not sure what to do about it. Any chance you have Discord or anything more real-time than this thread? I'd love to go over it with you, and I'd bet we could get it knocked out in an afternoon.
I've not looked at the code in some months, and will be rather stale as a result. That said, I'd be happy to do (say) a Skype session so we can share screens etc. I'm in London, and hence UK timezone. Send me an email at <username>@gmail.com and we can arrange the details.
@ShaheedHaque I sent off an email this morning, should show up as the@<username>.dev. Looking forward to getting this working 😁