php-ddd
When, where and how to create Summary Events
Currently we have a service A that publishes messages to the outside world (e.g. Service B) whenever one of about 20 state changes happens in our context.
final class StaffMember extends AggregateRoot
{
public function assignToJobFunction(JobFunctionId $jobFunctionId, string $jobFunctionName): void
{
$this->recordThat(AssignedToJobFunction::with($this->staffMemberId, $jobFunctionId, $jobFunctionName));
}
// 19 more state changes
}
final class StaffPublisher extends Producer implements MessageSubscriberInterface
{
/** @var ProducerInterface */
protected $producer;
public function whenAssignedToJobFunction(AssignedToJobFunction $event): void
{
$this->publish($event->serialize(), 'acme.staff.staff.member_assigned_to_job_function');
}
}
The consumer does not need to know all the details leading to the actual state change, and we do not want to add a new message broker routing key every time.
After reading the following article by @mathiasverraes we decided to go for a "Summary Event":
- https://verraes.net/2019/05/patterns-for-decoupling-distsys-summary-event/
You can use a Projection to generate the Summary Event. A Projection is an event listener that persists state from events it listens to, and either exposes that state by responding to queries, or by emitting new events.
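The quoted idea can be sketched as follows. This is a minimal, hypothetical illustration (class and field names are assumptions, and the "store" is a plain in-memory array): the projection persists state from every event it sees and emits a new summary event carrying the entire accumulated state.

```php
<?php

// Hypothetical sketch of a Projection that builds Summary Events.
// It persists state per staff member and emits a summary event with
// the full known state on every change it observes.
final class StaffSummaryProjection
{
    /** @var array<string, array<string, string>> accumulated state per staff member */
    private array $state = [];

    /** @var array[] summary events this projection has emitted */
    public array $emitted = [];

    /** @param array<string, string> $changedFields fields carried by the incoming event */
    public function apply(string $staffMemberId, array $changedFields): void
    {
        // Persist the state taken from the incoming event...
        $this->state[$staffMemberId] =
            array_merge($this->state[$staffMemberId] ?? [], $changedFields);

        // ...and emit a new summary event containing the entire known state.
        $this->emitted[] = [
            'type'    => 'StaffMemberDetailsSummarized',
            'payload' => ['staffMemberId' => $staffMemberId]
                + $this->state[$staffMemberId],
        ];
    }
}
```

Because the projection keeps its own store, it never needs to query the read model at all, which sidesteps the staleness problem described below.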
The problem with our current implementation:
When the event reaches our Publisher ("Event Listener"), only the partial state carried by the event is in memory. We could try to query the read model, but there is no guarantee that it has already been updated by then.
One way (1) to solve that is to force a delay. Or use a cronjob (2) that sends summaries every 2 minutes. The latter solution would require some kind of publishing log to know which summaries have already been published.
Another hack (3) would be to query the read model, which may not be up to date yet, but then overwrite its data with only the properties that changed, taken from the event, e.g. "jobFunctionId" + name.
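Approach (3) could look roughly like this. All names here are assumptions (the read model, the in-memory "repository", and the extra `department` field stand in for the real persistence and the remaining summary state); the point is only to show the overwrite step: the read model may be stale, but the event is authoritative for the fields it carries.

```php
<?php

// Hypothetical read model with a handful of stand-in summary fields.
final class StaffMemberReadModel
{
    public function __construct(
        public string $staffMemberId,
        public string $jobFunctionId,
        public string $jobFunctionName,
        public string $department // ...remaining summary state
    ) {}
}

// Sketch of approach (3): load the (possibly stale) read model,
// overwrite only the fields the event carries, then build the summary.
final class StaffSummaryPublisher
{
    /** @var array<string, StaffMemberReadModel> in-memory stand-in for a repository */
    private array $readModels = [];

    public function save(StaffMemberReadModel $model): void
    {
        $this->readModels[$model->staffMemberId] = $model;
    }

    /** @return array<string, string> the summary payload that would be published */
    public function whenAssignedToJobFunction(
        string $staffMemberId,
        string $jobFunctionId,
        string $jobFunctionName
    ): array {
        $model = $this->readModels[$staffMemberId]; // possibly stale

        // Overwrite only the properties the event is authoritative for.
        $model->jobFunctionId   = $jobFunctionId;
        $model->jobFunctionName = $jobFunctionName;

        // Everything else is taken from the read model as-is.
        return [
            'staffMemberId'   => $model->staffMemberId,
            'jobFunctionId'   => $model->jobFunctionId,
            'jobFunctionName' => $model->jobFunctionName,
            'department'      => $model->department,
        ];
    }
}
```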
A different approach (4) we are thinking about is adding an event e.g. "StaffMemberModified" or "StaffMemberDetailsSummarized". This could be fired in addition to all the other domain events and include the entire state.
final class StaffMember extends AggregateRoot
{
/** @var StaffMemberId */
private $staffMemberId;
/** @var JobFunctionId */
private $jobFunctionId;
/** @var string */
private $jobFunctionName;
// 19+x more properties holding the state only for the extra event, currently not required for protecting invariants
public function assignToJobFunction(JobFunctionId $jobFunctionId, string $jobFunctionName): void
{
$this->recordThat(AssignedToJobFunction::with($this->staffMemberId, $jobFunctionId, $jobFunctionName));
$this->recordThat(StaffMemberDetailsSummarized::with($this->staffMemberId, $jobFunctionId, $jobFunctionName /* ...entire state */));
}
// 19 more state changes
}
This solves the publisher problem, but it puts the entire state on the event-sourced aggregate root. That has no technical disadvantages though; it's just more code.
Do you have any suggestions?
Thanks in advance.
Came from:
- https://twitter.com/webdevilopers/status/1309039756194582531
The state of the projection can be a separate layer from read and write models. In this case it would be a segregated event layer. In our applications we do this a lot when process summaries need to aggregate over multiple steps. We then store the aggregated state in a separate store/table.
If I understand correctly, the problem is that the event handler that is supposed to publish the "summary event" is called before the corresponding read model is regenerated, and both listen to the same event.
If you publish the summary event asynchronously and regenerate the read model synchronously, you should avoid the problem. This works just like indexes in a database: when indexed columns are modified, the index (the read model) is updated synchronously.
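The ordering suggested here can be sketched with a tiny, hypothetical dispatcher (not any specific framework's API): the projector is registered as a synchronous handler, so it runs before anything else, while the publisher handler only enqueues the event for later asynchronous processing.

```php
<?php

// Hypothetical dispatcher illustrating "sync read model, async publishing".
// Sync handlers (projectors) run immediately; async handlers (publishers)
// are queued and drained later, e.g. by a worker process.
final class EventDispatcher
{
    /** @var callable[] */
    private array $sync = [];
    /** @var callable[] */
    private array $async = [];
    /** @var array{0: callable, 1: object}[] */
    private array $queue = [];

    public function onSync(callable $handler): void  { $this->sync[] = $handler; }
    public function onAsync(callable $handler): void { $this->async[] = $handler; }

    public function dispatch(object $event): void
    {
        // The read model is updated synchronously, like a database index...
        foreach ($this->sync as $handler) {
            $handler($event);
        }
        // ...while publishing is merely enqueued.
        foreach ($this->async as $handler) {
            $this->queue[] = [$handler, $event];
        }
    }

    public function drainQueue(): void
    {
        foreach ($this->queue as [$handler, $event]) {
            $handler($event); // by now the read model is guaranteed up to date
        }
        $this->queue = [];
    }
}
```

With this ordering, the publisher can safely query the read model, because the projector has already run by the time the queue is drained.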
Thanks for your feedback @dgafka. We prefer to create our read models asynchronously and run the projections as long-running processes.
I added approach 3. We load the read model when a certain event has happened. It does not matter whether the read model already has the current data; we take it as it is, but overwrite it with the current data from the event. Then we publish a summary event.
A consumer takes the event and acts as a process manager: it sends commands that change the state of an aggregate, which publishes the events to a segregated event layer, as suggested by @tPl0ch.
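The consumer side could be sketched like this. Everything here is hypothetical (the command class, the handler name, and the array stand-in for a command bus are assumptions); it only illustrates the process-manager step of translating the incoming summary event into a local command.

```php
<?php

// Hypothetical command issued by the process manager in the consuming context.
final class UpdateStaffDirectoryEntry
{
    public function __construct(
        public string $staffMemberId,
        public string $jobFunctionName
    ) {}
}

// Sketch of the process-manager consumer: it receives the summary event
// and translates it into a command for an aggregate in its own context.
final class StaffSummaryProcessManager
{
    /** @var object[] commands issued so far (stand-in for a command bus) */
    public array $issuedCommands = [];

    /** @param array<string, string> $summary deserialized summary event payload */
    public function whenStaffMemberDetailsSummarized(array $summary): void
    {
        $this->issuedCommands[] = new UpdateStaffDirectoryEntry(
            $summary['staffMemberId'],
            $summary['jobFunctionName']
        );
    }
}
```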
So far this seems to work fine.