
Nested canvas - fetching result hangs

greuff opened this issue 6 years ago • 11 comments

Hello, I'm not quite sure if this is a bug or if I'm missing something essential. We're trying to drive a non-trivial canvas and occasionally experience lockups when fetching the result, and I'm trying to hunt down the cause. We're using Celery 4.2.1 and a current Redis (from the redis Docker image) as both backend and broker.

I stripped out a whole lot of code and came up with this little test script. The tasks are stripped down to do nothing; they just return the input given to them (all of the tasks return results).

    tileinfos = [{'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'}]

    res = group(
        chain(
            tasks.generateTile.s(tileinfo),
            group(
                [
                    chain(tasks.generatePCL.s('signal'), tasks.entwine.s('signal') ),
                    chain(tasks.generatePCL.s('sinr'), tasks.entwine.s('sinr') ),
                    chain(tasks.generatePCL.s('safezone'), tasks.entwine.s('safezone') ),
                    chain(tasks.generatePCL.s('cellcolor'), tasks.entwine.s('cellcolor') )
                ]
            ),
            tasks.merge.s()
        ) for tileinfo in tileinfos
    ).apply_async()
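For reference, here is a rough sketch of what the stripped-down stub tasks driven by this canvas might look like. The bodies are assumptions (the report only says the tasks echo their input); the signatures follow from the chain prepending each parent result, and generateTile itself is shown further below.

    # Hypothetical stand-ins for the stripped-down tasks; the real bodies were
    # not posted, so these simply return whatever they receive.
    @app.task(name='tasks.generatePCL')
    def generatePCL(tile_result, layer):
        return tile_result

    @app.task(name='tasks.entwine')
    def entwine(pcl_result, layer):
        return pcl_result

    @app.task(name='tasks.merge')
    def merge(branch_results):
        return branch_results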

Now, when I fetch the result in the next statement, the script hangs, even though all Celery tasks completed successfully, as confirmed with Flower, the celery events monitor, etc.:

print(res.get())    # This hangs forever

The curious part is that when I do the following instead, the result is fetched and the script succeeds:

    from celery.exceptions import TimeoutError  # so the except below catches Celery's timeout, not the builtin

    while True:
        try:
            print("Successful: {}".format(res.successful()))
            print("Failed: {}".format(res.failed()))
            print("Waiting: {}".format(res.waiting()))
            print("Ready: {}".format(res.ready()))
            print("Completed count: {}".format(res.completed_count()))

            pipelineResult = res.get(timeout=1)
            print("Pipeline finished!")
            print(pipelineResult)
            break
        except TimeoutError:
            print("I am still alive and waiting for pipeline results...")

Note that sometimes more than one loop iteration is required before res.get eventually returns the result, even though all tasks are already completed! It's as if it were driving some kind of state machine that lags behind.

Now, even funnier: when I comment out the print statements that call waiting() etc. on the GroupResult object, the script again loops forever and never finishes:

while True:
    try:
        # print("Successful: {}".format(res.successful()))
        # print("Failed: {}".format(res.failed()))
        # print("Waiting: {}".format(res.waiting()))
        # print("Ready: {}".format(res.ready()))
        # print("Completed count: {}".format(res.completed_count()))

        pipelineResult = res.get(timeout=1)
        print("Pipeline finished!")
        print(pipelineResult)
        break
    except TimeoutError:
        print("I am still alive and waiting for pipeline results...")

Output:

I am still alive and waiting for pipeline results...
I am still alive and waiting for pipeline results...
I am still alive and waiting for pipeline results...
I am still alive and waiting for pipeline results...
....

I looked at the redis-cli monitor output, and there is a lot of subscribing/unsubscribing going on. I put the working version of this script in a loop and let it run 1000 times in a row; it completed successfully every time. I also confirmed with redis-cli pubsub channels that the number of channels stays stable (at around 150).
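The same check can be repeated from Python instead of redis-cli; a minimal sketch, where the connection details are assumptions matching the Docker setup:

    # Count the currently open pub/sub channels, mirroring the
    # `redis-cli pubsub channels` check above (host/port are assumptions).
    import redis

    r = redis.Redis(host='redis', port=6379)
    print("open pubsub channels:", len(r.pubsub_channels()))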

Also, when the tileinfos array only contains one item, the script doesn't hang at all.

Now, I'm not sure if I did anything wrong, especially in constructing the canvas, or if there is a bug. I realize we nest a lot of chains into groups into chains into groups, and I'm not sure whether that's supported at all. However, I don't understand why res.get() blocks forever, or why it seems to be important to call res.waiting() etc. before calling res.get().

greuff · Sep 19 '18 14:09

Additional info: the observed hang of res.get() only occurs when the tileinfos array (and therefore the outer group) has more than 4 elements. Up to 4 elements the call doesn't lock up!

greuff · Sep 19 '18 15:09

It seems to have to do with the total number of tasks in the canvas. I simplified the canvas to:

res = group(
        tasks.generateTile.s(tileinfo) for tileinfo in tileinfos
    ).apply_async()

and I observe the same hang when I have more than 28 tasks in the group (i.e., more than 28 tileinfo elements in the tileinfos array). Up to and including 28 tasks the script works fine! The workaround is again to fetch the result in the loop.

For the sake of completeness, this is the definition of the task:

import subprocess

from celery.exceptions import MaxRetriesExceededError
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)  # assumed logger setup; the original excerpt omitted the imports

@app.task(name='tasks.generateTile', bind=True, max_retries=3, acks_late=True, track_started=True)
def generateTile(self, tileinfo):
    if 'error' in tileinfo:
        return tileinfo
    try:
        logger.info('>>> generateTile {}'.format(tileinfo['tilespec']))

        tilespec = tileinfo['tilespec']

        # Complete success!
        logger.info('<<< generateTile {}'.format(tileinfo['tilespec']))
        return tileinfo
    except subprocess.CalledProcessError as e:
        logger.error(e.output)
        try:
            self.retry(countdown=2)
        except MaxRetriesExceededError:
            return { 'error': 'exception' }
    except Exception as e:
        logger.error(e)
        try:
            self.retry(countdown=2)
        except MaxRetriesExceededError:
            return { 'error': 'exception' }

and this is how I set up the app:

import os

from celery import Celery

app = Celery('pipeline',
             broker=os.environ['CELERY_BROKER_URL'],
             backend=os.environ['CELERY_BACKEND_URL'],
             include=['pipeline.tasks'])
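The two environment variables are not shown in the post; hypothetical values matching the Redis-as-broker-and-backend setup described above would look like this:

    # Hypothetical values for the two variables read above, matching the
    # "Redis as both broker and backend" setup described in the report.
    import os

    os.environ.setdefault('CELERY_BROKER_URL', 'redis://redis:6379/0')
    os.environ.setdefault('CELERY_BACKEND_URL', 'redis://redis:6379/1')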

greuff · Sep 19 '18 15:09

I think I'm seeing the same issue.

I have a toy problem where I'm trying to generate a bunch of random numbers. I have a trivial task that just returns a single random number. My aim is to use celery.group to call this task multiple times, then get the list of results.

My task is:

import random

@app.task
def random_weight():
    return str(random.random())

The script I'm using to generate the numbers is:

import celery

from weighter.tasks import random_weight

def main():
    n = 11
    weights = celery.group(random_weight.s() for _ in range(n))
    print("The weights are: {}".format(weights.apply_async().get()))

if __name__ == "__main__":
    main()

Running the task with n <= 11 works correctly, but n > 11 hangs.

The logs show that all the tasks are succeeding (even for n > 11). It just seems like .get() isn't being informed that everything is done.

I am using RabbitMQ as my broker and Redis as my result backend. This problem does not occur when using RPC as the backend.
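For reference, a minimal sketch of the two configurations being compared; the connection URLs are assumptions:

    import celery

    # RabbitMQ broker + Redis result backend: the combination where .get() hangs.
    app_redis_results = celery.Celery('weighter',
                                      broker='pyamqp://guest:guest@localhost:5672//',
                                      backend='redis://localhost:6379/0')

    # Same broker with the RPC result backend: no hang observed.
    app_rpc_results = celery.Celery('weighter',
                                    broker='pyamqp://guest:guest@localhost:5672//',
                                    backend='rpc://')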

I have not tried @greuff's workaround yet. Thanks for the detailed description, though!

Versions:

Celery: 4.2.0
Redis: 5.0.3
RabbitMQ: 3.7.13

Helumpago · Mar 12 '19 19:03

I tried @greuff's workaround (periodically re-trying .get()), and I'm getting identical results. As long as I call res.successful(), res.failed(), res.waiting(), res.ready(), or res.completed_count(), I get a valid result.

Additionally, it looks like the while loop can be removed:

def main():
    n = 100
    weights = celery.group(random_weight.s() for _ in range(n))
    res = weights.apply_async()

    print("Successful: {}".format(res.successful()))
    pipelineResult = res.get(timeout=600000)
    print("The weights are: {}".format(pipelineResult))

~~Notice the massive timeout argument. If I remove that, everything hangs. If it is set, I get a response in a fraction of a second.~~

Also, if I drop the res.successful() call, everything hangs, even with the timeout.


Edit:

Notice the massive timeout argument. If I remove that, everything hangs. If it is set, I get a response in a fraction of a second.

Never mind about this. I was just getting lucky with the race condition. The timeout argument does not appear to have much of an effect on the chances of things hanging. Calling res.successful() definitely does have an effect on the chances of success.

Because of the race condition, I'd highly recommend keeping the full while loop approach.

Helumpago · Mar 13 '19 12:03

I just switched to using memcached as my result backend. I haven't seen it hang yet.
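For anyone who wants to try the same switch, a minimal sketch, assuming memcached listens on the default local port and a memcached client library (pylibmc or python-memcached) is installed; the broker URL is an assumption:

    import celery

    # Only the result backend changes; it now points at memcached via
    # Celery's cache backend instead of Redis.
    app = celery.Celery('weighter',
                        broker='pyamqp://guest:guest@localhost:5672//',
                        backend='cache+memcached://127.0.0.1:11211/')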

Helumpago · Mar 13 '19 12:03

@Helumpago thanks for chiming in. We don't have a real solution yet, but the workaround with the loop seems to be stable. We haven't had the chance to try another backend yet.

greuff · Mar 18 '19 15:03

I'm hitting the same issue. res.ready() hangs.

yueyongyue · Jul 17 '19 13:07

Same here, this hangs:

>>> from app import add
>>> result = add.apply_async((2, 2), link=add.s(8))
>>> result.get()
4
>>> result.children
[<AsyncResult: 328b14d3-cef2-4748-a52d-26dd67efd91e>]
>>> result.children[0].get()  # hangs

Repro:

from celery import Celery

app = Celery(
    'tasks',
    broker='pyamqp://guest:guest@rabbitmq:5672',
    backend='rpc://',
)


@app.task
def add(first: int, second: int) -> int:
    print(first + second)
    return first + second

Setup:

version: '3.7'
services:
  rabbitmq:
    image: 'rabbitmq:3.8-management-alpine'
    restart: unless-stopped

  worker:
    build: .
    command: celery --app=app:app worker
    restart: unless-stopped

And the Dockerfile:

FROM python:3.8.6-slim-buster

LABEL maintainer="[email protected]"
LABEL vendor="wemake.services"

WORKDIR /code

RUN pip install 'celery==5.0.2'

# Copy source files:
COPY app.py /code/

sobolevn · Dec 13 '20 11:12

A brief read makes me think that the original issue might have been related to the inner group being upgraded to a chord, and then perhaps to an issue with chord size counting which we fixed in #6354. But seeing it not stall for smaller sizes of the iterable driving the comprehension is a bit suspicious. I'd have to try some of these MRTCs (minimal reproducible test cases) on top of master to get an idea.
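To illustrate the upgrade referred to above: when a group is followed by another signature inside a chain, Celery promotes the group and its callback into a chord, which is where chord size counting comes into play. A rough sketch using the task names from the original report, simplified to two branches (`tasks` is the reporter's module):

    from celery import chain, chord, group

    from pipeline import tasks  # the reporter's task module (assumption)

    # The shape used in the original canvas: a group followed by merge()
    # inside a chain...
    nested = chain(
        tasks.generateTile.s({'technology': 'x', 'tilespec': 'y'}),
        group(tasks.generatePCL.s('signal'), tasks.generatePCL.s('sinr')),
        tasks.merge.s(),
    )

    # ...is promoted to roughly this: a chord whose header is the group and
    # whose body is merge(), so counting the completed header results is what
    # releases merge().
    promoted = chain(
        tasks.generateTile.s({'technology': 'x', 'tilespec': 'y'}),
        chord([tasks.generatePCL.s('signal'), tasks.generatePCL.s('sinr')],
              tasks.merge.s()),
    )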

I'm also a bit blown away by @sobolevn's comment. The only thing I could think of there is the promise confusion we fixed in #6411, but that landed in 5.0.1, so for behaviour like that to show up in such a trivial task is surprising. I'd be fairly confident in saying the two reports are probably caused by different things; .ready() or .get() hanging doesn't really tell us much, unfortunately. I'll see if I can get some ideas and either split some of these reports out into more specific issues or add a deeper dive here if they turn out to be related.

Edit: Weird, it looks like maybe linked tasks don't actually save their result to the RPC backend. With a redis broker/backend this works fine:

import celery

app = celery.Celery(name="foo", broker="redis://", backend="redis://")

@app.task
def add(a, b):
    return a + b
>>> import foo
>>> s = foo.add.s(2, 2)
>>> s.link(foo.add.s(8))
foo.add(8)
>>> r = s.apply_async()
>>> r.get()
4
>>> r.children[0].get()
12
>>> r.children[0].id
'31111d85-0626-49ee-9574-f3aa978c0f06'
[2020-12-14 11:15:30,200: INFO/MainProcess] Received task: foo.add[e6b4e680-38f2-460b-918f-cdff6533d2eb]
[2020-12-14 11:15:30,200: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7f4d21b495e0> (args:('foo.add', 'e6b4e680-38f2-460b-918f-cdff6533d2eb', {'lang': 'py', 'task': 'foo.add', 'id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'parent_id': None, 'argsrepr': '(2, 2)', 'kwargsrepr': '{}', 'origin': 'gen33309@host', 'reply_to': '30a087dd-981b-3e27-ac04-a67d8a219165', 'correlation_id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'hostname': 'celery@host', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 2], 'kwargs': {}}, b'[[2, 2], {}, {"callbacks": [{"task": "foo.add", "args": [8], "kwargs": {}, "options": {}, "subtask_type": null, "immutable": false, "chord_size": null}], "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2020-12-14 11:15:30,201: DEBUG/MainProcess] Task accepted: foo.add[e6b4e680-38f2-460b-918f-cdff6533d2eb] pid:33307
[2020-12-14 11:15:30,223: INFO/MainProcess] Received task: foo.add[31111d85-0626-49ee-9574-f3aa978c0f06]
[2020-12-14 11:15:30,223: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7f4d21b495e0> (args:('foo.add', '31111d85-0626-49ee-9574-f3aa978c0f06', {'lang': 'py', 'task': 'foo.add', 'id': '31111d85-0626-49ee-9574-f3aa978c0f06', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'parent_id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'argsrepr': '(4, 8)', 'kwargsrepr': '{}', 'origin': 'gen33307@host', 'reply_to': 'e9ede7ae-7150-3af2-a641-915176135280', 'correlation_id': '31111d85-0626-49ee-9574-f3aa978c0f06', 'hostname': 'celery@host', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [4, 8], 'kwargs': {}}, b'[[4, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2020-12-14 11:15:30,224: DEBUG/MainProcess] Task accepted: foo.add[31111d85-0626-49ee-9574-f3aa978c0f06] pid:33300
[2020-12-14 11:15:30,224: INFO/ForkPoolWorker-8] Task foo.add[e6b4e680-38f2-460b-918f-cdff6533d2eb] succeeded in 0.022559431999980006s: 4
[2020-12-14 11:15:30,242: INFO/ForkPoolWorker-1] Task foo.add[31111d85-0626-49ee-9574-f3aa978c0f06] succeeded in 0.01803768400077388s: 12

But the same thing with the RPC backend stalls on the child result object, as @sobolevn describes.
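For completeness, the only change relative to the working snippet above is the result backend argument; the broker URL here is an assumption:

    import celery

    # Same add() task as above, but with the RPC result backend; with this,
    # r.children[0].get() stalls as described.
    app = celery.Celery(name="foo",
                        broker="pyamqp://guest:guest@localhost:5672//",
                        backend="rpc://")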

maybe-sybr · Dec 14 '20 00:12

I've not had any time to dig further into this. From memory and reading my previous comment, it does seem like this is still currently broken and should be looked into further. I'm going to unassign myself since I certainly won't have the time to do so over the next couple of months. Pinging @thedrow and @auvipy for redistribution if possible.

maybe-sybr · Feb 20 '21 21:02

No worries, we will revisit this after the 5.1 release.

auvipy · Feb 21 '21 07:02