django-huey
Cannot enqueue pipeline containing tasks across queues
Given the below tasks.py and settings:
# tasks.py
from django_huey import task, enqueue


@task()
def count_beans(num_beans: int):
    print(f'Counted {num_beans} beans.')


@task(queue='foo')
def count_foos(num_foos: int):
    print(f'Counted {num_foos} foos.')


@task()
def count_stuff():
    enqueue(count_beans.s(2).then(count_foos))
# settings.py
import os

from redis import ConnectionPool

pool = ConnectionPool(host=os.environ.get('REDIS_HOST', 'redis'), port=6379, max_connections=100)

huey_consumer_options = {
    'workers': 2,
    'worker_type': 'thread',
    'initial_delay': 0.1,  # Smallest polling interval, same as -d.
    'backoff': 1.15,  # Exponential backoff using this rate, -b.
    'max_delay': 10.0,  # Max possible polling interval, -m.
    'scheduler_interval': 1,  # Check schedule every second, -s.
    'periodic': False,  # Enable crontab feature.
    'check_worker_health': True,  # Enable worker health checks.
    'health_check_interval': 1,  # Check worker health every second.
}

huey_queue_options = {
    'huey_class': 'huey.RedisHuey',  # Huey implementation to use.
    'name': 'example',  # Use db name for huey.
    'results': True,  # Store return values of tasks.
    # TODO: consider storing None and using expiring Redis, and preserving by default
    'store_none': False,  # If a task returns None, do not save to results.
    'immediate': False,
    'utc': False,  # Use local time for all times internally.
    'blocking': True,  # Perform blocking pop rather than poll Redis.
    'connection': {
        'connection_pool': pool,
        # huey-specific connection parameters.
        'read_timeout': 1,  # If not polling (blocking pop), use timeout.
    },
    'consumer': huey_consumer_options,
}

DJANGO_HUEY = {
    'default': 'default',
    'queues': {
        'default': {
            **huey_queue_options,
            'name': 'default_tasks',
        },
        'foo': {
            **huey_queue_options,
            'name': 'foo_tasks',
        },
    },
}
When I attempt to run the count_stuff task I encounter an error like this:
huey.exceptions.HueyException: tasks.count_foos not found in TaskRegistry
This is because:
- django_huey.enqueue calls huey.api.enqueue
- which calls huey.api.serialize_task
- which calls huey.registry.create_message

and create_message checks if the task is in the registry of the current queue, and if it is not, it raises HueyException as noted above. This makes sense given that huey does not itself support multiple queues and django-huey is mostly a wrapper. Given that, I'm not sure what the best approach for a fix or workaround would be - your thoughts would be much appreciated!
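To make the call chain above concrete, here is a minimal, dependency-free sketch of the failure mode. It is a toy model and not huey's or django-huey's actual code: `ToyQueue`, `ToyRegistryError`, `enqueue_pipeline` and `try_enqueue` are hypothetical names, and the only assumption carried over from the description is that each queue keeps its own registry and that every step of a pipeline is looked up in the registry of the queue doing the enqueueing.

```python
class ToyRegistryError(Exception):
    """Stand-in for huey.exceptions.HueyException in this toy model."""


class ToyQueue:
    """Hypothetical, simplified model of one queue with its own task registry."""

    def __init__(self, name):
        self.name = name
        self.registry = {}  # task name -> callable
        self.messages = []  # serialized task names waiting to run

    def task(self, fn):
        # Registering a task only makes it known to *this* queue.
        self.registry[fn.__name__] = fn
        return fn

    def create_message(self, fn):
        # Mirrors the check described above: unknown tasks are rejected.
        if fn.__name__ not in self.registry:
            raise ToyRegistryError(f"{fn.__name__} not found in TaskRegistry")
        return fn.__name__

    def enqueue_pipeline(self, *fns):
        # Every step is serialized against this queue's registry before
        # anything is stored, so one foreign task rejects the whole pipeline.
        self.messages.extend([self.create_message(fn) for fn in fns])


default_queue = ToyQueue("default_tasks")
foo_queue = ToyQueue("foo_tasks")


@default_queue.task
def count_beans(num_beans):
    return num_beans


@foo_queue.task
def count_foos(num_foos):
    return num_foos


def try_enqueue(queue, *fns):
    """Return "ok" on success, or the error message on failure."""
    try:
        queue.enqueue_pipeline(*fns)
        return "ok"
    except ToyRegistryError as exc:
        return str(exc)


same_queue_result = try_enqueue(default_queue, count_beans)
cross_queue_result = try_enqueue(default_queue, count_beans, count_foos)
```

In this model the single-task enqueue succeeds, while the pipeline that mixes a `default` task with a `foo` task is rejected at serialization time, before anything reaches a consumer.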
Here is a reproduction repo: https://github.com/radusuciu/django-huey-issue14
Hi! That makes sense to me. I'll review it soon and let you know. It's not a workaround, but you can use the same queue for chained tasks until the feature is added.
Looking at huey's code, I'm not sure huey's API allows it, but I'll review it anyway.
Would it be possible to specify multiple queues? Or perhaps having a special value of "*" to represent all queues?
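As a rough illustration of what that suggestion could mean, the sketch below registers one function in several per-queue registries. It is purely hypothetical: the `task` decorator, the `registries` dict and `can_serialize` here are toy stand-ins, not django-huey's API, and the sketch covers only the registry lookup, not which consumer would end up executing the task.

```python
# Toy per-queue registries: queue alias -> {task name: callable}.
registries = {"default": {}, "foo": {}}


def task(queue="default"):
    """Hypothetical decorator: `queue` may be a name, a list of names, or "*"."""
    if queue == "*":
        targets = list(registries)  # every configured queue
    elif isinstance(queue, str):
        targets = [queue]
    else:
        targets = list(queue)

    def decorator(fn):
        for name in targets:
            registries[name][fn.__name__] = fn
        return fn

    return decorator


@task()
def count_beans(num_beans):
    return num_beans


@task(queue="*")
def count_foos(num_foos):
    return num_foos


def can_serialize(queue, *fns):
    """True if every step of a pipeline is known to the given queue."""
    return all(fn.__name__ in registries[queue] for fn in fns)


default_ok = can_serialize("default", count_beans, count_foos)
foo_ok = can_serialize("foo", count_beans, count_foos)
```

Under these assumptions a pipeline enqueued on `default` would pass the registry check because `count_foos` is known everywhere, while the same pipeline on `foo` would still fail because `count_beans` is registered only on `default`.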