
Love and hate the pm

Open codeliner opened this issue 8 years ago • 11 comments

"Love and hate the pm" This comment by @lunetics caused a discussion about process managers and their role in an event sourced system in the prooph/improoph chat.

@bweston92 has provided his implementation of a process manager built as an event sourced aggregate.

This issue tracks the discussion so that we can point to it later.

codeliner avatar Jun 27 '17 11:06 codeliner

@bweston92 's implementation

along with explanation:

It is a PM whose root AR is a "lead". It creates various parts of an application, tracks the opening of the application at the end, and on failures sends an e-mail.

So, I will accept via HTTP a message RecordBusinessLoanLead; this will include all the information about the applicant, legal entity and business loan requirements, along with the introducer's id. This will then open up a new AR BusinessLoanLead and emit an event BusinessLoanLeadRecorded, which will be published over AMQP, and the endpoint returns 202 Accepted.

Now the AMQP consumer for the leads service will consume BusinessLoanLeadRecorded in the process manager. It will locate the AR and call createApplicantForBusinessLoanLead($stateResolver, $commandBus), which gets the information about the lead from the state resolver and, using the command bus, dispatches a CreateApplicant message. Because this is an HTTP call underneath, it will either succeed or throw an exception, in which case I can retry depending on the error, and eventually emit either a command RecordLeadApplicantCreated or RecordLeadApplicantCreationFailed.

Now the AMQP consumer for the leads service will consume RecordLeadApplicantCreated in the process manager. It will locate the AR and call createLegalEntityForBusinessLoanLead($stateResolver, $commandBus), which gets the information about the lead from the state resolver and, using the command bus, dispatches a CreateLegalEntity message. Because this is an HTTP call underneath, it will either succeed or throw an exception, in which case I can retry depending on the error, and eventually emit either a command RecordLeadLegalEntityCreated or RecordLeadLegalEntityCreationFailed.

This will then happen for both opening the business loan application and tracking the source of the application.
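To put the first leg of that flow in code, a minimal sketch of the HTTP entry point might look like this (illustrative only, not taken from the actual project; RecordBusinessLoanLeadAction and RecordBusinessLoanLead::fromArray are assumed names):

use Prooph\ServiceBus\CommandBus;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Zend\Diactoros\Response\EmptyResponse;

final class RecordBusinessLoanLeadAction
{
    private $commandBus;

    public function __construct(CommandBus $commandBus)
    {
        $this->commandBus = $commandBus;
    }

    public function __invoke(ServerRequestInterface $request): ResponseInterface
    {
        // Dispatch the command; the BusinessLoanLead AR emits
        // BusinessLoanLeadRecorded, which is then published over AMQP.
        $this->commandBus->dispatch(
            RecordBusinessLoanLead::fromArray((array) $request->getParsedBody())
        );

        // The rest of the process runs asynchronously, so only acknowledge receipt.
        return new EmptyResponse(202);
    }
}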

However, explaining this just made me realise my diagram is a bit wrong, but hey, that is the detail.

codeliner avatar Jun 27 '17 11:06 codeliner

@bweston92

The command bus will be injected into the AR

Can you explain me why you need to inject the command bus into an AR?

codeliner avatar Jun 27 '17 11:06 codeliner

@bweston92 uses prooph/micro, which is a good place to look at how you would work with a process manager in a functional world.

@bweston92

Injecting the command bus is the easy part, validating my logic on doing this is what I need to do

Sascha-Oliver Prolic @prolic 12:42

however I asked about where you inject :grinning:

Bradley Weston @bweston92 12:42

So the following is called by the main PM: after receiving BusinessLoanRecorded it will dispatch AttemptToCreateApplicantForLead, a command that is handled using the ApplicantCreator interface.

function createApplicant(callable $stateResolver, Command\AttemptToCreateApplicantForLead $command, ApplicantCreator $applicantCreator): array {
    $state = $stateResolver();
    $attempts = 0;

    try {
        do {
            // Side effect: the creator performs an HTTP call underneath.
            $response = $applicantCreator->create($state['applicant']);

            if ($response->isSuccessful()) {
                break;
            }

            // && instead of `and`: `and` binds weaker than `=`, so the
            // attempt counter would never limit the retries.
            $retry = $response->canRetry() && $attempts++ < 5;
        } while ($retry);

        if ($response->isSuccessful()) {
            return [
                Event\LeadApplicantWasCreated::withData(
                    LeadId::fromString($state['lead_id']),
                    nextVersion($state)
                ),
            ];
        } else {
            return [
                Event\FailedToCreateApplicant::withData(
                    LeadId::fromString($state['lead_id']),
                    $response->errorMessage(),
                    $attempts,
                    nextVersion($state)
                ),
            ];
        }
    } catch (\Throwable $unknownError) {
        // Unexpected failure: record it as an event instead of letting it bubble up.
        return [
            Event\FailedToCreateApplicant::withData(
                LeadId::fromString($state['lead_id']),
                $unknownError->getMessage(),
                $attempts,
                nextVersion($state)
            ),
        ];
    }
}

Sascha-Oliver Prolic @prolic 12:44

this is for micro right? this looks great

Bradley Weston @bweston92 12:44

Yes. Then I have this in my dispatcher:

    'Leads.AttemptToCreateApplicantForLead' => [
        'handler' => function (callable $stateResolver, Message $message) use ($factories): array {
            return \Cosmic\Lead\Model\createApplicant($stateResolver, $message, $factories['applicant_creator']);
        },
        'definition' => LeadAggregateDefinition::class,
    ],

codeliner avatar Jun 27 '17 12:06 codeliner

@codeliner so basically I was using the CommandBus as a "service" for dispatching my commands, since I'm using an AR as a PM.

bweston92 avatar Jun 27 '17 12:06 bweston92

However, I removed the command bus idea and injected an ApplicantCreator instead, as you can see in the createApplicantForLead AR method (the one you've got as createApplicant).

bweston92 avatar Jun 27 '17 12:06 bweston92

For prooph/micro I think the solution is very, very easy, as you only have to do this: f ( f (history) -> state, event ) -> event[]

For a process manager that is supposed to send new commands, you can do something like this:

function (callable $stateResolver, Message $event) use ($commandBus) {
    $state = $stateResolver();
    if ($state['something'] === true) {
        $command = new SomeCommand($event->someThing());
        $commandBus->dispatch($command);
    }

    return [];
}

Now when using prooph/event-sourcing things get a little more complicated:

I need to have access to the command bus to send new commands (this could be done with the aggregate repository, which would need a custom repository factory).

Maybe prooph could add some infrastructure to support event sourced process managers with the event-sourcing component.
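One possible shape, purely as a sketch (LeadProcess, whenBusinessLoanLeadRecorded and the repository methods are assumed names, not existing prooph infrastructure): an application-level event handler loads the event-sourced PM, asks it what to do, and dispatches the commands it returns, so the command bus never has to live inside the AR.

$handler = function (BusinessLoanLeadRecorded $event) use ($processManagerRepository, $commandBus): void {
    /** @var LeadProcess $process */
    $process = $processManagerRepository->get($event->leadId());

    // The PM records its own events and returns the commands it wants dispatched.
    $commands = $process->whenBusinessLoanLeadRecorded($event);

    $processManagerRepository->save($process);

    foreach ($commands as $command) {
        $commandBus->dispatch($command);
    }
};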

When using prooph/event-store projections, things are a little easier again, but still not as easy as with micro:

example:

$projection = $projectionManager->createProjection('some name');

$projection
    ->fromStream('event_stream')
    ->when([
        'someEvent' => function ($state, Message $event) use ($commandBus): array {
            if ($state['something'] === true) {
                $command = new SomeCommand($event->someThing());
                $commandBus->dispatch($command);
            }

            return $state;
        },
        'someOtherEvent' => function ($state, Message $event): array {
            $state['something'] = true;

            return $state;
        }
    ])
    ->run();

Last but not least, there is the option to use custom persistence process managers, where the state is persisted somewhere else (e.g. MongoDB): there is no infrastructure at all to support this atm.
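A rough sketch of what such a custom-persistence process manager could look like, assuming a small hand-rolled state store (ProcessStateRepository and the process_id metadata key are invented for illustration; only Message::metadata() and Message::messageName() are real prooph API):

interface ProcessStateRepository
{
    public function load(string $processId): array;

    public function save(string $processId, array $state): void;
}

$handler = function (Message $event) use ($stateRepository, $commandBus): void {
    // The state lives outside the event store, e.g. in a MongoDB collection.
    $state = $stateRepository->load($event->metadata()['process_id']);

    if (($state['something'] ?? false) === true) {
        $commandBus->dispatch(new SomeCommand($event->someThing()));
    }

    $state['last_event'] = $event->messageName();
    $stateRepository->save($event->metadata()['process_id'], $state);
};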

Summary: There are at least 4 options on how to implement sagas (aka stateful process managers) with prooph: a) using micro, b) using event-sourcing, c) using event-store projections, d) using custom state persistence.

In micro, things are so easy that some example code is enough, as you only need to write your function. For all the others, we might need to have a discussion about the best way to implement them and maybe add some infrastructure supporting this in the prooph ecosystem.

prolic avatar Jun 27 '17 13:06 prolic

Another note for event-sourcing:

We need to have a mechanism that fetches the correct aggregate and passes it to an event handler, so something like this:

$handler = function (SomeThingHappened $event) use ($aggregateRepository): void {
    $ar = $aggregateRepository->get($event->someId());
    $ar->onEvent($event);
};

$eventRouter->route(SomeThingHappened::class)->to($handler);

Maybe it's not easy to come up with a general abstraction here, but at least some example code would be nice, so we can point to something when the question comes up again next time.

prolic avatar Jun 27 '17 13:06 prolic

The question is: Why do we need the aggregate state in the process manager?

The aggregate handles invariants and therefore uses its state to make decisions. As shown by @prolic, an aggregate function type looks like this:

f ( f (history) -> state, event ) -> event[]

Following the discussion here an "aggregate function" should not have side effects. The function type above underlines this. No dependencies, just current state (left fold of past events) as input and new events (effects) as output. This can be tested easily in isolation and will always produce the same output when given the same input. Great!

Now let's look at a possible function type of a stateless process manager:

f (event, sideEffect) -> Maybe success

So the process manager handles or performs side effects (I/O, send message to messaging system, send new command to aggregate, ...) but it is itself stateless. All it needs is an event to react on. The event is the effect and the process manager executes that effect with an output of Maybe success or Result = Error | Success.

This means that aggregate and process manager form a team. The aggregate tells the outside world (where side effects are performed): SomethingHappened. The outside world (where process managers live) reacts on SomethingHappened by changing the outside world. Once done, they "send back" a new message to the aggregate (a new command), telling the aggregate that the outside world has changed so the aggregate should change, too.

Let's take @bweston92 's implementation.

//domain model
function openBusinessLoanLead(callable $resolveState, RecordBusinessLoanLead $command): array
{
  //...
  return [BusinessLoanLeadRecorded::with(/* ... */)];
}
//process manager
function createApplicantForBusinessLoanLead(BusinessLoanLeadRecorded $event, AmqpChannel $channel, int $retries = 0): array
{
  try {
    $channel->connect();
    $channel->publish(CreateApplicant::fromEvent($event));
  } catch (\Throwable $error) {
    if(canRetry($error, $retries)) {
      sleep(5);      
      return createApplicantForBusinessLoanLead($event, $channel, ++$retries);
    } else {
      return ["Failed to send command ...", false];
    }
  }
    
  return [null, true];
}

function canRetry(\Throwable $error, int $retries): bool {
  return $error instanceof AmqpChannelNotAvailable && $retries < 5;
}

Let's assume creating the Applicant is handled in another bounded context and once it is done we get an event back from that system.

We have again a simple process manager to react on that event:

//process manager
function trackRecordLeadApplicantCreated(RecordLeadApplicantCreated $event, BusinessLoan\CommandBus $commandBus, int $retries = 0): array
{
  try {
    $commandBus->dispatch(TrackRecordLeadApplicantCreated::fromEvent($event));
  } catch (\Throwable $error) {
    if(canRetry($error, $retries)) {
      sleep(5);
      return trackRecordLeadApplicantCreated($event, $commandBus, ++$retries);
    } else {
      return ["Failed to send command ...", false];
    }
  }
    
  return [null, true];
}

function canRetry(\Throwable $error, int $retries): bool {
  return $error->getCode() >= 500 && $retries < 5;
}

In the second part of the example you can see that I'm sending a new command to the BusinessLoanLead aggregate. I'm telling the aggregate that it should "track" progress of the process, but I still use the process managers to perform side effects.

The point here is that you can use an aggregate to "track" progress of a business process but you should use process managers to perform side effects.

You can now set up a monitoring process manager that observes the event stream of BusinessLoanLead to make sure that the business process is finished within a defined time frame and the final goal is reached:

It is a PM whose root AR is a "lead"; it creates various parts of an application and tracks the opening of the application at the end

The monitoring process manager is a bonus. Maybe it is enough to just show progress in the UI (a projection is constantly updating the read model) or send the admin team emails in case of errors. Whatever works for your system.
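As a sketch only, the "send the admin team emails in case of errors" variant could reuse the projection style shown earlier (the stream name, event name and $mailer are assumptions):

$projection = $projectionManager->createProjection('business_loan_lead_monitoring');

$projection
    ->fromStream('business_loan_lead_stream')
    ->when([
        'FailedToCreateApplicant' => function ($state, Message $event) use ($mailer) {
            // Alert the admin team so a human can push the process forward.
            $mailer->send('admins@example.com', 'Lead process failed', $event->messageName());

            return $state;
        },
    ])
    ->run();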

The point here is that it may look like overhead, but it is a simple set of rules; no magic or any other crazy stuff involved.

codeliner avatar Jun 27 '17 15:06 codeliner

Addition: I've used functions in the example to avoid boilerplate, but the OO version would look pretty much the same, just with classes. It is the same simple concept.

codeliner avatar Jun 27 '17 15:06 codeliner

What I found very clear and easy to understand was this: https://github.com/justeat/ProcessManager

So you have one ProcessRouter which is responsible for retrieving the state of the process manager instance, and the "logic" of the process manager is decoupled from it.

I am also looking at having such stuff decoupled from the event store (a lot of people don't use ES).

lunetics avatar Jun 27 '17 21:06 lunetics

@lunetics Did you take a look at the implementation of the process manager of the linked repo?

The OrderProcessManager still reacts on domain events and IS NOT driven by its own state. The only reason why the process manager has its own state is that they use PM state to avoid handling a message twice.

That is not a bad idea, but it requires a process manager that can be persisted. And it is still the same concept: the process manager reacts on domain events.

I would not do it this way because now the process manager is not stateless and has two tasks:

  1. React on domain event and move the process forward
  2. Message deduplication

This violates SRP and makes the PM unnecessarily complex. Move message deduplication to the runtime:

  1. Handle it in the message queue consumer by looking at message uuids (see the sketch after this list).
  2. Use prooph's persistent projection feature to observe event streams and invoke a stateless process manager. The projection takes care of handling a message only once. No need to move that into the PM.
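The first option could look like this minimal sketch ($handledMessages and $processManager are assumed; only Message::uuid() is real prooph API):

$consumerCallback = function (Message $event) use ($handledMessages, $processManager): void {
    $uuid = $event->uuid()->toString();

    if ($handledMessages->contains($uuid)) {
        // Duplicate delivery: drop it here, the PM never sees it.
        return;
    }

    // The stateless process manager reacts on the domain event.
    $processManager($event);

    $handledMessages->add($uuid);
};

Deduplication stays in the runtime and the process manager itself stays stateless.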

It is also questionable if the process manager should really do nothing if it receives a second OrderPlaced event. Error handling is not shown in the repo. What if the aggregate has decided to send the message again, because the first attempt failed later in the process?

With the approach shown in the linked repo you need to keep aggregate state and process manager state in sync. IMO that is not a good idea.

I am also looking at having such stuff decoupled from the event store (a lot of people don't use ES).

If you use Event Sourcing (and we are only looking at how the problem is solved using event sourcing, not how it is solved for all kinds of architectures ...) then domain events are the single source of truth. Don't introduce another source of truth (PM state).

codeliner avatar Jun 28 '17 07:06 codeliner