
Automatic snapshotting does not work for functional-style aggregates

Open · filwaline opened this issue 1 year ago · 8 comments

Version: 9.2.22

```python
# https://eventsourcing.readthedocs.io/en/latest/topics/examples/aggregate7.html#domain-model
def register_dog(name: str) -> DomainEvent:
    return DogRegistered(
        originator_id=uuid4(),
        originator_version=1,
        timestamp=create_timestamp(),
        name=name,
    )


def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
    return TrickAdded(
        originator_id=dog.id,
        originator_version=dog.version + 1,
        timestamp=create_timestamp(),
        trick=trick,
    )
```

```python
# https://github.com/pyeventsourcing/eventsourcing/blob/v9.2.22/eventsourcing/application.py#L566-L585
    def collect_events(
        self,
        *objs: Optional[Union[MutableOrImmutableAggregate, DomainEventProtocol]],
        **kwargs: Any,
    ) -> None:
        """
        Collects pending domain events from the given aggregate.
        """
        for obj in objs:
            if obj is None:
                continue
            elif isinstance(obj, DomainEventProtocol):
                self.events.append(obj)
            else:
                if isinstance(obj, CollectEventsProtocol):
                    for event in obj.collect_events():
                        self.events.append(event)
                self.aggregates[obj.id] = obj

        self.saved_kwargs.update(kwargs)
```

```python
# https://github.com/pyeventsourcing/eventsourcing/blob/v9.2.22/eventsourcing/application.py#L846-L849
    def _take_snapshots(self, processing_event: ProcessingEvent) -> None:
        # Take snapshots using IDs and types.
        if self.snapshots and self.snapshotting_intervals:
            for event in processing_event.events:
                try:
                    aggregate = processing_event.aggregates[event.originator_id]
                except KeyError:
                    continue
...
```

These functions only return a DomainEvent, but automatic snapshotting requires an aggregate object in order to proceed. Am I supposed to build an aggregate and pass it along with the events to make automatic snapshotting work?

filwaline, Feb 07 '24 10:02

Good question!

Firstly, let's consider the method _take_snapshots(). It looks up a "snapshotting interval" using the type of the collected aggregate. If there is an interval, and the version number is exactly a multiple of the interval, it calls take_snapshot(). The take_snapshot() method just needs an aggregate ID and a version number. But if only the events are passed to save(), there aren't any aggregates, so no intervals are found and there's no automatic snapshotting.
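To make that concrete, here is a minimal stdlib-only sketch of the decision described above. It is not the library's code: `Dog`, `TrickAdded`, and `snapshot_versions()` are hypothetical stand-ins, and the intervals dict is keyed by aggregate type as described. It shows that when only events are saved, the aggregates dict is empty, so no interval is found and nothing is snapshotted.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Dog:
    id: UUID
    version: int


@dataclass(frozen=True)
class TrickAdded:
    originator_id: UUID
    originator_version: int


def snapshot_versions(
    events: List[TrickAdded],
    aggregates: Dict[UUID, object],
    intervals: Dict[type, int],
) -> List[Tuple[UUID, int]]:
    """Return the (aggregate ID, version) pairs that would be snapshotted."""
    to_snapshot = []
    for event in events:
        # The interval is found via the type of the collected aggregate,
        # so an event whose aggregate was not collected is skipped.
        aggregate = aggregates.get(event.originator_id)
        if aggregate is None:
            continue
        interval = intervals.get(type(aggregate))
        # Snapshot only when the version is an exact multiple of the interval.
        if interval is not None and event.originator_version % interval == 0:
            to_snapshot.append((event.originator_id, event.originator_version))
    return to_snapshot


dog_id = uuid4()
event = TrickAdded(originator_id=dog_id, originator_version=4)
dog = Dog(id=dog_id, version=4)

# Saving only the event: no aggregate, so no snapshot.
print(snapshot_versions([event], {}, {Dog: 2}))  # []
# Saving the aggregate too: version 4 is a multiple of 2, so one snapshot.
print(snapshot_versions([event], {dog_id: dog}, {Dog: 2}))
```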

So I suppose there are three ways to go. You can (1) continue to save just the events and write your own _take_snapshots() method; (2) give your aggregate a collect_events() method and adjust your command methods so they return a new instance of the aggregate, with the new events held somewhere that collect_events() can return them from; or (3) adjust your command methods so that, rather than just saving the new event, they run it through the aggregate projector function and then call save() with both the aggregate and the new events.

Option 1. Continue to save() just the events. And then write your own _take_snapshots() method that has some other kind of conditional logic for deciding whether to call take_snapshot(). For example, you could directly or indirectly identify the interval from the topic of the event. This is slightly messy because you will need to find the snapshotting interval from the aggregate event without having the aggregate.

Option 2. Give the "immutable aggregate" a collect_events() method, and change the command methods so that they call the mutator function with the new event to construct a new instance of the aggregate. Then somehow inject the new event (for example, see aggregate example 4 and the way pending events are held in a class attribute), and return the aggregate with the new event held internally so that collect_events() can return it (as in aggregate example 4). Then save() the aggregate rather than the event. This is slightly messy because you will need to keep the pending events in a class attribute (as in aggregate example 4), or perhaps in a global dict if that works better with Pydantic.
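A rough stdlib-only sketch of option 2, assuming a frozen dataclass aggregate (this is not the code in the aggregate6a/7a examples; `Dog`, `TrickAdded`, `mutate()`, `add_trick()` and the `_pending` dict are hypothetical). Because a frozen instance can't hold mutable pending state, the new event is parked in a class-level dict keyed by (aggregate ID, version) and popped by collect_events().

```python
from dataclasses import dataclass, replace
from typing import ClassVar, Dict, List, Tuple
from uuid import UUID, uuid4


@dataclass(frozen=True)
class TrickAdded:
    originator_id: UUID
    originator_version: int
    trick: str


@dataclass(frozen=True)
class Dog:
    id: UUID
    version: int
    tricks: Tuple[str, ...] = ()

    # Pending events can't live on a frozen instance, so they are held in a
    # class-level dict keyed by (aggregate ID, version).
    _pending: ClassVar[Dict[Tuple[UUID, int], List[TrickAdded]]] = {}

    def collect_events(self) -> List[TrickAdded]:
        # Pop, so the same events are never collected twice.
        return Dog._pending.pop((self.id, self.version), [])


def mutate(event: TrickAdded, dog: Dog) -> Dog:
    # Mutator/projector: return a new instance with the event applied.
    return replace(
        dog,
        version=event.originator_version,
        tricks=dog.tricks + (event.trick,),
    )


def add_trick(dog: Dog, trick: str) -> Dog:
    event = TrickAdded(
        originator_id=dog.id,
        originator_version=dog.version + 1,
        trick=trick,
    )
    new_dog = mutate(event, dog)
    # Attach the event to the new instance so collect_events() can return it.
    Dog._pending[(new_dog.id, new_dog.version)] = [event]
    return new_dog


dog = add_trick(Dog(id=uuid4(), version=1), "roll over")
events = dog.collect_events()
```

One design caveat of this sketch: two new instances derived from the same base instance would share a key, so the later one overwrites the earlier one's pending event.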

Option 3. This is actually coded in the examples but isn't presented in the docs. See https://github.com/pyeventsourcing/eventsourcing/blob/9.3/eventsourcing/examples/aggregate7/test_snapshotting_intervals.py
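The shape of option 3 can be sketched without the library, assuming a command function that still returns only the event and a hypothetical `ToyRecorder.save()` that splits events from aggregates the way collect_events() does above (this is not the linked test's code). Because the projected aggregate is passed to save() alongside the event, its type is available for the interval lookup.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Tuple, Union
from uuid import UUID, uuid4


@dataclass(frozen=True)
class TrickAdded:
    originator_id: UUID
    originator_version: int
    trick: str


@dataclass(frozen=True)
class Dog:
    id: UUID
    version: int
    tricks: Tuple[str, ...] = ()


def add_trick(dog: Dog, trick: str) -> TrickAdded:
    # The domain function is unchanged: it just returns a new event.
    return TrickAdded(dog.id, dog.version + 1, trick)


def project_dog(event: TrickAdded, dog: Dog) -> Dog:
    # Aggregate projector: apply the event to get the next immutable state.
    return replace(
        dog, version=event.originator_version, tricks=dog.tricks + (event.trick,)
    )


class ToyRecorder:
    """Hypothetical stand-in for an application's save()."""

    def __init__(self) -> None:
        self.events: List[TrickAdded] = []
        self.aggregates: Dict[UUID, Dog] = {}

    def save(self, *objs: Union[Dog, TrickAdded]) -> None:
        # Events are appended; aggregates are indexed by ID so that their
        # type can later be used to find a snapshotting interval.
        for obj in objs:
            if isinstance(obj, TrickAdded):
                self.events.append(obj)
            else:
                self.aggregates[obj.id] = obj


recorder = ToyRecorder()
dog = Dog(id=uuid4(), version=1)
event = add_trick(dog, "sit")
dog = project_dog(event, dog)
recorder.save(dog, event)
```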

johnbywater, Feb 07 '24 14:02

@filwaline I just added aggregate examples 6a and 7a which do what I suggested in option 2 (immutable aggregate style with frozen Python dataclass, and with frozen Pydantic model):

https://github.com/pyeventsourcing/eventsourcing/tree/9.3/eventsourcing/examples/aggregate6a
https://github.com/pyeventsourcing/eventsourcing/tree/9.3/eventsourcing/examples/aggregate7a

Please note that you need to specify the aggregate projector function on the application class, because the snapshotting method needs it (see the example application code for details).

Let me know what you think about this, and whether you get it working.

johnbywater, Feb 07 '24 15:02

Thanks! Very detailed explanation and useful examples!

Option 3 is very good: it fits the functional style and is easy to implement.

Option 2 seems a little weird to me: it needs a global dict and isn't very functional...

Option 1 needs some hacky work, but it keeps everything else the same, so it's still a good choice.

I am going to give options 1 and 3 a try, and choose one of them.

Thanks again!

filwaline, Feb 07 '24 16:02

Hi @johnbywater, I'm working on option 1, which means writing my own _take_snapshots() method. I noticed that the original one requires an aggregate object solely to obtain its type. So I'm considering adding a new class variable, perhaps named snapshotting_event_aggregate_mapping, that maps event types to aggregate types. What are your thoughts on this idea?

```python
# Sketch of a proposed change to Application._take_snapshots(). It assumes the
# same imports as the library's application module (ProcessingEvent,
# project_aggregate, CanMutateProtocol, ProgrammingError, ...).
class Application:
    snapshotting_event_aggregate_mapping: ClassVar[
        Optional[Dict[Type[DomainEventProtocol], Type[MutableOrImmutableAggregate]]]
    ] = None

    def _take_snapshots(self, processing_event: ProcessingEvent) -> None:

        def _get_aggregate_type(
            event: DomainEventProtocol,
        ) -> Optional[Type[MutableOrImmutableAggregate]]:
            aggregate_type = None
            # Prefer the type of an aggregate collected along with the event.
            aggregate = processing_event.aggregates.get(event.originator_id)
            if aggregate is not None:
                aggregate_type = type(aggregate)
            elif self.snapshotting_event_aggregate_mapping:
                # Otherwise fall back to the explicit event-to-aggregate mapping.
                aggregate_type = self.snapshotting_event_aggregate_mapping.get(
                    type(event)
                )
            return aggregate_type

        # Take snapshots using IDs and types.
        if self.snapshots and self.snapshotting_intervals:
            for event in processing_event.events:
                aggregate_type = _get_aggregate_type(event)
                interval = self.snapshotting_intervals.get(aggregate_type)
                if interval is not None and event.originator_version % interval == 0:
                    if (
                        self.snapshotting_projectors
                        and aggregate_type in self.snapshotting_projectors
                    ):
                        projector_func = self.snapshotting_projectors[aggregate_type]
                    else:
                        projector_func = project_aggregate
                    if projector_func is project_aggregate and not isinstance(
                        event, CanMutateProtocol
                    ):
                        msg = (
                            f"Cannot take snapshot for {aggregate_type} with "
                            "default project_aggregate() function, because its "
                            f"domain event {type(event)} does not implement "
                            "the 'can mutate' protocol (see CanMutateProtocol)."
                            f" Please define application class {type(self)}"
                            " with class variable 'snapshotting_projectors', "
                            f"to be a dict that has {aggregate_type} as a key "
                            "with the aggregate projector function for "
                            f"{aggregate_type} as the value for that key."
                        )
                        raise ProgrammingError(msg)
                    self.take_snapshot(
                        aggregate_id=event.originator_id,
                        version=event.originator_version,
                        projector_func=projector_func,
                    )
```

filwaline, Feb 08 '24 09:02

One more thing: if _take_snapshots fails to retrieve the aggregate type, an exception should be raised instead of failing silently, so that users aren't confused about why automatic snapshotting isn't working.

filwaline, Feb 08 '24 09:02

What you did is nice. Yes, it's just getting the aggregate type so it can get the interval. Sometimes the event classes are nested in the aggregate class, so the event topic can be trimmed to get the aggregate topic, and then we can get the class from that topic. But in the "functional" style people seem to like defining the event classes at the module level, so this wouldn't work. So in this case there needs to be an explicit mapping from the event types to their aggregate types, just like you have it above.
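The topic-trimming idea can be sketched as a pure string function, assuming topics take the form "package.module:Outer.Inner" (the helper name and example topics are hypothetical). It also shows why this breaks down for the functional style: a module-level event class has no enclosing class to trim to.

```python
from typing import Optional


def aggregate_topic_from_event_topic(event_topic: str) -> Optional[str]:
    """Trim a nested event class's topic to its enclosing aggregate's topic.

    Assumes topics look like "package.module:Outer.Inner". Returns None when
    the event class is defined at module level (no enclosing class).
    """
    module, _, qualname = event_topic.partition(":")
    if "." not in qualname:
        # Module-level event class: nothing to trim, so no aggregate topic.
        return None
    outer, _, _ = qualname.rpartition(".")
    return f"{module}:{outer}"


print(aggregate_topic_from_event_topic("dogschool.domain:Dog.TrickAdded"))
print(aggregate_topic_from_event_topic("dogschool.domain:TrickAdded"))
```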

> One more thing: if _take_snapshots fails to retrieve the aggregate type, an exception should be raised instead of failing silently, so that users aren't confused about why automatic snapshotting isn't working.

The reason it "fails silently" is that not all users want all aggregates to be snapshotted. The ones that are to be snapshotted are included in the configuration, and the others are not?

johnbywater, Feb 08 '24 13:02

> The reason it "fails silently" is that not all users want all aggregates to be snapshotted. The ones that are to be snapshotted are included in the configuration, and the others are not?

If the user enables the snapshot store and provides snapshotting_intervals, but _take_snapshots fails to retrieve the aggregate type from either the aggregate object or the domain event, this should be considered a programming error. Whether or not an aggregate is snapshotted still depends on the snapshotting_intervals configuration.

```python
...
        # Take snapshots using IDs and types.
        if self.snapshots and self.snapshotting_intervals:
            for event in processing_event.events:
                aggregate_type = _get_aggregate_type(event)
                if aggregate_type is None:
                    raise ProgrammingError(
                        f"Failed to get aggregate type for {type(event)}."
                    )
                interval = self.snapshotting_intervals.get(aggregate_type)
                if interval is not None and event.originator_version % interval == 0:
...
```
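The distinction being argued here can be shown in a small stdlib-only sketch (all names are hypothetical, and `ProgrammingError` is a local stand-in for the library's exception): an event whose aggregate type can't be determined at all raises, while an aggregate type that simply has no configured interval is skipped, so it is still the intervals that decide what gets snapshotted.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple
from uuid import UUID, uuid4


class ProgrammingError(Exception):
    """Local stand-in for the library's ProgrammingError."""


@dataclass(frozen=True)
class Event:
    originator_id: UUID
    originator_version: int


class DogRegistered(Event):
    pass


class CatRegistered(Event):
    pass


class BirdRegistered(Event):
    pass  # Deliberately left out of the mapping below.


class Dog:
    pass


class Cat:
    pass


EVENT_TO_AGGREGATE: Dict[type, type] = {DogRegistered: Dog, CatRegistered: Cat}
INTERVALS: Dict[type, int] = {Dog: 1}  # Cat has no interval: never snapshotted.


def snapshot_versions(
    events: List[Event],
    aggregates: Dict[UUID, object],
    event_to_aggregate: Dict[type, type],
    intervals: Dict[type, int],
) -> List[Tuple[UUID, int]]:
    result = []
    for event in events:
        aggregate = aggregates.get(event.originator_id)
        if aggregate is not None:
            aggregate_type = type(aggregate)
        else:
            aggregate_type = event_to_aggregate.get(type(event))
        if aggregate_type is None:
            # The type can't be determined at all: a configuration mistake.
            raise ProgrammingError(f"Failed to get aggregate type for {type(event)}.")
        interval = intervals.get(aggregate_type)
        # No interval configured means "don't snapshot", not an error.
        if interval is not None and event.originator_version % interval == 0:
            result.append((event.originator_id, event.originator_version))
    return result
```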

filwaline, Feb 08 '24 13:02

Oh yes! I see what you mean now.

johnbywater, Feb 08 '24 15:02