[Messenger] Add per-message priority

Open kozlice opened this issue 3 years ago • 12 comments

| Q             | A                         |
| ------------- | ------------------------- |
| Branch?       | 6.2                       |
| Bug fix?      | no                        |
| New feature?  | yes                       |
| Deprecations? | no                        |
| Tickets       | Fix #41573                |
| License       | MIT                       |
| Doc PR        | symfony/symfony-docs#15452 |

Todo

  • [x] add the stamp & make it work for AMQP & Beanstalkd
  • [x] add tests
  • [x] discuss the problem of transports differences, do something about it (or not)
  • [x] submit changes to the documentation

Usage example

$stamp = new PriorityStamp(127);
$this->bus->dispatch($event, [$stamp]);

# Setting the x-max-priority argument on the queue is required for AMQP
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queues:
                        app.messenger:
                            arguments:
                                x-max-priority: 255

Problems

These two transports have quite different priority systems.

RabbitMQ:

  • requires x-max-priority to be set when the queue is created; it can't be changed later
  • priority is stored as u8, so only 0 .. 255 can be used
  • it's recommended to have no more than 10 priority levels
  • larger number = higher priority

Beanstalkd:

  • no need to enable the feature, it just works
  • priority is stored as u32
  • lower number = higher priority, 0 is highest
  • default is 2^32 - 1 (Pheanstalk uses 1024, as do a lot of other clients)

kozlice avatar Jun 06 '21 20:06 kozlice

Hey!

I see that this is your first PR. That is great! Welcome!

Symfony has a contribution guide which I suggest you read.

In short:

  • Always add tests
  • Keep backward compatibility (see https://symfony.com/bc).
  • Bug fixes must be submitted against the lowest maintained branch where they apply (see https://symfony.com/releases)
  • Features and deprecations must be submitted against the 5.4 branch.

Review the GitHub status checks of your pull request and try to solve the reported issues. If some tests are failing, try to see if they are failing because of this change.

When two Symfony core team members approve this change, it will be merged and you will become an official Symfony contributor! If this PR is merged in a lower version branch, it will be merged up to all maintained branches within a few days.

I am going to sit back now and wait for the reviews.

Cheers!

Carsonbot

carsonbot avatar Jun 06 '21 20:06 carsonbot

I see two ways to deal with the lower/higher number issue:

  1. Just warn about it in the docs and do nothing else. In this case, switching from one adapter to another will require changes to the domain code (e.g. a PriorityLevel enum), which leaves room for mistakes.
  2. Only accept values between 0 and 255 in the PriorityStamp constructor. Use them as-is for RabbitMQ, and invert the value for Beanstalkd (255 - $priority).

I'd personally prefer the latter, but let's discuss it first.
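
The second option can be sketched as follows. This is a minimal illustration, not the code in this PR: the function names `assertValidPriority()`, `toAmqpPriority()` and `toBeanstalkdPriority()` are hypothetical, and the only assumption is the 0 .. 255 range where a larger number means "more urgent".

```php
<?php

/**
 * Hypothetical helper (not part of Symfony): validates a transport-agnostic
 * priority in the 0 .. 255 range, where a larger number means "more urgent".
 */
function assertValidPriority(int $priority): int
{
    if ($priority < 0 || $priority > 255) {
        throw new InvalidArgumentException(
            sprintf('Priority must be between 0 and 255, got %d.', $priority)
        );
    }

    return $priority;
}

/** RabbitMQ: larger number = higher priority, so the value is used as-is. */
function toAmqpPriority(int $priority): int
{
    return assertValidPriority($priority);
}

/** Beanstalkd: lower number = higher priority, so the value is inverted. */
function toBeanstalkdPriority(int $priority): int
{
    return 255 - assertValidPriority($priority);
}
```

With this mapping, the most urgent value (255) becomes 0 on Beanstalkd, which is its highest priority, while RabbitMQ receives 255 unchanged.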

kozlice avatar Jun 06 '21 20:06 kozlice

Took this one as far as I can, would appreciate reviews.

RabbitMQ doesn't allow changing x-max-priority for an existing queue. To avoid BC breaks, the feature is disabled by default, but it will work for those who have the x-max-priority argument set in their config.
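
The opt-in behaviour described here could look roughly like the sketch below. It is an assumption-laden illustration rather than the PR's implementation: `resolvePublishPriority()` is a hypothetical function, and `$queueArguments` stands for the `arguments` map of a queue from the transport options.

```php
<?php

/**
 * Hypothetical helper: decides which priority, if any, to publish with.
 *
 * @param array    $queueArguments the queue's "arguments" map from the config
 * @param int|null $requested      the priority carried by the stamp, if any
 */
function resolvePublishPriority(array $queueArguments, ?int $requested): ?int
{
    // Feature stays off unless the queue was declared with x-max-priority.
    if (null === $requested || !isset($queueArguments['x-max-priority'])) {
        return null;
    }

    $max = (int) $queueArguments['x-max-priority'];

    // Clamp into 0 .. max; RabbitMQ itself treats values above the
    // queue's maximum as if they were published with the maximum.
    return max(0, min($requested, $max));
}
```

Existing queues declared without `x-max-priority` keep publishing exactly as before, since the function returns `null` for them.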

Still trying to figure out if this can be done for the Redis adapter without BC breaks.

kozlice avatar Jun 14 '21 15:06 kozlice

@carsonbot find me a reviewer please

Status: needs review

kozlice avatar Jun 22 '21 09:06 kozlice

@X-Coder264 could maybe review this PR?

carsonbot avatar Jun 22 '21 09:06 carsonbot

Hey!

I think @tyx has recently worked with this code. Maybe they can help review this?

Cheers!

Carsonbot

carsonbot avatar Jun 22 '21 18:06 carsonbot

Thank you for this PR!

Have you tested the AMQP implementation? I've built almost the same implementation today (creating queues with the priority option, and decorating the MessageBusInterface to add an AmqpStamp to certain messages based on their parameters). Consider the following:

  • 30 messages are dispatched to queue 1 with priority 0. Each message takes 1s to complete.
  • sleep 5s
  • 10 messages are dispatched to queue 1 with priority 3. Each message takes 1s to complete.

Normally, you'd expect the following flow:

  • 5 messages are handled
  • then all 10 prioritized messages are handled
  • then the remaining 25 messages are handled

However, what I've found in my testing is that since all messages are ACKed by the consumer ahead of time, the prioritization does not have the effect we'd expect.

The RabbitMQ documentation has a piece on how this is mostly expected behavior unless QoS is used or the prefetch count is reduced (https://www.rabbitmq.com/priority.html#interaction-with-consumers).
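
The scenario can be modeled without a broker. The sketch below is a simplified simulation, not RabbitMQ's or Messenger's actual delivery logic: `simulate()` and its `$prefetch` parameter are hypothetical names, and the consumer is assumed to handle exactly one message per one-second tick, working through already-delivered messages in arrival order.

```php
<?php

/**
 * Simulates a single consumer reading from a priority queue.
 *
 * @param int $prefetch how many messages the consumer may hold locally
 *
 * @return string[] labels of the handled messages, in handling order
 */
function simulate(int $prefetch): array
{
    $queue = new SplPriorityQueue(); // broker side: highest priority first
    $seq = 0;

    $publish = function (string $label, int $priority, int $count) use ($queue, &$seq): void {
        for ($i = 0; $i < $count; $i++) {
            $seq++;
            // [priority, -sequence] keeps FIFO order among equal priorities.
            $queue->insert($label, [$priority, -$seq]);
        }
    };

    $publish('LOW', 0, 30);

    $local = [];   // messages already delivered to the consumer
    $handled = [];

    for ($tick = 0; $tick < 40; $tick++) {
        if (5 === $tick) {
            $publish('HIGH', 3, 10); // the prioritized batch arrives 5s later
        }

        // The broker delivers messages until the prefetch window is full.
        while (count($local) < $prefetch && !$queue->isEmpty()) {
            $local[] = $queue->extract();
        }

        // The consumer handles delivered messages strictly in arrival order.
        $handled[] = array_shift($local);
    }

    return $handled;
}
```

In this model, `simulate(1)` yields 5 LOW, then 10 HIGH, then 25 LOW messages, which is the expected flow. With `simulate(100)` all 30 LOW messages are delivered before the HIGH batch exists, so the HIGH messages are handled last.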

In addition to that, the current Symfony documentation states that the prefetch count is deprecated since it has no effect: https://symfony.com/doc/current/messenger.html#amqp-transport

Setting it manually in Connection.php also does not fix this. I'm just adding a comment for now since I had no time to test your PR. Maybe I'm doing something completely wrong, but the implementation is very similar.

Have you faced this issue as well?

rjhllr avatar Jun 23 '21 16:06 rjhllr

Hi @rjhllr , thanks for highlighting this possible issue.

As far as I can tell by Symfony\Component\Messenger\Worker code, messages are only acknowledged after they are handled.

I've tried to emulate your scenario using my PR's branch:

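use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
// PriorityStamp is the stamp added by this PR; its import is omitted here.

class MyEvent
{
    public $priorityAsText;
    public function __construct(string $priorityAsText)
    {
        $this->priorityAsText = $priorityAsText;
    }
}

class MyEventHandler implements MessageHandlerInterface
{
    public function __invoke(MyEvent $event)
    {
        // Simulate one second of work per message
        sleep(1);
        dump($event->priorityAsText);
    }
}

class MyEventSpawnCommand extends Command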
{
    // <...>

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        for ($i = 0; $i < 5; $i++) {
            $event = new MyEvent("LOW");
            $this->eventBus->dispatch($event);
        }

        sleep(3);

        for ($i = 0; $i < 5; $i++) {
            $event = new MyEvent("HIGH");
            $this->eventBus->dispatch($event, [new PriorityStamp(3)]);
        }

        return Command::SUCCESS;
    }
}

First I start messenger:consume, then run the CLI command. The result matches expectations:

^ "LOW"
^ "LOW"
^ "LOW"
^ "HIGH"
^ "HIGH"
^ "HIGH"
^ "HIGH"
^ "HIGH"
^ "LOW"
^ "LOW"

But I'm not sure I understood the "each message takes 1s to complete" correctly. Does this test cover your case?

kozlice avatar Jun 25 '21 07:06 kozlice

Hi @sroze , you're marked as a code owner, could you take a look at this one, please?

kozlice avatar Jul 26 '21 17:07 kozlice

Hi @OskarStark ,

Sorry to bother you, but maybe you could help me to find someone to review this feature.

Regarding the checks, fabbot is asking for a dot after a colon; I believe that's incorrect. And I'm not quite sure what happened with the PHPUnit 8.0 low-deps suite.

kozlice avatar Oct 05 '21 09:10 kozlice

Hi

You can ignore the test for now.

And fabbot is indeed a false positive.

OskarStark avatar Oct 05 '21 11:10 OskarStark

Rebased to 6.2, but I guess it still needs a review

kozlice avatar Jun 14 '22 12:06 kozlice