eventsourcing
Automatic snapshotting does not work for functional-style aggregates
Version: 9.2.22
# https://eventsourcing.readthedocs.io/en/latest/topics/examples/aggregate7.html#domain-model
def register_dog(name: str) -> DomainEvent:
    return DogRegistered(
        originator_id=uuid4(),
        originator_version=1,
        timestamp=create_timestamp(),
        name=name,
    )

def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
    return TrickAdded(
        originator_id=dog.id,
        originator_version=dog.version + 1,
        timestamp=create_timestamp(),
        trick=trick,
    )
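For context, the aggregate7 example uses these functions from an application whose command methods pass only the new event to save(). The following is a rough sketch of that usage (DogSchool, Trick, project_dog and the repository.get() projector_func argument are taken from the example and may differ in detail):

from uuid import UUID
from eventsourcing.application import Application

class DogSchool(Application):
    def register_dog(self, name: str) -> UUID:
        event = register_dog(name)
        self.save(event)  # only the event is saved; no aggregate object is passed
        return event.originator_id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        self.save(add_trick(dog, Trick(trick)))  # again, only an event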
# https://github.com/pyeventsourcing/eventsourcing/blob/v9.2.22/eventsourcing/application.py#L566-L585
def collect_events(
    self,
    *objs: Optional[Union[MutableOrImmutableAggregate, DomainEventProtocol]],
    **kwargs: Any,
) -> None:
    """
    Collects pending domain events from the given aggregate.
    """
    for obj in objs:
        if obj is None:
            continue
        elif isinstance(obj, DomainEventProtocol):
            self.events.append(obj)
        else:
            if isinstance(obj, CollectEventsProtocol):
                for event in obj.collect_events():
                    self.events.append(event)
            self.aggregates[obj.id] = obj
    self.saved_kwargs.update(kwargs)
# https://github.com/pyeventsourcing/eventsourcing/blob/v9.2.22/eventsourcing/application.py#L846-L849
def _take_snapshots(self, processing_event: ProcessingEvent) -> None:
    # Take snapshots using IDs and types.
    if self.snapshots and self.snapshotting_intervals:
        for event in processing_event.events:
            try:
                aggregate = processing_event.aggregates[event.originator_id]
            except KeyError:
                continue
            ...
These functions only return a DomainEvent, but automatic snapshotting requires an aggregate object in order to proceed. Am I supposed to build an aggregate and pass it along with the events to make automatic snapshotting work?
Good question!
Firstly, let's consider the _take_snapshots() method. It looks for a "snapshotting interval", using the type of the collected aggregate. If there is an interval, and if the version number is exactly a multiple of the interval, then it calls take_snapshot(). The take_snapshot() method just needs an aggregate ID and a version number. But if only the events are passed to save(), there aren't any aggregates, so no intervals are found and there's no automatic snapshotting.
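To make that concrete, here is a sketch using the kind of application shown in the question (DogSchool and the interval value are assumptions):

# assumes snapshotting is enabled and snapshotting_intervals = {Dog: 5}
app = DogSchool()
dog_id = app.register_dog("Fido")
for i in range(10):
    app.add_trick(dog_id, f"trick {i}")
# Ten TrickAdded events are stored, but save() only ever receives events,
# so processing_event.aggregates stays empty, no interval is found for any
# event inside _take_snapshots(), and no snapshot is ever taken.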
So I suppose there are three ways to go. You can either (1) continue to save just the events and write your own _take_snapshots() method, or (2) adjust your aggregate so it has a collect_events() method and adjust your command methods so they return a new instance of the aggregate with the new events held somewhere so that they can be returned by the collect_events() method, or (3) adjust your command methods so that rather than just saving the new event they run the new event through the aggregate projector function and then call save() with both the aggregate and the new events.
Option 1. Continue to save() just the events, and then write your own _take_snapshots() method that has some other kind of conditional logic for deciding whether to call take_snapshot(). For example, you could directly or indirectly identify the interval from the topic of the event. This is slightly messy because you will need to find the snapshotting interval from the aggregate event without having the aggregate.
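A rough sketch of option 1 (the event-to-aggregate mapping here is just an illustration, not a library feature; the comments later in this thread develop this into something fuller):

class DogSchool(Application):
    snapshotting_intervals = {Dog: 5}

    # Assumed mapping from module-level event classes to their aggregate class.
    event_to_aggregate_class = {DogRegistered: Dog, TrickAdded: Dog}

    def _take_snapshots(self, processing_event: ProcessingEvent) -> None:
        if not (self.snapshots and self.snapshotting_intervals):
            return
        for event in processing_event.events:
            aggregate_class = self.event_to_aggregate_class.get(type(event))
            interval = self.snapshotting_intervals.get(aggregate_class)
            if interval is not None and event.originator_version % interval == 0:
                self.take_snapshot(
                    aggregate_id=event.originator_id,
                    version=event.originator_version,
                )
                # Note: reconstructing the state for the snapshot may also need
                # the aggregate projector function, discussed later in this thread.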
Option 2. Change the "immutable aggregate" so that it has a collect_events() method, and change the command methods so that they call the mutator function with the new event to construct a new instance of the aggregate, and then somehow inject the new event (for example, see aggregate example 4 and look at the way pending events are held in a class attribute), and then return the aggregate with the new event held internally so that collect_events() can return it (like in aggregate example 4). And then save() the aggregate rather than the event. This is slightly messy because you will need to keep the pending events in a class attribute (like aggregate example 4) or perhaps a global dict if that works better with Pydantic.
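A minimal sketch of option 2, assuming a frozen dataclass aggregate and a class-level dict of pending events keyed by id(instance) (Dog's fields and the mutate_dog() helper are assumptions; aggregate example 4 keeps pending events in a class attribute in a similar, more careful way):

from dataclasses import dataclass
from typing import ClassVar, Dict, List, Tuple
from uuid import UUID

@dataclass(frozen=True)
class Dog:
    id: UUID
    version: int
    name: str
    tricks: Tuple[str, ...] = ()

    # Pending events are held outside the frozen instances, keyed by id(instance).
    _pending: ClassVar[Dict[int, List[DomainEvent]]] = {}

    def collect_events(self) -> List[DomainEvent]:
        return Dog._pending.pop(id(self), [])

def add_trick(dog: Dog, trick: str) -> Dog:
    event = TrickAdded(
        originator_id=dog.id,
        originator_version=dog.version + 1,
        timestamp=create_timestamp(),
        trick=trick,
    )
    new_dog = mutate_dog(dog, event)  # assumed: apply one event to produce the next state
    Dog._pending.setdefault(id(new_dog), []).append(event)
    return new_dog  # save() this aggregate; collect_events() will yield the new event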
Option 3. This is actually coded in the examples but isn't presented in the docs. See https://github.com/pyeventsourcing/eventsourcing/blob/9.3/eventsourcing/examples/aggregate7/test_snapshotting_intervals.py
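For illustration, an option 3 command method might look roughly like this (a sketch, not the linked example's code; project_dog is assumed to take the current state and a list of new events):

class DogSchool(Application):
    snapshotting_intervals = {Dog: 5}

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        event = add_trick(dog, Trick(trick))  # module-level function from the domain model
        dog = project_dog(dog, [event])       # run the new event through the projector
        self.save(dog, event)                 # the aggregate supplies the type for the interval lookup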
@filwaline I just added aggregate examples 6a and 7a which do what I suggested in option 2 (immutable aggregate style with frozen Python dataclass, and with frozen Pydantic model):
https://github.com/pyeventsourcing/eventsourcing/tree/9.3/eventsourcing/examples/aggregate6a https://github.com/pyeventsourcing/eventsourcing/tree/9.3/eventsourcing/examples/aggregate7a
Please note, you need to specify the aggregate projector function on the application class, because the snapshotting method needs it (see the example application code for details).
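For example, the application-class configuration might look something like this (the interval value is arbitrary; the snapshotting_projectors class variable is the one used in the code further down this thread):

class DogSchool(Application):
    snapshotting_intervals = {Dog: 5}             # snapshot every 5 events
    snapshotting_projectors = {Dog: project_dog}  # how to rebuild Dog state for a snapshot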
Let me know what you think about this, and whether you get it working?
Thanks! Very detailed explanation and useful examples!
Option 3 is very good; it keeps the functional style and is easy to implement.
Option 2 is a little weird to me; it needs a global dict and isn't very functional...
Option 1 needs some hacky work, but it keeps everything else the same, so it's still a good choice.
I am going to give options 1 and 3 a try and choose one of them.
Thanks again!
Hi @johnbywater, I'm working on option 1, which is creating my own _take_snapshots function. I noticed that the original one requires an aggregate object solely to obtain its type. Therefore, I'm considering adding a new class variable, perhaps named snapshotting_event_aggregate_mapping, which could serve the purpose of determining the aggregate type. What are your thoughts on this idea?
class Application:
    snapshotting_event_aggregate_mapping: ClassVar[
        Optional[Dict[Type[DomainEventProtocol], Type[MutableOrImmutableAggregate]]]
    ] = None

    def _take_snapshots(self, processing_event: ProcessingEvent) -> None:
        def _get_aggregate_type(
            event: DomainEventProtocol,
        ) -> Optional[Type[MutableOrImmutableAggregate]]:
            # Prefer the collected aggregate; otherwise fall back to the explicit mapping.
            aggregate_type = None
            aggregate = processing_event.aggregates.get(event.originator_id)
            if aggregate:
                aggregate_type = type(aggregate)
            elif self.snapshotting_event_aggregate_mapping:
                aggregate_type = self.snapshotting_event_aggregate_mapping.get(
                    type(event)
                )
            return aggregate_type

        # Take snapshots using IDs and types.
        if self.snapshots and self.snapshotting_intervals:
            for event in processing_event.events:
                aggregate_type = _get_aggregate_type(event)
                interval = self.snapshotting_intervals.get(aggregate_type)
                if interval is not None and event.originator_version % interval == 0:
                    if (
                        self.snapshotting_projectors
                        and aggregate_type in self.snapshotting_projectors
                    ):
                        projector_func = self.snapshotting_projectors[aggregate_type]
                    else:
                        projector_func = project_aggregate
                    if projector_func is project_aggregate and not isinstance(
                        event, CanMutateProtocol
                    ):
                        msg = (
                            f"Cannot take snapshot for {aggregate_type} with "
                            "default project_aggregate() function, because its "
                            f"domain event {type(event)} does not implement "
                            "the 'can mutate' protocol (see CanMutateProtocol)."
                            f" Please define application class {type(self)}"
                            " with class variable 'snapshotting_projectors', "
                            f"to be a dict that has {aggregate_type} as a key "
                            "with the aggregate projector function for "
                            f"{aggregate_type} as the value for that key."
                        )
                        raise ProgrammingError(msg)
                    self.take_snapshot(
                        aggregate_id=event.originator_id,
                        version=event.originator_version,
                        projector_func=projector_func,
                    )
One more thing: if _take_snapshots fails to retrieve the aggregate type, an exception should be raised instead of failing silently, so users won't be confused about why automatic snapshotting isn't working.
What you did is nice. Yes, it's just getting the aggregate type so it can get the interval. Sometimes the event classes are nested in the aggregate class, so the event topic can be trimmed to get the aggregate topic, and then we can get the class from that topic. But in the "functional" style people seem to like defining the event classes at the module level, so this wouldn't work. So in this case there needs to be an explicit mapping from the event types to their aggregate types, just like you have it above.
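For what it's worth, the topic-trimming idea for nested event classes could be sketched like this (a rough illustration; it assumes eventsourcing's get_topic() and resolve_topic() helpers and deliberately gives up when the event class is defined at module level):

from eventsourcing.utils import get_topic, resolve_topic

def aggregate_class_from_nested_event(event: DomainEventProtocol) -> Optional[type]:
    # e.g. "myapp.domain:Dog.TrickAdded" -> "myapp.domain:Dog"
    topic = get_topic(type(event))
    aggregate_topic, _, _ = topic.rpartition(".")
    if ":" not in aggregate_topic:
        return None  # module-level event class, so there is no enclosing aggregate class
    try:
        return resolve_topic(aggregate_topic)
    except Exception:
        return None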
One more thing: if _take_snapshots fails to retrieve the aggregate type, an exception should be raised instead of failing silently, so users won't be confused about why automatic snapshotting isn't working.
The reason it "fails silently" is that not all users want all aggregates to be snapshotted. The ones that are to be snapshotted are included in the configuration, and others not?
The reason it "fails silently" is that not all users want all aggregates to be snapshotted. The ones that are to be snapshotted are included in the configuration, and others not?
If the user enables the snapshot store and provides snapshotting_intervals, but _take_snapshots fails to retrieve the aggregate type from either the aggregate object or the domain event, that should be considered a programming error. Whether or not an aggregate is snapshotted still depends on the snapshotting_intervals configuration.
...
        # Take snapshots using IDs and types.
        if self.snapshots and self.snapshotting_intervals:
            for event in processing_event.events:
                aggregate_type = _get_aggregate_type(event)
                if aggregate_type is None:
                    raise ProgrammingError(
                        f"Failed to get aggregate type for {type(event)}."
                    )
                interval = self.snapshotting_intervals.get(aggregate_type)
                if interval is not None and event.originator_version % interval == 0:
                    ...
Oh yes! I see what you mean now.