core icon indicating copy to clipboard operation
core copied to clipboard

Add the ability to choose the messenger bus used in the data persister

Open soyuka opened this issue 5 years ago • 6 comments

See: https://github.com/api-platform/api-platform/issues/1095

soyuka avatar Apr 12 '19 10:04 soyuka

I thought a bit about this and CQRS related architecture.

It'd be nice to be able to use two buses in Api Platform:

  • Commands (async)
  • Queries (sync)

Now, to have queries we'd have to use a bus inside a DataProvider?

Thing is, the messenger component is really powerfull and could handle Validation etc. inside middlewares (which it does already) or even doctrine transaction (https://github.com/symfony/symfony/blob/master/src/Symfony/Bridge/Doctrine/Messenger/DoctrineTransactionMiddleware.php). All this makes me think that the messenger should be handled upper in the chain (not only in persisting).

Say I want to benefit from the power of messenger, I'd register an event listener that comes post-denormalization, retrieve a result via the messenger (skip almost all our internal listeners and use middlewares instead) to end up giving a result via our current serialization process.

Anyway, I've looked into adding multiple buses on our current implementation and unless we have some kind of BusRegistry (which imo is anti-pattern) it's hard to add.

soyuka avatar Apr 16 '19 08:04 soyuka

Think it's quite an important thing because it may affect application scaling (eg. handling of messages locally or via worker on another machine).

er1z avatar May 10 '19 09:05 er1z

Blowing off the dust on this one. With current auto injection we could leverage this behavior as well (I think)

when injecting a bus in a constructor you could for example do either of the following:

public function __construct(MessageBusInterface $messageBus) {}
public function __construct(MessageBusInterface $commandBus) {}

This will in turn give you the correct bus as defined in the messenger config

framework:
    messenger:
        default_bus: command.bus

        buses:
            command.bus:
                default_middleware: false
                middleware:
                    - handle_message

            query.bus:
                default_middleware: false
                middleware:
                    - handle_message

            event.bus:
                default_middleware: allow_no_handlers

So if it would be as easy as defining the bus name on the api resource, i.e.

#[ApiResource(collectionOperations: [ 'get',
        'create' => [
            'status' => 202,
            'messenger' => 'input',
            'bus' => 'command.bus',
            'input' => DTO::class,
            'output' => false,
            'method' => 'POST',
            'path' => '/my/route',
        ]
    ], itemOperations: [ 'get' ])]

Hope my 2 cents here are putting you in the right direction.

ricohumme avatar Nov 25 '21 11:11 ricohumme

I would love this feature to be implemented. I also use a command bus and a query bus. Currently, I use a default bus with no handlers. There is just a middleware, the sends the message to another bus based on a "registry" and the operation name (I get that from the ContextStamp). But that's not ideal.

SpiGAndromeda avatar Jul 03 '22 07:07 SpiGAndromeda

Hi,

I have a solution for you, because I was facing the same issue, and found a workaround on 3.0.0-rc2.

services.yaml :

    api_platform.message_bus:
        alias: App\Messenger\ApiPlatformBusSelector

src\Messenger\ApiPlatformBusSelector :

<?php

namespace App\Messenger;

use App\Model\Rep\MessageA;
use App\Model\Rep\MessageB;
use App\Model\Rep\MessageC;
use App\Model\Rep\MessageD;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;

class ApiPlatformBusSelector implements MessageBusInterface
{
    public function __construct(
        /* Default bus */
        private readonly MessageBusInterface $messageBus,
        /* Specific bus */
        private readonly MessageBusInterface $specificBus)
    {

    }

    public function dispatch(object $message, array $stamps = []): Envelope
    {
        if ($message->getMessage() instanceof MessageA
        || $message->getMessage() instanceof MessageB
        || $message->getMessage() instanceof MessageC
        || $message->getMessage() instanceof MessageD) {
            return $this->specificBus->dispatch($message, $stamps);
        }

        return $this->messageBus->dispatch($message, $stamps);
    }
}

Don't forget to add messenger: input in your entity/model.

Symfony Messenger can't selected a bus by this name with a manager or tagged_iterator, only by container injection.

yobrx avatar Aug 15 '22 11:08 yobrx

@yobrx Timing is funny :smile: I also developed a solution but maybe a little more complex: https://github.com/it-bens/message-bus-redirect-bundle

This bundles allows to redirect messages from one bus to another. It's working by now but requires a little polishing and a decent Readme file. The bundle requires some configuration. I think the most suitable way to determine the target bus is the MessageClassStrategy, which requires a DTO-class-to-busName map in the bundle configuration. Some examples can be found in the test configurations.

I am planning to add another decision strategy for API platform in the separated package (to keep the core bundle working without api-platform) which will determine the target bus by API Platform endpoints and actions.

An important hint if somebody wants to try the bundle: the middleware has to be registered manually. Of cause it would make sense to place this middleware on top of the middleware stack. This can be achived by this bus configuration:

default_middleware: false
middleware:
    # The symfony messenger allows no explicit middleware order.
    # The workaround is to remove the default middlewares with 'default_middleware' = false
    # and add them in an explicit order.
    # The HandleMessageMiddleware is added because the default bus should not handle messages.
    - ITB\MessageBusRedirectBundle\MessageRedirectMiddleware
    - add_bus_name_stamp_middleware
    - reject_redelivered_message_middleware
    - dispatch_after_current_bus
    - failed_message_processing_middleware
    - send_message

The middleware is also not passing the envelope to the next middleware. Maybe I will change that later.

SpiGAndromeda avatar Aug 15 '22 11:08 SpiGAndromeda

closing as old, do whatever in a processor/provider

soyuka avatar Oct 17 '23 10:10 soyuka