MultipleAggregateReadStoreManager and order of events
My aggregate have received 2 commands almost at the same time. Seems due optimistic concurrency conflicts now I have events with messed timestamps. For example:
- event with sequence number 24 and timestamp '2020-11-25T14:23:30.3053878'
- event with sequence number 25 and timestamp '2020-11-25T14:23:30.1890190'
I think messed timestamps is not big problem, but order of events is important. MultipleAggregateReadStoreManager sorts events by timestamp and applies them to readmodel in wrong order. Perhaps there are required some more complex sorting that will keep order of same aggregate events by sequence number.
Looks like MultipleAggregateReadStoreManager:63 causes this.
However, the fix to this isn't as simple as just switching the order of the sort.
The MultipleAggregateReadStore is used when a ReadModel is using a ReadModelLocator. Which is generally done to apply events multiple Aggregates to one ReadModel.
As a result of this, we can't just sort by AggregateSequenceNumber first since this would cause cross-aggregate events to apply incorrectly.
To me, ensuring the constraint that an event with a sequence number less than another event indicates that event was emitted before or at the same time as the event, is probably the more straightforward way of solving this issue.
What Event Persistence implementation are you using? I can't seem to reliably trigger an optimistic concurrency conflict that resolves in a way where the timestamps / aggregate sequence numbers are misaligned in this way.
What Event Persistence implementation are you using?
MsSqlEventPersistence. We are using version 0.80.4377.
For now I'm experimenting with such workaround:
protected override IReadOnlyCollection<ReadModelUpdate> BuildReadModelUpdates(IReadOnlyCollection<IDomainEvent> domainEvents)
{
var readModelUpdates = new List<ReadModelUpdate>();
foreach (var readModelEvents in domainEvents
.SelectMany(
e => readModelLocator.GetReadModelIds(e),
(e, rid) => (DomainEvent: e, ReadModelId: rid))
.GroupBy(t => t.ReadModelId, t => t.DomainEvent))
{
var events = readModelEvents.OrderBy(d => d.Timestamp).ThenBy(d => d.AggregateSequenceNumber).ToList();
// Split events by aggregates and sort every group independently by sequence number.
var aggregateGroups = events
.Select((e, i) => (Event: e, Index: i))
.GroupBy(x => (x.Event.AggregateType, x.Event.GetIdentity().Value))
.Select(g => g.ToList())
.ToList();
foreach (var item in aggregateGroups.SelectMany(g => g
.Select(x => x.Index)
.Zip(
g.Select(x => x.Event).OrderBy(e => e.AggregateSequenceNumber),
(i, e) => (OriginalIndex: i, SortedEvent: e))))
{
events[item.OriginalIndex] = item.SortedEvent;
}
readModelUpdates.Add(new ReadModelUpdate(readModelEvents.Key, events));
}
return readModelUpdates;
}
It's not tested enough, but seems working well with my problem cases.
@Kladzey would you be able to create a test that triggers the error as an PR?
Hello there!
We hope you are doing well. We noticed that this issue has not seen any activity in the past 90 days. We consider this issue to be stale and will be closing it within the next seven days.
If you still require assistance with this issue, please feel free to reopen it or create a new issue.
Thank you for your understanding and cooperation.
Best regards, EventFlow
Hello there!
We hope you are doing well. We noticed that this issue has not seen any activity in the past 90 days. We consider this issue to be stale and will be closing it within the next seven days.
If you still require assistance with this issue, please feel free to reopen it or create a new issue.
Thank you for your understanding and cooperation.
Best regards, EventFlow
Hello there!
This issue has been closed due to inactivity for seven days. If you believe this issue still needs attention, please feel free to open a new issue or comment on this one to request its reopening.
Thank you for your contribution to this repository.
Best regards, EventFlow