castore
Query Models
Is your feature request related to a problem? Please describe. I'm working on a POC for my organization and am starting to try and introduce query models. I'm wondering if you have a discussion forum or anywhere that I could look to understand possible solutions.
Describe the solution you'd like I'd like to understand some good solutions for implementing query models with Castore's framework.
Describe alternatives you've considered Our organization is looking into other frameworks like AxonIQ. Personally I'd rather use your framework if possible.
Additional context To be clear, this isn't a feature request per se. I understand you're already working on this. I am asking if you have any forums or additional resources for potential solutions for query model patterns that would work well with your framework. Thank you!
Hello @aiuhjv1
Read models are hard :) There's a lot to think about:
- How do you implement event replays?
- Does your message bus preserve event ordering?
- Can your message bus trigger the same projection several times?
We ran into this issue in past projects (using EventBridge, which does NOT preserve event ordering and can trigger the same projection several times). Here's our recommendation:
- Use state-carrying messages (append the aggregate to the event message after it is pushed)
- Completely derive a read model from one aggregate and one aggregate only (so you don't have to deal with event re-ordering across aggregates or event stores during replays)
- Tag your read model data with the last handled event version of its aggregate
- Completely overwrite the read model at each event, provided the incoming event version is above the last known handled event version (most DB APIs have this kind of conditional write feature)
This makes your architecture resilient to events being projected in the wrong order or several times, while keeping replays extremely simple: simply list the aggregate ids in your event store and re-dispatch each aggregate's last event to your message bus.
The cost is that it's not possible to pre-compute data that combines events from several aggregates or several event stores (like the sum of all the accounts of a user). You need to cross this data at read time rather than at write time.
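As an illustration of the conditional overwrite rule, here is a minimal sketch (the function name is hypothetical, and the DynamoDB expression in the comment is a typical way to express the same condition, not a Castore API):

```typescript
// Decide whether an incoming projection may overwrite the stored read model.
// Overwrite only if the read model doesn't exist yet, or if the incoming
// event version is above the last handled event version.
const shouldProject = (
  lastHandledVersion: number | undefined,
  incomingVersion: number,
): boolean =>
  lastHandledVersion === undefined || incomingVersion > lastHandledVersion;

// With DynamoDB, the same rule can run atomically as a conditional write, e.g.
//   ConditionExpression: "attribute_not_exists(aggregateId) OR version < :incoming"
// so replayed or re-ordered projections can never roll the read model back.
```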
I'll be writing some documentation about that soon :)
Hi @ThomasAribart, I am a fan of what you are creating here.
In my situation, which I believe is a common one, I intend to create an SQL-based read model. This choice is driven by the ease of connecting a frontend to an SQL-based data source, given the abundance of out-of-the-box solutions available for this use case.
To achieve this goal, my projection will consume events from EventBridge to detect changes in an aggregate. However, due to the challenge of handling out-of-order events, the projection will utilize the event only as a signal that a change occurred in the aggregate. The actual data for the updated aggregate will be directly read from the EventStore (DynamoDB).
This approach also simplifies the process of rebuilding the projection. In case of a rebuild, the projection will drop the current model and read all events from the EventStore anew.
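A minimal sketch of this "event as a signal" projection, assuming illustrative event types (this is the shape of the idea, not Castore's actual API):

```typescript
// Illustrative event types; real events would carry business payloads.
type AccountEvent =
  | { aggregateId: string; version: number; type: "ACCOUNT_OPENED" }
  | { aggregateId: string; version: number; type: "MONEY_DEPOSITED"; amount: number };

// The projection ignores the content of the notification it received and
// rebuilds the read model from the FULL event history of the aggregate,
// so out-of-order or duplicated notifications are harmless.
const buildReadModel = (events: AccountEvent[]) =>
  [...events]
    .sort((a, b) => a.version - b.version)
    .reduce(
      (model, event) => ({
        aggregateId: event.aggregateId,
        version: event.version,
        balance:
          event.type === "MONEY_DEPOSITED" ? model.balance + event.amount : model.balance,
      }),
      { aggregateId: "", version: 0, balance: 0 },
    );
```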
What are your thoughts about it?
@nirweiner2 ,
Disclaimer: I'm not using castore (yet).
On my current project, we encountered exactly the same issues with EventBridge around replays and ordering. What I was thinking of implementing was something like you described:
The read model gets a notification for a specific aggregate and uses its internal state (that could be aggregateId + version) to decide if it needs to be updated. If you need to replay, you drop the state of the read model and send a notification again.
How did it go for you guys?
@jeremycare We have not implemented Event Sourcing yet. I am a big fan of the pattern though.
As far as I understand from your presented architecture, it actually looks like you use FIFO queues instead of EventBridge. You could greatly reduce the complexity by using EventBridge. Am I missing anything?
As for the replay concept: unfortunately, I couldn't find good resources on anyone using a similar approach. That being said, after a lot of reading I do believe this would be the best option.
Just to be clear, after clearing the read model, you'll have to iterate over all of the aggregates and send a notification for each one separately. This way you can leverage the concurrency of serverless Lambda to rebuild the read model faster and more efficiently.
Hey @nirweiner2
Here's the architecture we went for on my project. We were resilient to event replays and bad ordering, so we didn't need to use FIFO queues.
Command executions + fan-out of state-carrying events to EventBridge:
From EventBridge, we distinguished side effects from projections on read models. Read models are always derived from only one aggregate (we do not re-fetch other aggregates; if data needs to be crossed between event stores, we cross projected read models at read time instead). Projections always push the entire read model along with its version, but with a condition: either the read model doesn't exist, or it exists but has a version less than or equal to the version we're currently projecting.
This makes event replays very easy: simply re-send a state-carrying message of the last event of each aggregate into the event bus, but with an EventBridge detail-type "__REPLAYED__" so as not to re-trigger side effects. We split the replay into two parts to avoid timeouts: first list the aggregate ids, then replay each aggregate.
We do not need to replay events in the right order (actually, only the last event is needed), nor do we need to replay the aggregates in the right order (projections do not fetch any data other than what we send), so there's no need for FIFO queues at all. And since projections are idempotent, we're also resilient to events being projected several times.
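A sketch of how such replay messages could be built (the `Source` name and message shape are assumptions for illustration; only the `__REPLAYED__` detail-type comes from the approach described above):

```typescript
// Shape of the last event of an aggregate, with its aggregate appended
// (state-carrying message). Illustrative, not Castore's actual type.
type LastEventMessage = { aggregateId: string; version: number; aggregate: unknown };

// Build one EventBridge entry per aggregate, tagged with the "__REPLAYED__"
// detail-type so that side-effect rules can filter replays out while
// projection rules still match.
const buildReplayEntries = (lastEvents: LastEventMessage[], eventBusName: string) =>
  lastEvents.map((lastEvent) => ({
    EventBusName: eventBusName,
    Source: "my-service.replay", // illustrative source name
    DetailType: "__REPLAYED__",
    Detail: JSON.stringify(lastEvent),
  }));
```

In practice, each entry would then be sent with EventBridge's `PutEvents` call, batched per aggregate to stay under Lambda timeouts.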
Hope it helped!
@ThomasAribart Thank you very much! This is very helpful.
I am wondering why you have to use state-carrying messages at all. The state can always be pulled by rebuilding the aggregate from the event store instead. I think it could reduce the complexity.
As for the replay, I suggest a similar mechanism: listAggregateIds -> for each aggregate, push an AggregateReplayEvent to EventBridge.
@nirweiner2 Yes you could use only notification messages (i.e. without aggregate), and re-fetch the aggregate in the projections/side-effects later on.
However, it doesn't scale very well: you'll need to re-fetch the aggregate at least n times, n being the number of projections that are triggered (and probably more, as the aggregate is often needed in side effects as well). This leads to increased traffic and costs.
@ThomasAribart, are your query models built from the fine-grained events of the aggregate, or do they project the aggregate directly?
Today our query models are built from the fine-grained events, but it's a heavy effort to maintain. I see in the diagram you previously sent that you mention using the aggregate directly.
Indeed, our read models are built from the aggregate. Events are mostly used in side effects!
@ThomasAribart ,
Did you guys face the same latency issues we're encountering with the architecture?
Since we rely on a lot of services like SQS / EventBridge etc., we are seeing up to 5 seconds of delay between a mutation and a query update. This is too slow for us, so we're considering removing all those managed queues and using Amazon MQ instead.
Any thoughts?
Hi @jeremycare !
I see a few improvements here:
- You don't need the `fanout` block AND the `EventBridgePipe` together. The `EventBridgePipe` can come in replacement of the `fanout` block, as it can listen to DynamoDB streams, trigger an enrichment lambda (that can unmarshall the message + append the aggregate if you use `StateCarryingEvents`) and directly stream its response to EventBridge. You can win a few seconds by removing the `fanout` block entirely, I think.
- If that doesn't cut it for you, you can also use the `ConnectedEventStore` class to directly send the message to EventBridge just after writing the event to DynamoDB in the command. This avoids needing any `EventBridgePipe` altogether (plus a re-fetch of the aggregate if you use `StateCarryingEvents`; don't forget to provide the `prevAggregate` as an option to the `pushEvent` method to avoid having to re-fetch the aggregate).
- I don't think you need a second SQS in the read model part either. Triggering the lambda by EventBridge should work just fine. And probably the second `fanout` lambda should be called `projectOnReadModel` 🤔?
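To illustrate the `ConnectedEventStore` idea, here is a minimal in-memory sketch of "write then publish in one call" (the class below is a simplified stand-in, not Castore's actual `ConnectedEventStore`):

```typescript
// Simplified stand-in for an event detail; not Castore's actual type.
type EventDetail = { aggregateId: string; version: number; type: string };

// Minimal in-memory sketch: pushing an event persists it AND publishes it to
// the message bus in one call, so no separate stream -> EventBridge pipe is
// needed (and no extra re-fetch of the aggregate).
class InMemoryConnectedEventStore {
  private events: EventDetail[] = [];

  constructor(private publishToBus: (event: EventDetail) => void) {}

  pushEvent(event: EventDetail): void {
    this.events.push(event); // 1. write to the store (DynamoDB in practice)
    this.publishToBus(event); // 2. forward to the bus (EventBridge in practice)
  }

  getEvents(aggregateId: string): EventDetail[] {
    return this.events.filter((event) => event.aggregateId === aggregateId);
  }
}
```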
Hi @ThomasAribart,
If I summarize the first option, it looks more like this:
The second option looks like this:
I don't think you need a second SQS in the read model part as well. Triggering the lambda by event bridge should work just fine.
EventBridge only retries on delivery errors. If the lambda starts and there is an error inside the lambda (for example, the OS is down, or anything else), EventBridge will not retry.
Do you handle this error handling differently?
@jeremycare Yes exactly !
About EventBridge: it does not do any retries for errors happening within the Lambda code. However, it is an asynchronous invocation, so you will benefit from the two internal Lambda retries. You can also add a dead-letter queue for unsuccessful executions after that.
See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html
Apologies for hijacking this discussion, but it's on a similar topic.
@ThomasAribart have you got a neat solution to handling at least once delivery with EventBridge and your side effects?
We're looking into building an implementation with castore at the moment, but unsure how to handle this nicely. The specific scenario being we want a Lambda to listen to an event and off the back of it trigger a command that publishes another event. If EventBridge delivers the event twice, we don't want the triggered event written/published twice.
We're currently thinking of a FIFO queue, or writing already-delivered events to DynamoDB, but neither sounds great.
Hi @matt-kinton !
I see two possibilities:
- Either the commands down the line are non-repeatable (e.g. deletion of a resource): you'll have to implement some validation in the command anyway, so you can simply let it throw if EventBridge delivers the event twice (which should be fairly rare).
- Or the commands are repeatable (e.g. adding money to an account). This case is annoying: I would simply push some kind of "lock" into DynamoDB before running the command. A few things to be careful about:
- You should not do the lock transaction in two steps (try to get the lock and THEN push it if none exists). Instead, simply try to push the lock, but with a DynamoDB condition on its non-existence. This makes sure that two concurrent executions will be mutually exclusive (otherwise you would have a race condition).
- The lock is only useful for a short amount of time. You can set a time-to-live (a native DynamoDB feature) to delete it after a while (1 day should be enough).
- Don't forget to delete it if the command fails
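The lock could be sketched like this; a `Map` stands in for DynamoDB's `attribute_not_exists` conditional put, and all names here are illustrative:

```typescript
// In-memory sketch of the lock. The Map stands in for a DynamoDB table where
// acquiring would be a single PutItem with
//   ConditionExpression: "attribute_not_exists(lockId)"
// plus a TTL attribute for automatic cleanup.
class CommandLockStore {
  private locks = new Map<string, number>(); // lockId -> expiry timestamp (ms)

  // Single atomic "put if not exists" step: returns true if the lock was acquired.
  acquire(lockId: string, ttlMs: number, now = Date.now()): boolean {
    const expiry = this.locks.get(lockId);
    if (expiry !== undefined && expiry > now) return false; // condition fails: duplicate delivery
    this.locks.set(lockId, now + ttlMs); // conditional put succeeds
    return true;
  }

  // Don't forget to call this if the command fails.
  release(lockId: string): void {
    this.locks.delete(lockId);
  }
}
```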
In any case, running commands in response to events in the same event store is not a good practice overall: probably the series of events you would get can be merged into a single event.
I will write this down in the documentation soon, but a "good" event is:
- Significant in terms of business (avoid noise like analytics events, or intermediary form state saving)
- Triggered by a user interaction or an external technical trigger (webhook etc.)
The only valid cases for internal technical commands would be:
- Microservices communication (arguably it falls under the "external technical triggers" above)
- One-to-many ricochets (like deleting a user would trigger n commands for deleting his/her n accounts)
Cheers @ThomasAribart that's a great answer!
The lock is what we were considering, so it's good to know we're on the right path. It's just a case of educating the team about at-least-once delivery, I think!
Definitely makes sense not to run commands from events in the same event store. We fall into the camp of 'external technical triggers', receiving events from other teams' microservices.
Looking forward to seeing more docs soon, we're working on a PoC at the moment and enjoying the lib so far 😄