
How to merge event-sourced aggregate roots (A+ES) with Prooph PDO event stream

Open webdevilopers opened this issue 5 years ago • 3 comments

The events of several aggregates have to be selected, adjusted in a few domain-relevant ways, and merged into a new aggregate.

I decided to select the events from the default repository using the metadata matcher.

I did not create the new aggregate by calling its factory methods with the new data, since the data was already valid when the original events took place and were applied. Instead I decided to use reflection to pass in the "recorded" events.

This was inspired by @prooph code:

namespace Prooph\Common\Messaging;

abstract class DomainMessage implements Message
{
    public static function fromArray(array $messageData): DomainMessage
    {
        MessageDataAssertion::assert($messageData);

        $messageRef = new \ReflectionClass(\get_called_class());

        /** @var $message DomainMessage */
        $message = $messageRef->newInstanceWithoutConstructor();

        $message->uuid = Uuid::fromString($messageData['uuid']);
        $message->messageName = $messageData['message_name'];
        $message->metadata = $messageData['metadata'];
        $message->createdAt = $messageData['created_at'];
        $message->setPayload($messageData['payload']);

        return $message;
    }
}
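
The same reflection idea, reduced to plain PHP, might look roughly like the sketch below. `DemoAggregate` and `aggregateFromRecordedEvents()` are hypothetical names made up for illustration; they are not part of prooph or of the code in this issue, and events are plain arrays here instead of `AggregateChanged` objects.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an event-sourced aggregate (not the real StaffMember).
final class DemoAggregate
{
    /** @var string */
    private $id;

    /** @var array[] events that are not yet committed to the event store */
    private $recordedEvents = [];

    private function __construct()
    {
        // Never reached in this sketch: the object is created without its constructor.
        throw new LogicException('Constructor must not run for a replayed history');
    }

    public function id(): string
    {
        return $this->id;
    }

    public function popRecordedEvents(): array
    {
        $pending = $this->recordedEvents;
        $this->recordedEvents = [];

        return $pending;
    }
}

function aggregateFromRecordedEvents(string $id, array $events): DemoAggregate
{
    $classRef = new ReflectionClass(DemoAggregate::class);

    /** @var DemoAggregate $aggregate */
    $aggregate = $classRef->newInstanceWithoutConstructor();

    foreach (['id' => $id, 'recordedEvents' => $events] as $property => $value) {
        $propertyRef = $classRef->getProperty($property);
        if (PHP_VERSION_ID < 80100) {
            // Only needed before PHP 8.1; later versions allow access by default.
            $propertyRef->setAccessible(true);
        }
        $propertyRef->setValue($aggregate, $value);
    }

    return $aggregate;
}

$aggregate = aggregateFromRecordedEvents('staff-42', [
    ['message_name' => 'StaffMemberAdded', 'version' => 1],
    ['message_name' => 'StaffMemberContractModified', 'version' => 2],
]);
```

The point of the sketch is that no named constructor and no validation runs again; the object only carries an identity and a list of pending events that a repository can pop and persist.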

All in all, a lot of prooph-inspired code was used, but I think there is not too much coupling. This example will get a unit test with an in-memory solution; I will add it soon.

Just like in any other use case, the new aggregate is saved to the repository. The event publisher publishes all events, the projectors create the read models, and the process managers eventually publish some messages to the outside world. Another process manager will catch the final MergedWithStaffMembers event and fire some "removeMergedStaffMember" commands.

Would love to have your feedback on this approach @Ocramius, @prolic.

The factory:

<?php

namespace Acme\Staff\Domain\Service;

use DomainException;
use Prooph\EventSourcing\AggregateChanged;
use Ramsey\Uuid\Uuid;
use ReflectionClass;
use ReflectionProperty;
use Acme\Staff\Domain\Model\StaffMember\ContractId;
use Acme\Staff\Domain\Model\StaffMember\EmploymentPeriod;
use Acme\Staff\Domain\Model\StaffMember\Event\MergedWithStaffMembers;
use Acme\Staff\Domain\Model\StaffMember\Event\StaffMemberAdded;
use Acme\Staff\Domain\Model\StaffMember\Event\StaffMemberContractModified;
use Acme\Staff\Domain\Model\StaffMember\StaffMember;
use Acme\Staff\Domain\Model\StaffMember\StaffMemberId;
use Acme\Staff\Domain\Model\StaffMember\StaffMemberRepository;

final class MergedStaffMember
{
    /** @var StaffMemberRepository */
    private $staffMemberRepository;

    /**
     * Current version
     *
     * @var int
     */
    private $version = 0;

    /**
     * List of events that are not committed to the EventStore
     *
     * @var AggregateChanged[]
     */
    private $recordedEvents = [];

    /** @var StaffMemberId */
    private $newStaffMemberId;

    /** @var StaffMemberId[] */
    private $mergeWithStaffMemberIds = [];

    /** @var ContractId */
    private $newContractId;

    /** @var EmploymentPeriod */
    private $newEmploymentPeriod;

    public function __construct(StaffMemberRepository $staffMemberRepository)
    {
        $this->staffMemberRepository = $staffMemberRepository;
    }

    public function fromMergedHistory(
        StaffMemberId $newStaffMemberId, array $mergeWithStaffMemberIds,
        ContractId $newContractId, EmploymentPeriod $newEmploymentPeriod
    ): StaffMember
    {
        if (0 === count($mergeWithStaffMemberIds)) {
            throw new DomainException('Missing staff members to merge');
        }

        $this->newStaffMemberId = $newStaffMemberId;
        $this->mergeWithStaffMemberIds = $mergeWithStaffMemberIds;
        $this->newContractId = $newContractId;
        $this->newEmploymentPeriod = $newEmploymentPeriod;

        $this->buildHistoryFromMergedStaffMembers();
        $this->finalizeNewStaffMemberHistory();

        $newStaffMemberRef = new ReflectionClass(StaffMember::class);

        /** @var StaffMember $newStaffMember */
        $newStaffMember = $newStaffMemberRef->newInstanceWithoutConstructor();

        $newStaffMemberRecordedEventsRef = new ReflectionProperty($newStaffMember, 'recordedEvents');
        $newStaffMemberRecordedEventsRef->setAccessible(true);
        $newStaffMemberRecordedEventsRef->setValue($newStaffMember, $this->recordedEvents);

        $newStaffMemberStaffMemberIdRef = new ReflectionProperty($newStaffMember, 'staffMemberId');
        $newStaffMemberStaffMemberIdRef->setAccessible(true);
        $newStaffMemberStaffMemberIdRef->setValue($newStaffMember, $newStaffMemberId);

        return $newStaffMember;
    }

    private function buildHistoryFromMergedStaffMembers(): void
    {
        $oldEvents = $this->staffMemberRepository->ofStaffMemberIds($this->mergeWithStaffMemberIds);

        // Ensure chronological order
        uasort($oldEvents, function(AggregateChanged $a, AggregateChanged $b) {
            return $a->createdAt() <=> $b->createdAt();
        });

        $initialStaffMemberId = StaffMemberId::fromString(reset($oldEvents)->aggregateId());

        /** @var AggregateChanged $oldEvent */
        foreach ($oldEvents as $oldEvent) {
            $newMessageData = $oldEvent->toArray();
            // The new event needs an own unique ID.
            $newMessageData['uuid'] = Uuid::uuid4()->toString();
            // Set the new staff member ID instead of the merged one.
            $newMessageData['metadata']['_aggregate_id'] = $this->newStaffMemberId->toString();
            // This will be automatically reset correctly.
            unset($newMessageData['metadata']['_position']);

            if ($oldEvent instanceof StaffMemberAdded) {
                /** @var StaffMemberAdded $oldEvent */
                if (!$oldEvent->staffMemberId()->sameValueAs($initialStaffMemberId)) {
                    // Only the initial event can add a staff member.
                    // All other events can only be modifications of the contract.
                    $newMessageData['message_name'] = StaffMemberContractModified::class;
                }
            }

            if ($oldEvent instanceof StaffMemberAdded || $oldEvent instanceof StaffMemberContractModified) {
                $newMessageData['payload']['contractId'] = $this->newContractId->toString();
                // Set new employment period to satisfy all time-period relevant policies.
                $newMessageData['payload']['employmentPeriod'] = $this->newEmploymentPeriod->toArray();
            }

            $eventClassName = $newMessageData['message_name'];

            $newEvent = $eventClassName::fromArray($newMessageData);

            $this->recordThat($newEvent);
        }
    }

    private function finalizeNewStaffMemberHistory(): void
    {
        // Create final event
        $mergedWithStaffMembers = MergedWithStaffMembers::with(
            $this->newStaffMemberId, $this->mergeWithStaffMemberIds,
            $this->newContractId, $this->newEmploymentPeriod
        );
        $mergedWithStaffMembers = $mergedWithStaffMembers
            ->withAddedMetadata('_aggregate_type', StaffMember::class)
        ;

        $this->recordThat($mergedWithStaffMembers);
    }

    /**
     * Record an aggregate changed event
     */
    protected function recordThat(AggregateChanged $event): void
    {
        $this->version += 1;

        $this->recordedEvents[] = $event->withVersion($this->version);
    }
}
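
Stripped of prooph and the domain classes, the rewrite loop in `buildHistoryFromMergedStaffMembers()` can be approximated with plain arrays as sketched below. `mergeHistories()` is a hypothetical helper, the event arrays only imitate the shape of `toArray()`, `bin2hex(random_bytes(16))` is a stand-in for `Uuid::uuid4()`, and the `_aggregate_version` key is an assumption about what `withVersion()` writes into the metadata.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: merges event arrays into one history for $newAggregateId.
 *
 * @param array[] $oldEvents
 * @return array[]
 */
function mergeHistories(array $oldEvents, string $newAggregateId): array
{
    // Chronological order; ISO-8601 strings in the same timezone compare correctly.
    usort($oldEvents, function (array $a, array $b): int {
        return $a['created_at'] <=> $b['created_at'];
    });

    $initialAggregateId = $oldEvents[0]['metadata']['_aggregate_id'] ?? null;
    $merged = [];
    $version = 0;

    foreach ($oldEvents as $event) {
        // Only the earliest aggregate keeps its "added" event;
        // later ones become contract modifications.
        if ($event['message_name'] === 'StaffMemberAdded'
            && $event['metadata']['_aggregate_id'] !== $initialAggregateId
        ) {
            $event['message_name'] = 'StaffMemberContractModified';
        }

        $event['uuid'] = bin2hex(random_bytes(16)); // new unique ID per copied event
        $event['metadata']['_aggregate_id'] = $newAggregateId;
        $event['metadata']['_aggregate_version'] = ++$version;
        unset($event['metadata']['_position']); // reassigned by the stream on insert

        $merged[] = $event;
    }

    return $merged;
}

$merged = mergeHistories([
    [
        'uuid' => 'b', 'message_name' => 'StaffMemberAdded',
        'created_at' => '2020-03-01T00:00:00',
        'metadata' => ['_aggregate_id' => 'staff-2', '_position' => 7],
    ],
    [
        'uuid' => 'a', 'message_name' => 'StaffMemberAdded',
        'created_at' => '2019-01-01T00:00:00',
        'metadata' => ['_aggregate_id' => 'staff-1', '_position' => 3],
    ],
], 'staff-new');

foreach ($merged as $event) {
    echo $event['metadata']['_aggregate_version'], ' ', $event['message_name'], PHP_EOL;
}
// prints:
// 1 StaffMemberAdded
// 2 StaffMemberContractModified
```

Note that the "is this the initial staff member?" check has to run before the aggregate ID is overwritten, which is also the order used in the factory above.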

The command handler:

<?php

namespace Acme\Staff\Application\Service\StaffMember;

use Acme\Staff\Domain\Model\StaffMember\StaffMemberRepository;
use Acme\Staff\Domain\Service\MergedStaffMember;

final class MergeStaffMembersHandler
{
    /** @var MergedStaffMember */
    private $mergedStaffMember;

    /** @var StaffMemberRepository */
    private $staffMemberRepository;

    public function __construct(MergedStaffMember $mergedStaffMember, StaffMemberRepository $staffMemberRepository)
    {
        $this->mergedStaffMember = $mergedStaffMember;
        $this->staffMemberRepository = $staffMemberRepository;
    }

    public function __invoke(MergeStaffMembers $command): void
    {
        $newStaffMember = $this->mergedStaffMember->fromMergedHistory(
            $command->newStaffMemberId(),
            $command->mergeWithStaffMemberIds(),
            $command->newContractId(),
            $command->newEmploymentPeriod()
        );

        $this->staffMemberRepository->save($newStaffMember);
    }
}

Just some framework - Symfony and YAML for Marco ;) - config:

services:

    Acme\Staff\Domain\Service\MergedStaffMember:
        arguments:
            - '@staff_member_collection'

    Acme\Staff\Application\Service\StaffMember\MergeStaffMembersHandler:
        public: true
        tags: [messenger.message_handler]
        arguments:
            - '@Acme\Staff\Domain\Service\MergedStaffMember'
            - '@staff_member_collection'

webdevilopers avatar Jul 31 '20 09:07 webdevilopers

I would prefer not to inject the repository into the aggregate root. Do it from the outside instead.

prolic avatar Aug 03 '20 15:08 prolic

Actually MergedEmploymentContract is NOT an aggregate root. It's just a factory that uses prooph's typical recordThat method, which is indeed included in the AggregateRoot class, but that class is not extended here. The factory is simply not named "Factory"; it is named after the resulting aggregate to expect. There is no "natural" aggregate that comes from the merging. It's just written to the regular Employment Contract stream.

webdevilopers avatar Aug 03 '20 19:08 webdevilopers

OK my bad, looks good to me after quick review


prolic avatar Aug 03 '20 19:08 prolic