dramatiq icon indicating copy to clipboard operation
dramatiq copied to clipboard

broker_priority not work

Open notwhy opened this issue 1 year ago • 3 comments

Issues

I want to run tasks based on priority levels 。 but it not work

What OS are you using?

windows

What version of Dramatiq are you using?

Dramatiq 1.14.2 and i use Rabbitmq as Broker

What did you do?

i try two code to test the broker_priority ,but it does not work as i except dramatiq --threads 2 --processes 2 _url -Q testfun

@dramatiq.actor(time_limit=3600 * 1000, max_retries=0, queue_name='testfun', priority=MEDIUM)
def testfun(priority):
    testfun.logger.info("priority:" + str(priority))
    time.sleep(priority)

first i run

    i = 30
    j = 1
    while i >= 0:
        if (i >= j):
            print(i)
            i -= 1
            testfun.send_with_options(args=(i,), broker_priority=i)
        else:
            break

after it end i run

    for i in range(1,30):
        testfun.send_with_options(args=(i,), broker_priority=i)

What did you expect would happen?

I want to run tasks based on priority levels

What happened?

It appears to be out of order at runtime and not arranged according to the priority order Is it my misunderstanding that there is no such function ? and I did not see any information about the broker_priority document, please help me!!!

notwhy avatar Jun 06 '23 10:06 notwhy

When you init the broker, you need to add max_priority parameter (10 is the recommended value) so dramatiq will know to create the queue with this setting. Note that this must be set during queue creation, so if you have an existing queue in RabbitMQ, you'll have to create a new one.

davidt99 avatar Sep 11 '23 09:09 davidt99

i did had max_priority=10 in my broke. but it not work.

broker_middleware = [
    AgeLimit, TimeLimit,
    ShutdownNotifications, Callbacks, Pipelines, Retries
]

broker_middleware = [m() for m in broker_middleware]
from dramatiq.brokers.rabbitmq import RabbitmqBroker

broker = RabbitmqBroker(url="amqp://@%s:31013/my_vhost?heartbeat=0" % raibitmq_ip, max_priority=10,
                        middleware=broker_middleware)

notwhy avatar Sep 21 '23 06:09 notwhy

Take into account that the worker takes the next available message. If you produce the messages at the same rate as you consume them, then your queue always has one message. Can you make the test where you first send new messages to the queue and start the worker afterwards.

davidt99 avatar Sep 26 '23 11:09 davidt99