Subscriptions sometimes skip events
I have a funny problem with my setup, which runs the PostgreSQL event store (using npgsql 7) and MongoDB EventHandlers (using MongoDB.Entities). The handlers are really simple - basically only upserting data, without any weird data modification along the way.
However, sometimes when loading a bunch of data (e.g. when importing data from a legacy application), the checkpoint store has moved past a specific event even though the EventHandler was never run for it. If I then reset the checkpoint to a lower value and restart the application, the EventHandlers run fine and everything is up to date.
Any ideas on how to debug this further are much appreciated - I am working on a PoC sample to see if it can be replicated outside our environment.
Have you confirmed that the handler hasn't fired, like with logs or something?
Yes (sorta), I am logging every MongoDB call and the one to upsert the offending documents was never called
However, digging deeper into the logs I noticed my container was getting OOMKilled, which might explain it. That possibly raises another issue, though: is the checkpoint updated before the handlers have run?
No, it can't be. The checkpoint commit is downstream from the projector. Are you sure you are doing `SaveAsync` or `ExecuteAsync` and it's not being delayed in any way by the library you use?
This is a part of the handler that doesn't always run:
```csharp
public class RegistrationProjections : EventHandler
{
    public RegistrationProjections()
    {
        On<V1.RegistrationReceived>(
            async ctx =>
                await new Registration
                {
                    ID = ctx.Message.Id.ToString(),
                    // ... other properties ...
                }.SaveExceptAsync(x => new { x.Barcodes })
        );
    }
}
```
Barcodes are being updated in another event, which is why they are ignored here.
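For illustration, the barcode projection could look something like this in the same style (the `V1.BarcodesUpdated` event name and its shape are placeholders here, not the actual contract), using MongoDB.Entities' `SaveOnlyAsync` as the mirror of `SaveExceptAsync`:
```csharp
// Illustrative sketch only - registered in the same constructor as the handler above.
On<V1.BarcodesUpdated>(
    async ctx =>
        await new Registration
        {
            ID       = ctx.Message.Id.ToString(),
            Barcodes = ctx.Message.Barcodes
        }.SaveOnlyAsync(x => new { x.Barcodes })
);
```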
The handler is being registered like this:
```csharp
services.AddSubscription<PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions>(
    "RegistrationProjections",
    builder => builder.Configure(x => x.MaxPageSize = 256).AddEventHandler<RegistrationProjections>()
);
```
I tried lowering the `MaxPageSize` to 256 to see if it had any effect, but that doesn't seem to have done anything.
What if you try using Eventuous MongoDB tools for that upsert? Or the MongoDB driver native API? I just want to remove the possibility that it's a third-party dependency causing the issue.
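For example, roughly like this with the native driver (a sketch - `collection` here is an assumed `IMongoCollection<Registration>` from your own setup, and note that a plain replace overwrites the whole document, unlike `SaveExceptAsync`):
```csharp
// Rough sketch with the native MongoDB driver: replace-or-insert the document by id.
await collection.ReplaceOneAsync(
    Builders<Registration>.Filter.Eq(r => r.ID, ctx.Message.Id.ToString()),
    new Registration
    {
        ID = ctx.Message.Id.ToString(),
        // ... other properties ...
    },
    new ReplaceOptions { IsUpsert = true }
);
```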
I understand, however that would require quite the refactor in my handlers.
I will add some more debug logging and try to dig deeper into it, and maybe create a simple handler using the Eventuous MongoDB tools to see if I can replicate it there.
I tried putting some more logging into the handler:
```csharp
public class RegistrationProjections : EventHandler
{
    public RegistrationProjections(ILogger<RegistrationProjections> logger)
    {
        On<V1.RegistrationReceived>(
            async ctx =>
            {
                logger.LogDebug(
                    "Running projection for {EventType} with ID {RegistrationID}",
                    typeof(V1.RegistrationReceived).FullName,
                    ctx.Message.Id
                );

                await new Registration
                {
                    ID = ctx.Message.Id.ToString(),
                    // ... other properties ...
                }.SaveExceptAsync(x => new { x.Barcodes });
            }
        );
    }
}
```
Most of the events log it fine, but some are skipped. The event streams do exist in the database, and this time there were no OOM kills of the container.
Can you add global position to the logs? Also, how do you produce these migration events? Do you commit from multiple processes/threads, or is it a linear fetch-produce?
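Something along these lines, assuming the consume context exposes the global position (the `ctx.GlobalPosition` property name is my assumption):
```csharp
// Sketch: include the global position in the existing projection log line
logger.LogDebug(
    "Running projection for {EventType} with ID {RegistrationID} at global position {GlobalPosition}",
    typeof(V1.RegistrationReceived).FullName,
    ctx.Message.Id,
    ctx.GlobalPosition
);
```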
Here is an excerpt from the Eventuous debug logs - the `Position` property is the one highlighted in blue:
(Don't mind that the event handler names etc. are different from the ones above - the code above is merely a sample, whereas the logs below are from real code.)
Note that positions 246197, 246201 & 246202 are missing even though they exist in the database:
(The IDs are off by 1 because of #163)
I have one service that discovers what data should be migrated and pushes a bunch of messages, via MassTransit, to another service, which then uses the command service to apply the events; a third service generates the projections - the logs are from that third service. So the commits are multi-threaded from multiple containers (running the same code), but the projections are guaranteed to only be run by one instance at a time.
I see that the global position appears out of order in the logs, do you partition the subscription?
For missing events, my suspicion is that it's the sequence issue. With frequent concurrent writes, one service might get its sequence values allocated earlier but commit later than another service. As a result, you can have events with a lower sequence number committed after events with a higher sequence number. The subscription then receives the events with the higher number first, the next call asks for "everything above the one I have", and you end up with skipped events.
I know this only in theory, as I am not an expert in Postgres, but I have read about it somewhere. I will try to find out more; you can spend some time googling too. The PostgreSQL docs on sequences say the following about the cache setting:
Unexpected results might be obtained if a cache setting greater than one is used for a sequence object that will be used concurrently by multiple sessions. Each session will allocate and cache successive sequence values during one access to the sequence object and increase the sequence object's last_value accordingly. Then, the next cache-1 uses of nextval within that session simply return the preallocated values without touching the sequence object. So, any numbers allocated but not used within a session will be lost when that session ends, resulting in "holes" in the sequence.
Furthermore, although multiple sessions are guaranteed to allocate distinct sequence values, the values might be generated out of sequence when all the sessions are considered. For example, with a cache setting of 10, session A might reserve values 1..10 and return nextval=1, then session B might reserve values 11..20 and return nextval=11 before session A has generated nextval=2. Thus, with a cache setting of one it is safe to assume that nextval values are generated sequentially; with a cache setting greater than one you should only assume that the nextval values are all distinct, not that they are generated purely sequentially. Also, last_value will reflect the latest value reserved by any session, whether or not it has yet been returned by nextval.
Basically, what the docs say is to use a cache setting of one to guarantee order in the sequence.
As we don't use sequences explicitly, I am wondering what the sequence settings are for the unique auto-incremented id...
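One way to check could be something like this (a sketch - the `messages` table and `global_position` column names are assumptions about the schema):
```csharp
// using Npgsql;
// Find the sequence backing the identity column and look up its cache setting.
await using var conn = new NpgsqlConnection(connectionString);
await conn.OpenAsync();

await using var seqCmd = new NpgsqlCommand(
    "SELECT pg_get_serial_sequence('messages', 'global_position')", conn);
var sequenceName = (string?) await seqCmd.ExecuteScalarAsync(); // e.g. "public.messages_global_position_seq"

await using var cacheCmd = new NpgsqlCommand(
    "SELECT cache_size FROM pg_sequences WHERE schemaname || '.' || sequencename = @seq", conn);
cacheCmd.Parameters.AddWithValue("seq", sequenceName!);
var cacheSize = (long?) await cacheCmd.ExecuteScalarAsync(); // 1 by default
```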
do you partition the subscription?
Partitioning is not enabled
--
A quick search around suggests that the cache values are only really a thing for a sequence, not for the identity. I will try searching deeper
Ok, looks like I am spamming here, but still.
Basically, what I am trying to say is that the issue is not that the subscription skips events. It is most probably caused by events with a higher global position being committed to the database before events with a lower global position. I am not exactly sure how to solve it.
I thought of the following:
- Don't use autogenerated id
- Query the max id value after the transaction is opened
- Assign incrementing values to global position when inserting
- Commit the transaction
- It might fail in case of optimistic concurrency, then retry
It will probably slow down the appends, but should solve the issue.
No worries about the spamming - it is actually quite an interesting problem to solve.
I dug around my Postgres instance and found the following for the global position:
So it seems like the cache is already 1 for the autogenerated id.
I will try and dig deeper for a solution
Partitioning is not enabled
Ok, what about concurrency? I clearly see from your logs that events are being processed out of order, which should not happen if you have the default concurrency.
Also, auto-generated identity still uses a sequence. I think you can even find it, but I am not sure.
Concurrency isn't enabled either. The registration of the handler is referenced here: https://github.com/Eventuous/eventuous/issues/222#issuecomment-1570176340 - bog standard registration.
How do you conclude that they are processed out of order? The way I read the logs, they start at 246192, process up to 246196, "skip" 246197, process up to 246200, "skip" 246201 + 246202, process 246203, and then store a new checkpoint.
The auto-generated identity sequence is exactly the one I (think) I found above - `messages_global_position_seq` - the `seq` part being what makes me believe that.
Ah, it's the screenshot from Seq, right? So, it's sorted by time and new entries are on the top? Then indeed it looks fine.
Exactly :) Sorry that wasn't clearer before
A small update:
I tried adding some logging in `PostgresSubscriptionBase` to output the list of `GlobalPosition`s received back from the db call - and I think our suspicion is correct - some events are missing:
I am still working on the "why" part of it, since the events are in the database when I look manually afterwards.
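The check itself boils down to something like this (just the idea, not the actual Eventuous internals):
```csharp
// Given the global positions of a fetched batch and the last position already processed,
// log any gaps before the events are handed to the handlers.
static void LogGaps(ulong lastProcessed, IReadOnlyList<ulong> batchPositions, ILogger logger)
{
    var expected = lastProcessed + 1;
    foreach (var position in batchPositions)
    {
        if (position > expected)
            logger.LogWarning("Gap in global positions: expected {Expected}, got {Actual}", expected, position);
        expected = position + 1;
    }
}
```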
I think it's what I wrote: transactions competing for the sequence. The order in which numbers are allocated doesn't match the commit order.
A small update from here:
I have tried a lot of different things, including taking a lock on the table when appending events (as that seems to be the way MartenDB does it) - however, that did not seem to make much of a difference.
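Roughly this idea, simplified (the `messages` table name is assumed):
```csharp
// Sketch: hold an exclusive lock on the events table for the duration of the append
// transaction, so global positions are allocated and committed in the same order.
await using var tx = await conn.BeginTransactionAsync();
await using (var lockCmd = new NpgsqlCommand("LOCK TABLE messages IN EXCLUSIVE MODE", conn, tx))
    await lockCmd.ExecuteNonQueryAsync();
// ... insert the events within the same transaction ...
await tx.CommitAsync(); // the lock is released when the transaction ends
```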
It might be my pgbouncer that complicates things even further.
We have now decided to switch to EventStoreDB, and so far that seems like a much better choice. Our postgresql instance is also not getting slammed as much now.
I still think this issue should be kept open since it might be an issue for others
For anyone interested, the issue is well-known in frameworks that support Postgres as an event store, such as Axon and Marten. They have a complex gap-detection process to resolve it. Message DB, on the other hand, locks the "category", but it avoids the concept of a global log (and does whatever it takes to defend this decision by throwing arguments).
I checked with Oskar, who maintains Marten, and he knows two solutions:
- Gap detection and tombstoning the gaps
- Checking against the minimum in-flight transaction (ref https://event-driven.io/en/ordering_in_postgres_outbox/) - a sketch of that idea is below
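The second approach would look roughly like this on the read side, assuming the messages table gets an extra `transaction_id xid8` column defaulted to `pg_current_xact_id()` (the column names here are illustrative, not the actual Eventuous schema):
```csharp
// The subscription only reads rows whose writing transaction has already finished,
// so rows that commit late with a lower global position can no longer be skipped.
const string ReadBatchSql = @"
    SELECT message_id, message_type, json_data, global_position
    FROM   messages
    WHERE  global_position > @checkpoint
      AND  transaction_id < pg_snapshot_xmin(pg_current_snapshot())
    ORDER  BY global_position
    LIMIT  @batchSize";
```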
My proposal was a bit different. I thought the append SP could do the following:
- Get `max(global_position)`
- Use it + 1 to assign positions to the events in the batch
- `global_position` should remain the identity column (and PK) but not be generated
- A competing append committed in parallel would then cause a PK constraint violation
- Detect it and retry the whole thing
I have no idea what the performance will be, but it would guarantee that appends happen with an incrementing global position, and the subscription issue should vanish.
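A sketch of what that could look like with Npgsql (table and column names are illustrative, not the actual Eventuous schema):
```csharp
// Read max(global_position) inside the transaction, assign positions explicitly, and
// retry when a competing append wins the race and causes a unique violation.
public record NewEvent(string Type, string Data);

public static async Task AppendWithExplicitPositions(NpgsqlConnection conn, IReadOnlyList<NewEvent> events)
{
    while (true)
    {
        await using var tx = await conn.BeginTransactionAsync();
        try
        {
            await using var maxCmd = new NpgsqlCommand(
                "SELECT coalesce(max(global_position), 0) FROM messages", conn, tx);
            var next = (long) (await maxCmd.ExecuteScalarAsync())! + 1;

            foreach (var evt in events)
            {
                await using var insert = new NpgsqlCommand(
                    "INSERT INTO messages (global_position, message_type, json_data) VALUES (@pos, @type, @data)",
                    conn, tx);
                insert.Parameters.AddWithValue("pos", next++);
                insert.Parameters.AddWithValue("type", evt.Type);
                insert.Parameters.AddWithValue("data", evt.Data);
                await insert.ExecuteNonQueryAsync();
            }

            await tx.CommitAsync();
            return;
        }
        catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UniqueViolation)
        {
            // a competing append committed the same positions first - roll back and retry
            await tx.RollbackAsync();
        }
    }
}
```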
If anyone is interested in implementing any of the approaches, please feel free to do so.