celery-singleton

Ability to chain tasks

Open Suor opened this issue 4 years ago • 7 comments

I tried many task deduplication libraries, including yours, and this doesn't work:

(task1.si(42) | task2.s(1)).delay()
(task1.si(42) | task2.s(2)).delay()

I want task1(42) to be deduplicated, and then both task2(1) and task2(2) to run after it has finished. Is this possible?

Suor avatar Apr 17 '20 03:04 Suor

Why doesn't this work? I have no idea how chaining works in celery internally, so not sure what is needed here.

I'm open to a PR if you're interested in implementing this.

steinitzu avatar Apr 17 '20 14:04 steinitzu

As far as I understand, you send a message to the broker to perform the first task, and the chain info is inside that message. The worker handles the rest. But if you don't send the message, the chain is simply lost.
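To make that failure mode concrete, here is a toy model in plain Python. It does not use Celery or celery-singleton; `broker_queue`, `held_locks` and `send_deduplicated` are hypothetical names, and the dict only stands in for a broker message that carries the rest of its chain.

```python
# Toy model: a "message" is a dict holding the first task plus the rest of its chain.
broker_queue = []    # stand-in for the broker
held_locks = set()   # stand-in for the dedupe locks


def send_deduplicated(first_task, rest_of_chain):
    """Send the message unless an identical first task is already pending."""
    if first_task in held_locks:
        # Duplicate: nothing is sent, so rest_of_chain is dropped with it.
        return False
    held_locks.add(first_task)
    broker_queue.append({"task": first_task, "chain": rest_of_chain})
    return True


first_sent = send_deduplicated("task1(42)", ["task2(1)"])
second_sent = send_deduplicated("task1(42)", ["task2(2)"])

# Only follow-ups that travelled inside a sent message can ever run.
scheduled = [step for message in broker_queue for step in message["chain"]]
```

In this model `scheduled` ends up holding only the follow-up from the first call, which matches the behaviour described in the issue: the second chain's task2 never reaches a worker.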

Suor avatar Apr 17 '20 15:04 Suor

So we would need some way to add tasks to this existing chain? Is that even doable without being riddled with race conditions?

steinitzu avatar Apr 17 '20 18:04 steinitzu

I don't think you can generally edit the chains and other callbacks of existing messages, and if the message has already been read by a worker then it's probably too late anyway.

We can, however, save the chain info to Redis, then read it when the task is done and send a new message.
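A minimal sketch of that idea, with in-memory structures standing in for Redis and the broker. Everything here (`pending_followups`, `in_flight`, `submit`, `on_task_done`) is hypothetical and not part of celery-singleton or Celery; a real version would also need atomic Redis operations to avoid the race conditions mentioned above.

```python
from collections import defaultdict

# In-memory stand-ins (hypothetical): Redis would hold the first two, the broker the third.
pending_followups = defaultdict(list)  # dedupe key -> follow-ups waiting on that task
in_flight = set()                      # dedupe keys already queued or running
sent_messages = []                     # messages "sent" to the broker


def submit(key, followup):
    """Queue the task for `key` at most once, but always remember its follow-up."""
    pending_followups[key].append(followup)
    if key not in in_flight:
        in_flight.add(key)
        sent_messages.append(key)  # only the first caller sends a message


def on_task_done(key, result):
    """When the deduplicated task finishes, run every follow-up saved for it."""
    in_flight.discard(key)
    followups = pending_followups.pop(key, [])
    return [followup(result) for followup in followups]


# Two chains share the same first task; each follow-up gets the first task's result.
submit("task1(42)", lambda result: ("task2", result, 1))
submit("task1(42)", lambda result: ("task2", result, 2))
results = on_task_done("task1(42)", "done")
```

The first task is sent once, yet both follow-ups run after it completes. In a real implementation, `on_task_done` would send new messages for the saved signatures instead of calling functions directly.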

Suor avatar Apr 18 '20 01:04 Suor

I'm looking into this bug, and it seems to be a problem with delivery of the task task1.si(42): it is delivered only once and then "disappears", so the second chain never becomes ready. For example:

import time
from celery_singleton import Singleton
from somewhere import celery_app

@celery_app.task(base=Singleton)
def do_stuff(*args, **kwargs):
	time.sleep(4)
	return 'I just woke up'

# two chains sharing the same first task: 3 runs expected
async_result = (do_stuff.si(1) | do_stuff.s(2)).delay()
async_result2 = (do_stuff.si(1) | do_stuff.s(3)).delay()

# wait long enough for 3 runs of 4 seconds each
time.sleep(12)

assert async_result.ready()  # ok
assert async_result2.ready()  # fails

'I just woke up' is printed only 2 times, but it should be 3.

lsaavedr avatar Aug 10 '20 05:08 lsaavedr

I have a similar problem even without chaining: if I call the task.get() method in two simultaneous Flask requests, only one request finishes. The other waits indefinitely.

PetrDlouhy avatar Sep 22 '20 09:09 PetrDlouhy

I think it might be enough to send the right message from Singleton.on_duplicate, which could even receive the chain property passed from apply_async, but I gave up because I was not able to work out which message should be sent.

PetrDlouhy avatar Nov 16 '20 14:11 PetrDlouhy