django-huey

Cannot enqueue pipeline containing tasks across queues

Open radusuciu opened this issue 1 year ago • 3 comments

Given the tasks.py and settings.py below:

# tasks.py
from django_huey import task, enqueue

@task()
def count_beans(num_beans: int):
    print(f'Counted {num_beans} beans.')

@task(queue='foo')
def count_foos(num_foos: int):
    print(f'Counted {num_foos} foos.')

@task()
def count_stuff():
    enqueue(count_beans.s(2).then(count_foos))
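
# settings.py
import os

from redis import ConnectionPool

# Shared Redis connection pool used by every queue defined below.
pool = ConnectionPool(host=os.environ.get('REDIS_HOST', 'redis'), port=6379, max_connections=100)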

huey_consumer_options = {
    'workers': 2,
    'worker_type': 'thread',
    'initial_delay': 0.1,  # Smallest polling interval, same as -d.
    'backoff': 1.15,  # Exponential backoff using this rate, -b.
    'max_delay': 10.0,  # Max possible polling interval, -m.
    'scheduler_interval': 1,  # Check schedule every second, -s.
    'periodic': False,  # Enable crontab feature.
    'check_worker_health': True,  # Enable worker health checks.
    'health_check_interval': 1,  # Check worker health every second.
}

huey_queue_options = {
    'huey_class': 'huey.RedisHuey',  # Huey implementation to use.
    'name': 'example',  # Use db name for huey.
    'results': True,  # Store return values of tasks.
    # TODO: consider storing None and using expiring Redis, and preserving by default
    'store_none': False,  # If a task returns None, do not save to results.
    'immediate': False,
    'utc': False,  # Use local time for all times internally.
    'blocking': True,  # Perform blocking pop rather than poll Redis.
    'connection': {
        'connection_pool': pool,
        # huey-specific connection parameters.
        'read_timeout': 1,  # If not polling (blocking pop), use timeout.
    },
    'consumer': huey_consumer_options,
}

DJANGO_HUEY = {
    'default': 'default',
    'queues': {
        'default': {
            **huey_queue_options,
            'name': 'default_tasks',
        },
        'foo': {
            **huey_queue_options,
            'name': 'foo_tasks'
        },
    },
}

When I attempt to run the count_stuff task I encounter an error like this:

huey.exceptions.HueyException: tasks.count_foos not found in TaskRegistry

This is because create_message checks whether the task is in the registry of the current queue and, if it is not, raises HueyException as noted above. This makes sense given that huey does not itself support multiple queues and django-huey is mostly a wrapper.

Given that, I'm not sure what the best approach for a fix or workaround would be. Your thoughts would be much appreciated!

radusuciu avatar Apr 24 '23 21:04 radusuciu
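
The registry check described above can be illustrated with a toy model. This is only a sketch in plain Python, not huey's actual code: ToyRegistry, RegistryError, and try_enqueue are hypothetical names, and the only assumption taken from the report is that a pipeline is checked against the registry of the single queue that enqueues it.

```python
# Simplified model of per-queue task registries. NOT huey's real implementation;
# all class and function names here are hypothetical.
class RegistryError(Exception):
    """Stands in for huey.exceptions.HueyException in this sketch."""


class ToyRegistry:
    def __init__(self):
        self._tasks = set()

    def register(self, name):
        self._tasks.add(name)

    def create_message(self, pipeline):
        # Every task in the pipeline is checked against this one registry.
        for name in pipeline:
            if name not in self._tasks:
                raise RegistryError(f'{name} not found in TaskRegistry')
        return list(pipeline)


# One registry per queue, mirroring the 'default' and 'foo' queues above.
default_registry = ToyRegistry()
default_registry.register('tasks.count_beans')
default_registry.register('tasks.count_stuff')

foo_registry = ToyRegistry()
foo_registry.register('tasks.count_foos')


def try_enqueue(registry, pipeline):
    """Return 'ok' if the pipeline serializes, else the error message."""
    try:
        registry.create_message(pipeline)
        return 'ok'
    except RegistryError as exc:
        return str(exc)


# A pipeline that stays on one queue is accepted.
result_same = try_enqueue(default_registry, ['tasks.count_beans'])
# A pipeline that crosses queues fails on the task the queue does not know.
result_cross = try_enqueue(
    default_registry, ['tasks.count_beans', 'tasks.count_foos']
)
```

In this model, result_cross carries the same message as the reported exception, because count_foos is registered only with the 'foo' queue.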

Here is a reproduction repo: https://github.com/radusuciu/django-huey-issue14

radusuciu avatar Apr 25 '23 18:04 radusuciu

Hi! Makes sense to me. I'll review it soon and let you know. It's not really a workaround, but you can use the same queue for chained tasks until the feature is added.

Looking at huey's code, I'm not sure huey's API allows it, but I'll review it anyway.

pablop94 avatar Apr 25 '23 19:04 pablop94

Would it be possible to specify multiple queues? Or perhaps having a special value of "*" to represent all queues?

radusuciu avatar Jul 21 '23 16:07 radusuciu
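
One possible reading of the proposal above, again as a toy model in plain Python: a task declared with several queue names, or with "*", is registered with every matching queue. REGISTRIES, register_task, and can_serialize are hypothetical names for illustration; neither huey nor django-huey is confirmed to offer anything like this.

```python
# Toy model of the proposal; register_task and REGISTRIES are hypothetical
# names, not part of huey or django-huey.
REGISTRIES = {'default': set(), 'foo': set()}


def register_task(name, queue='default'):
    """Register a task name on one queue, several queues, or all ('*')."""
    if queue == '*':
        targets = list(REGISTRIES)
    elif isinstance(queue, str):
        targets = [queue]
    else:
        targets = list(queue)
    for target in targets:
        REGISTRIES[target].add(name)


def can_serialize(queue, pipeline):
    """True if every task in the pipeline is known to the given queue."""
    return all(name in REGISTRIES[queue] for name in pipeline)


register_task('tasks.count_beans')            # default queue only
register_task('tasks.count_foos', queue='*')  # every configured queue

pipeline = ['tasks.count_beans', 'tasks.count_foos']
default_ok = can_serialize('default', pipeline)  # both names known to 'default'
foo_ok = can_serialize('foo', pipeline)          # count_beans unknown to 'foo'
```

Under this reading, a pipeline is accepted only by a queue that knows every task in it, so the "*" registration fixes the check on 'default' but not on 'foo'. It also says nothing about which queue's workers would end up running count_foos.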