dramatiq
dramatiq copied to clipboard
broker_priority not work
Issues
I want to run tasks based on priority levels 。 but it not work
What OS are you using?
windows
What version of Dramatiq are you using?
Dramatiq 1.14.2 and i use Rabbitmq as Broker
What did you do?
i try two code to test the broker_priority ,but it does not work as i except dramatiq --threads 2 --processes 2 _url -Q testfun
@dramatiq.actor(time_limit=3600 * 1000, max_retries=0, queue_name='testfun', priority=MEDIUM)
def testfun(priority):
testfun.logger.info("priority:" + str(priority))
time.sleep(priority)
first i run
i = 30
j = 1
while i >= 0:
if (i >= j):
print(i)
i -= 1
testfun.send_with_options(args=(i,), broker_priority=i)
else:
break
after it end i run
for i in range(1,30):
testfun.send_with_options(args=(i,), broker_priority=i)
What did you expect would happen?
I want to run tasks based on priority levels
What happened?
It appears to be out of order at runtime and not arranged according to the priority order Is it my misunderstanding that there is no such function ? and I did not see any information about the broker_priority document, please help me!!!
When you init the broker, you need to add max_priority
parameter (10 is the recommended value) so dramatiq will know to create the queue with this setting. Note that this must be set during queue creation, so if you have an existing queue in RabbitMQ, you'll have to create a new one.
i did had max_priority=10 in my broke. but it not work.
broker_middleware = [
AgeLimit, TimeLimit,
ShutdownNotifications, Callbacks, Pipelines, Retries
]
broker_middleware = [m() for m in broker_middleware]
from dramatiq.brokers.rabbitmq import RabbitmqBroker
broker = RabbitmqBroker(url="amqp://@%s:31013/my_vhost?heartbeat=0" % raibitmq_ip, max_priority=10,
middleware=broker_middleware)
Take into account that the worker takes the next available message. If you produce the messages at the same rate as you consume them, then your queue always has one message. Can you make the test where you first send new messages to the queue and start the worker afterwards.