symfony
symfony copied to clipboard
[Messenger] Add per-message priority
Q | A |
---|---|
Branch? | 6.2 |
Bug fix? | no |
New feature? | yes |
Deprecations? | no |
Tickets | Fix #41573 |
License | MIT |
Doc PR | symfony/symfony-docs#15452 |
Todo
- [x] add the stamp & make it work for AMQP & Beanstalkd
- [x] add tests
- [x] discuss the problem of transports differences, do something about it (or not)
- [x] submit changes to the documentation
Usage example
$stamp = new PriorityStamp(127);
$this->bus->dispatch($event, [$stamp]);
# Setting an option for queue is required for AMQP
framework:
messenger:
transports:
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queues:
app.messenger:
arguments:
x-max-priority: 255
Problems
These two transports have quite different priority systems.
RabbitMQ:
- requires
x-max-priority
to be set when queue is created, you can't change it later - priority is stored as u8, can only use
0 .. 255
- it's recommended to have no more than 10 priority levels
- larger number = higher priority
Beanstalkd:
- no need to enable the feature, it just works
- priority is stored as u32
- lower number = higher priority, 0 is highest
- default is 2^32 - 1 (Pheanstalkd uses 1024, but so do a lot of other clients)
Hey!
I see that this is your first PR. That is great! Welcome!
Symfony has a contribution guide which I suggest you to read.
In short:
- Always add tests
- Keep backward compatibility (see https://symfony.com/bc).
- Bug fixes must be submitted against the lowest maintained branch where they apply (see https://symfony.com/releases)
- Features and deprecations must be submitted against the 5.4 branch.
Review the GitHub status checks of your pull request and try to solve the reported issues. If some tests are failing, try to see if they are failing because of this change.
When two Symfony core team members approve this change, it will be merged and you will become an official Symfony contributor! If this PR is merged in a lower version branch, it will be merged up to all maintained branches within a few days.
I am going to sit back now and wait for the reviews.
Cheers!
Carsonbot
I see two ways to deal with lower/higher number issue:
- Just warn about it in the docs and do nothing. In this case switching from one adapter to another will require changes to the domain code (e.g.
PriorityLevel
enum). That creates some margin for mistakes - Only accept values between 0 and 255 in
PriorityStamp
constructor. Use them as-is for RabbitMQ, and inverse the value for Beanstalkd (255 - $priority
)
I'd personally prefer the latter, but let's discuss it first.
Took this one as far as I can, would appreciate reviews.
RabbitMQ doesn't allow to change x-max-priority
for an existing queue. To avoid BC breaks, the feature is disabled by default, but will work for those who have x-max-priority
argument set in config.
Still trying to figure out if this can be done for Redis adapter without BC breaks.
@carsonbot find me a reviewer please
Status: needs review
@X-Coder264 could maybe review this PR?
Hey!
I think @tyx has recently worked with this code. Maybe they can help review this?
Cheers!
Carsonbot
Thank you for this PR!
Have you tested the AMQP implementation? I've built almost the same implementation today (creating queues with priority option, decorated the MessageBusInterface to AmqpStamp certain messages based on their parameters). Consider the following:
- 30 messages are dispatched to queue 1 with priority 0. Each message takes 1s to complete.
- sleep 5s
- 10 messages are dispatched to queue 1 with priority 3. Each message takes 1s to complete. Normally, you'd expect the following flow:
- 5 messages are handled
- then all 10 prioritized messages are handled
- then the remaining 25 messages are handled However, what I've found in my testing is that since all messages are ACKed by the consumer ahead of time, the prioritization does not have the effect we'd expect.
The RabbitMQ documentation has a piece on how this is mostly expected behavior unless QoS is used or the prefetch count is reduced (https://www.rabbitmq.com/priority.html#interaction-with-consumers).
In addition to that, the current Symfony documentation states that the prefetch count is deprecated since it has no effect:
https://symfony.com/doc/current/messenger.html#amqp-transport
Setting it manually in Connection.php also does not fix this. I'm just adding a comment for now since I had no time to test your PR. Maybe I'm doing something completely wrong, but the implementation is very similar.
Have you faced this issue as well?
Hi @rjhllr , thanks for highlighting this possible issue.
As far as I can tell by Symfony\Component\Messenger\Worker
code, messages are only acknowledged after they are handled.
I've tried to emulate your scenario using my PR's branch:
class MyEvent
{
public $priorityAsText;
public function __construct(string $priorityAsText)
{
$this->priorityAsText = $priorityAsText;
}
}
class MyEventHandler implements MessageHandlerInterface
{
public function __invoke(MyEvent $event)
{
sleep(1);
dump($event->priorityAsText);
}
}
class MyEventSpawnCommand extends Command
{
// <...>
protected function execute(InputInterface $input, OutputInterface $output): int
{
for ($i = 0; $i < 5; $i++) {
$event = new MyEvent("LOW");
$this->eventBus->dispatch($event);
}
sleep(3);
for ($i = 0; $i < 5; $i++) {
$event = new MyEvent("HIGH");
$this->eventBus->dispatch($event, [new PriorityStamp(3)]);
}
return Command::SUCCESS;
}
}
First I start messenger:consume
, then run the CLI command. The result matches expectations:
^ "LOW"
^ "LOW"
^ "LOW"
^ "HIGH"
^ "HIGH"
^ "HIGH"
^ "HIGH"
^ "HIGH"
^ "LOW"
^ "LOW"
But I'm not sure I understood the "each message takes 1s to complete" correctly. Does this test cover your case?
Hi @sroze , you're marked as a code owner, could you take a look at this one, please?
Hi @OskarStark ,
Sorry to bother you, but maybe you could help me to find someone to review this feature.
Regarding the checks, fabbot is asking for a dot after colon, I believe that's incorrect. And I'm not quite sure what happened with PHPUnit 8.0 low-deps suite.
Hi
You can ignore the test for now.
And fabbot is indeed a false positive.
Rebased to 6.2, but I guess it still needs a review