php-ddd
php-ddd copied to clipboard
Domain Event Publisher for Doctrine Entities
Currently we use Prooph Event Sourcing for our Aggregate Roots. Since the Prooph Service Bus was deprecated we use the Symfony Messenger instead.
services:
_defaults:
public: false
Prooph\EventStoreBusBridge\EventPublisher:
class: Acme\Shared\Infrastructure\Prooph\EventPublisher
arguments:
- '@event.bus'
tags:
- { name: 'prooph_event_store.default.plugin' }
<?php
namespace Acme\Shared\Infrastructure\Prooph;
use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\AbstractPlugin;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;
use Symfony\Component\Messenger\MessageBusInterface;
final class EventPublisher extends AbstractPlugin
{
private MessageBusInterface $eventBus;
/**
* @var Iterator[]
*/
private array $cachedEventStreams = [];
public function __construct(MessageBusInterface $eventBus)
{
$this->eventBus = $eventBus;
}
public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
{
$this->listenerHandlers[] = $eventStore->attach(
ActionEventEmitterEventStore::EVENT_APPEND_TO,
function (ActionEvent $event) use ($eventStore): void {
$recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamNotFound', false)
|| $event->getParam('concurrencyException', false)
) {
return;
}
foreach ($recordedEvents as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
} else {
$this->cachedEventStreams[] = $recordedEvents;
}
}
);
$this->listenerHandlers[] = $eventStore->attach(
ActionEventEmitterEventStore::EVENT_CREATE,
function (ActionEvent $event) use ($eventStore): void {
$stream = $event->getParam('stream');
$recordedEvents = $stream->streamEvents();
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamExistsAlready', false)) {
return;
}
foreach ($recordedEvents as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
} else {
$this->cachedEventStreams[] = $recordedEvents;
}
}
);
if ($eventStore instanceof TransactionalActionEventEmitterEventStore) {
$this->listenerHandlers[] = $eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
function (ActionEvent $event): void {
foreach ($this->cachedEventStreams as $stream) {
foreach ($stream as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
}
$this->cachedEventStreams = [];
}
);
$this->listenerHandlers[] = $eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_ROLLBACK,
function (ActionEvent $event): void {
$this->cachedEventStreams = [];
}
);
}
}
private function inTransaction(EventStore $eventStore): bool
{
return $eventStore instanceof TransactionalActionEventEmitterEventStore
&& $eventStore->inTransaction();
}
}
We have some legacy projects using Doctrine Entities. We don't want to use Event Sourcing. But we would like to publish Domain Events from the Entity to the Messenger Bus.
Has anybody implemented a Domain Publisher using Doctrine Event Subscribers or Lifecycle Callbacks for instance? I guess flushing on the entity manager and publishing the domain event should happen in the same transaction?
There is an extension by @vaniocz @nixbody @maryo: https://github.com/vaniocz/doctrine-domain-events
I guess flushing on the entity manager and publishing the domain event should happen in the same transaction?
As documented:
The event is dispatched at the end of transaction once your entity has been flushed and all the changes are projected into database so it is possible to both perform database queries over the changes as well as cancel the transaction.
Source: https://github.com/vaniocz/doctrine-domain-events#raising-domain-events