`created_on` and `modified_on` attributes of Aggregates reconstructed from snapshots have inconsistent types
While trying to enable snapshotting, I found behavior that doesn't seem consistent with what's written in the documentation. As a simple example, I slightly modified the test code from example "Aggregate 8" to:
```python
# Construct application object.
school = DogSchool()

# Evolve application state.
dog_id = school.register_dog("Fido")
school.add_trick(dog_id, "roll over")
school.add_trick(dog_id, "play dead")

# Query application state.
dog = school.get_dog(dog_id)
print("type(dog['created_on']): ", type(dog['created_on']))
print("type(dog['modified_on']): ", type(dog['modified_on']))

# Take snapshot.
school.take_snapshot(dog_id, version=3)
dog = school.get_dog(dog_id)
print("type(dog['created_on']): ", type(dog['created_on']))
print("type(dog['modified_on']): ", type(dog['modified_on']))

# Continue with snapshotted aggregate.
school.add_trick(dog_id, "fetch ball")
dog = school.get_dog(dog_id)
print("type(dog['created_on']): ", type(dog['created_on']))
print("type(dog['modified_on']): ", type(dog['modified_on']))
```
and got the following output:
```
type(dog['created_on']):  <class 'datetime.datetime'>
type(dog['modified_on']):  <class 'datetime.datetime'>
type(dog['created_on']):  <class 'str'>
type(dog['modified_on']):  <class 'str'>
type(dog['created_on']):  <class 'str'>
type(dog['modified_on']):  <class 'datetime.datetime'>
```
The documentation says that these attributes should be `datetime` objects.
Using a debugger, I found that these two attributes start off as `datetime` objects, but during reconstruction from the repository they become strings when the aggregate is mutated by a `Snapshot`'s `mutate` method:
```python
aggregate.__dict__.update(aggregate_state)
```
However, if a domain event occurred after the snapshot was taken, its `mutate` method

```python
aggregate.modified_on = self.timestamp
```

restores `modified_on` to being a `datetime`, which explains why the third case in my output has two different types.
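The mechanism can be reproduced without the library at all; here is a minimal sketch (hypothetical dict keys, standard-library `json` only) of the two steps, assuming the snapshot state round-trips through JSON:

```python
import datetime
import json

# Step 1: datetimes in the snapshot state survive a JSON round trip
# only as ISO-format strings.
state = {"_created_on": datetime.datetime.now(), "_modified_on": datetime.datetime.now()}
restored = json.loads(json.dumps(state, default=datetime.datetime.isoformat))
print(type(restored["_modified_on"]))  # <class 'str'>

# Step 2: a later event's mutate() reassigns modified_on from the event's
# own timestamp attribute, a real datetime, while _created_on stays a string.
restored["_modified_on"] = datetime.datetime.now()
```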
To summarize:

- when an Aggregate is created, `created_on` and `modified_on` are `datetime`s
- when gotten from a repository (hence reconstructed), if a snapshot was taken, `created_on` and `modified_on` become strings
- if a domain event happens to occur after the last snapshot, `modified_on` becomes a `datetime` again
Is this intended? And should we be able to rely on the type being `datetime`?
The aggregate's `created_on` and `modified_on` are actually properties that read the value from `_created_on` and `_modified_on`, respectively. In example "Aggregate 8", the declarative Pydantic example, the `OrJsonTranscoder` uses `encode` to convert the `datetime` objects to strings for these fields, and `decode` to convert the stored event bytes back to strings for these fields. Normally, there are registered transcodings that handle conversions like `datetime` to ISO string and back, but in this example that raises an error, because the expectation is that these fields will be defined on the Pydantic model for the events.
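For reference, the property pattern being described looks roughly like this (a sketch, not the library's exact source):

```python
from datetime import datetime

class Aggregate:
    @property
    def created_on(self) -> datetime:
        # Simply exposes the underscore-prefixed attribute set during
        # construction or reconstruction.
        return self._created_on

    @property
    def modified_on(self) -> datetime:
        return self._modified_on
```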
The first thing I tried was to add `_created_on` and `_modified_on` fields to the `SnapshotState` model. However, Pydantic treats fields prefixed with `_` as private model attributes, and these don't get serialized. You can almost get around this using aliases:
```python
class SnapshotState(BaseModel):
    """Pydantic-based class for storing an aggregate's snapshot state."""

    # Trailing underscores on the field names avoid Pydantic's private-attribute
    # handling; the aliases carry the leading underscores used on the aggregate.
    created_on_: datetime = Field(alias="_created_on")
    modified_on_: datetime = Field(alias="_modified_on")

    model_config = ConfigDict(extra="allow", populate_by_name=True)
```
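As a quick check of the alias behavior (hypothetical values, given the `SnapshotState` above):

```python
from datetime import datetime

state = SnapshotState(**{"_created_on": datetime.now(), "_modified_on": datetime.now()})
state.model_dump()               # {'created_on_': ..., 'modified_on_': ...}
state.model_dump(by_alias=True)  # {'_created_on': ..., '_modified_on': ...}
```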
To serialize the fields of a Pydantic model with the alias values, you need to pass `by_alias=True` to `model_dump`, so the `PydanticMapper.to_stored_event` method needs to be updated to:
```python
def to_stored_event(self, domain_event: DomainEventProtocol) -> StoredEvent:
    topic = get_topic(domain_event.__class__)
    event_state = cast(BaseModel, domain_event).model_dump(by_alias=True)  # <- Change here
    stored_state = self.transcoder.encode(event_state)
    if self.compressor:
        stored_state = self.compressor.compress(stored_state)
    if self.cipher:
        stored_state = self.cipher.encrypt(stored_state)
    return StoredEvent(
        originator_id=domain_event.originator_id,
        originator_version=domain_event.originator_version,
        topic=topic,
        state=stored_state,
    )
```
The `mutate` method of `AggregateSnapshot` also needs to be overridden to include the `by_alias=True` line:
```python
def mutate(self, _: None) -> Aggregate:
    """
    Reconstructs the snapshotted :class:`Aggregate` object.
    """
    cls = cast(Type[Aggregate], resolve_topic(self.topic))
    # aggregate_state = dict(aggregate.__dict__)
    aggregate_state = self.state.model_dump(by_alias=True)  # <- Change here
    from_version = aggregate_state.pop("class_version", 1)
    class_version = getattr(cls, "class_version", 1)
    while from_version < class_version:
        upcast_name = f"upcast_v{from_version}_v{from_version + 1}"
        upcast = getattr(cls, upcast_name)
        upcast(aggregate_state)
        from_version += 1
    aggregate_state["_id"] = self.originator_id
    aggregate_state["_version"] = self.originator_version
    aggregate_state["_pending_events"] = []
    aggregate = object.__new__(cls)
    aggregate.__dict__.update(aggregate_state)
    return aggregate
```
If you don't do this, the `_created_on` and `_modified_on` attributes don't get set on the aggregate (they get set as `created_on_` and `modified_on_`, as defined in the model) and you get an `AttributeError`. However, dumping the model has a side effect: any nested Pydantic `BaseModel` subclasses, like `Trick`, also get dumped to dictionaries. Since the aggregate instance is not a `BaseModel` subclass, it's very difficult to rehydrate these `BaseModel` instances in a programmatic way. If it were, we could do `aggregate(**aggregate_state)` and we'd be done.
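To illustrate the side effect, here is a minimal sketch with a hypothetical `Trick`-like model and extra fields allowed, as in `SnapshotState`:

```python
from pydantic import BaseModel, ConfigDict

class Trick(BaseModel):
    name: str

class State(BaseModel):
    model_config = ConfigDict(extra="allow")

state = State(tricks=[Trick(name="roll over")])
print(state.model_dump())
# {'tricks': [{'name': 'roll over'}]}  <- the Trick instance is now a plain dict
```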
A few things here:

- Why are `Aggregate.created_on` and `Aggregate.modified_on` (and `Aggregate.version`, `Aggregate.id`, etc.) properties? It seems like they only return their underscore counterparts without modification. If they were regular attributes this becomes much easier to do; there would be no need for `model_dump` or Pydantic aliases.
- Is there a (relatively) easy way to subclass both `pydantic.BaseModel` and `eventsourcing.Aggregate`? I tried this briefly but got a metaclass error.
- Is the best course of action to treat `_modified_on` and `_created_on` as special, similar to `_id`, `_version`, and `_pending_events`, in the `mutate` method?
- Are there other hooks here that I may have missed that would simplify snapshotting with Pydantic models?
Thanks!
Thanks for your very detailed and interesting analysis @matthewchao and @d-m. I'm looking into this now....
After playing around with this, it seems one simple solution to this problem is to use a Pydantic `TypeAdapter` to "validate" the underscore-prefixed constructor arguments of `SnapshotState`:
```python
from datetime import datetime
from typing import Any
from pydantic import BaseModel, ConfigDict, TypeAdapter

datetime_adapter = TypeAdapter(datetime)

class SnapshotState(BaseModel):
    model_config = ConfigDict(extra="allow")

    def __init__(self, **kwargs: Any) -> None:
        # Coerce ISO strings back to datetimes before Pydantic stores the extras.
        for key in ["_created_on", "_modified_on"]:
            kwargs[key] = datetime_adapter.validate_python(kwargs[key])
        super().__init__(**kwargs)
```
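A quick check of the coercion (hypothetical ISO strings, as the transcoder would produce):

```python
state = SnapshotState(**{"_created_on": "2024-01-01T12:00:00", "_modified_on": "2024-01-02T12:00:00"})
print(type(state.model_dump()["_created_on"]))  # <class 'datetime.datetime'>
```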
It would be nice to express this with Pydantic's declarative syntax, but it seems it just isn't possible, because of Pydantic's hardline treatment of underscore-prefixed fields as excluded from the validated model schema.
I realise the underscore-prefixed attributes don't play well with Pydantic's declarative syntax. So I'm also looking into changing the Aggregate class to have `created_on` and `modified_on` attributes that aren't properties.
However, I'm reluctant to change this for reasons of stability in the API and backwards compatibility. The current use of properties was partly historical and partly motivated by the `MutableAggregateProtocol`, `ImmutableAggregateProtocol`, and `MutableOrImmutableAggregate` aspects of the typing.
But I am thinking about it, because it would be nice to have this fit better with Pydantic. Also I'm wondering if perhaps the common persistence and domain model code which the Pydantic/Orjson examples use could usefully be published as a separate package. If so, then perhaps including the above code would satisfactorily encapsulate a resolution of the issue.
Thanks again for all the detailed considerations above. I really appreciated your attention to detail.
Thank you, we have it written as a separate package internally. I'll see if I can get permission to open source that piece, although these requests are not always so easy in a corporate environment...
This has now been fixed in the stable docs. I used the code I posted above to convert strings to datetime objects, and enhanced the test to check the attributes have the correct type.
https://eventsourcing.readthedocs.io/en/stable/topics/examples/aggregate8.html