
Subscription fails to correctly store its Checkpoint sometimes, usually shortly after application startup

Status: Open. ReinderReinders opened this issue 2 years ago • 45 comments

Context: I have a Postgres DB in the cloud that serves as the Event Store. A MicroService (also in the cloud) is subscribed to the Event Store and synchronizes Entity states (I only have Create, Update and Delete events for now, so no complex read-model/transformation logic) to a SQL Server database (which is also in the cloud). The subscription Checkpoint is maintained in the receiving database (SQL Server). I have begun running bulk tests / load tests to test performance and reliability.

Scenario: I have uploaded 20,000 events to the Event Store, concerning 4,000 Entities; each gets 1 Create event followed by 4 Update events. The events were generated sequentially (synchronously), so they are in the Event Store in order (events 1-5 concern Entity 1, events 6-10 concern Entity 2, etc.). The event data is written in such a way that I can easily query at the end whether Entities are out of sync or whether events were not processed in the correct (i.e. chronological) order.

Expected: After starting the MicroService and waiting for the subscription to catch up (checkpoint is 0-leading), I expect to see the correct outcome after each run:

[screenshot]

After a run, I stop the MicroService and delete the Checkpoint and the Entities from the database in order to re-run the test.

Issue: In some runs (not all, and not predictably), the subscription loses track of its Checkpoint very early on:

[screenshot]

The above screenshot was taken after the subscription had caught up (confirmed by logging), but as can be seen, its Checkpoint is far out of sync. This means that if I were to restart the MicroService (in a production scenario, for instance when releasing a newer version of the app), it would rebuild its entire read model unnecessarily. This would be undesirable in our business case.

The issue occurs intermittently. I always see it break almost immediately (within the first 20 or so processed events) or not at all. From glancing at the Eventuous code, I learned that the SaveCheckpoint method only advances the checkpoint when the new position directly follows the current one (e.g. it only updates to 14 if the current position is 13). Thus, if a single save is missed, the Checkpoint can never again be updated by the same running application (the only fix would be a restart, causing a rebuild as mentioned above).

Guess: Since I always see it break early after application startup or not at all, I suspect an issue with application startup, bootstrapping, or establishing the initial database connection. Somewhere in that process, one of the SaveCheckpoint actions is lost (oddly, it loses, for instance, action 13 after the first 12 were processed successfully). So far I have not been able to lose a Checkpoint once an application has been running for a few seconds, even under a heavy processing load.

Addendum: I should add that I have also encountered the above scenario while running the MicroService locally and connecting to the two databases in the cloud.

ReinderReinders, Dec 14 '22 17:12

Can you post the checkpoint query? It's hard to understand what the NR fields mean, and the query is not fully visible.

alexeyzimarev, Dec 15 '22 08:12

One more thing, could you post your bootstrap code? The subscription configuration part is essential, and where you tell the application to use the particular checkpoint store.

alexeyzimarev, Dec 15 '22 08:12

And one more thing:

if only a single Save is missed the Checkpoint can never again be updated by the same running application

That's by design, as the expectation is that all the events are projected in the right order. If the projecting handler fails to execute an update, the read model will be in an unknown state. By default, the subscription is set to ignore these errors, and it should move the checkpoint regardless. Could you check whether your projectors logged anything that indicates a failure?

alexeyzimarev, Dec 15 '22 08:12

Hi, sorry for the late reaction, I have been ill this past week.

--> Can you post the checkpoint query, as it's hard to understand what NR fields mean and the query is not fully visible.

The Checkpoint part of the query is not that complicated, just a select, but sure here you go:

select
    (select POSITION from eventuous.Checkpoints) as CP,
    (select count(*) from receiving_table) as 'NR TOTAL',
    (select count(*) from receiving_table
        where information_fields like '%OK%'
        and information_fields not like '%NOK%') as 'NR OK',
    (select count(*) from receiving_table
        where information_fields like '%NOK%') as 'NR NOK';

(slightly redacted to hide business secrets)

The idea is that in Create and Update events I fill certain fields (in a column containing JSON) with the string value "NOK". Each subsequent update then overwrites the previous property with "OK" and adds a new "NOK" (except for the last update, which does not add another NOK). This way, if any event is processed out of order, or not processed at all, the read model ends up with a NOK value in it. It's an easy and foolproof way to verify at the end that the event stream was fully processed, and in order.
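The OK/NOK scheme can be checked in isolation with a small simulation. This is a hypothetical sketch, not the reporter's code: the entity is a dictionary with made-up field names (`Field0` to `Field3`) instead of the redacted JSON column, and each event is identified only by its index within the entity's stream (0 = Create, 1 to 4 = Updates).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

const int UpdateCount = 4; // 1 Create followed by 4 Updates per entity, as in the test scenario

// Applies event number `index` (0 = Create, 1..UpdateCount = Updates) to an entity.
// Field names are hypothetical stand-ins for properties in the JSON column.
void Apply(Dictionary<string, string> fields, int index)
{
    if (index > 0)
        fields[$"Field{index - 1}"] = "OK";  // confirm the marker left by the previous event
    if (index < UpdateCount)
        fields[$"Field{index}"] = "NOK";     // leave a marker for the next event to confirm
}

// An entity is consistent only if every marker ended up as "OK".
bool IsConsistent(IEnumerable<int> eventOrder)
{
    var fields = new Dictionary<string, string>();
    foreach (var index in eventOrder) Apply(fields, index);
    return fields.Values.All(v => v == "OK");
}

Console.WriteLine(IsConsistent(new[] { 0, 1, 2, 3, 4 })); // True: all events, in order
Console.WriteLine(IsConsistent(new[] { 0, 1, 3, 4 }));    // False: Update 2 was skipped
Console.WriteLine(IsConsistent(new[] { 0, 1, 3, 2, 4 })); // False: Updates 2 and 3 swapped
```

A skipped event leaves its predecessor's NOK unconfirmed, and a swapped pair re-creates a NOK after it was already confirmed, so both kinds of error survive to the final query.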

--> One more thing, could you post your bootstrap code? The subscription configuration part is essential, and where you tell the application to use the particular checkpoint store.

Certainly. The registration for the Checkpoint store has been hidden behind two extension methods:

public void ConfigureServices(IServiceCollection services)
{
    // (other registrations, redacted)

    // DatabaseSettings contains settings for the receiving DB, i.e. the SQL Server database
    var databaseSettings = services.BuildServiceProvider().GetRequiredService<DatabaseSettings>();

    // EventSourcingSettings contains settings for the Event Store (i.e. Postgres) and configuration for the Subscriptions
    var eventSourcingSettings = services.BuildServiceProvider().GetRequiredService<EventSourcingSettings>();

    services.AddSqlServerCheckpointStore(databaseSettings.ConnectionString);

    if (eventSourcingSettings.InitializeCheckpointSchema)
    {
        services.InitializeSqlServerCheckpointSchema(databaseSettings.ConnectionString,
            eventSourcingSettings.SchemaName);
    }

    // here goes the subscription registration, see below (last snippet)
}

The extension methods mentioned above:

public static IServiceCollection AddSqlServerCheckpointStore(this IServiceCollection services,
    string connectionString)
{
    SqlConnection GetConnection() => new(connectionString);
    services.AddSingleton((GetSqlServerConnection)GetConnection);
    services.AddCheckpointStore(cfg => new SqlServerCheckpointStore(
        cfg.GetRequiredService<GetSqlServerConnection>(),
        Constants.Eventuous));

    return services;
}

(just a bread-and-butter Checkpoint store registration as far as I can see. I see no options or alternative configuration possible there)

public static IServiceCollection InitializeSqlServerCheckpointSchema(this IServiceCollection services,
    string connectionString, string schemaName)
{
    // The non-generic ILogger is not registered by AddLogging, so GetService<ILogger>() returns null;
    // resolve a logger through ILoggerFactory instead
    ILogger? logger = services.BuildServiceProvider()
        .GetService<ILoggerFactory>()
        ?.CreateLogger("InitializeSqlServerCheckpointSchema");

    try
    {
        using var connection = new SqlConnection(connectionString);
        connection.Open();
        // Pass the schema name as a parameter instead of interpolating it into the SQL text
        using var cmd = new SqlCommand("SELECT 1 FROM sys.schemas WHERE name = @name", connection);
        cmd.Parameters.AddWithValue("@name", schemaName);
        var response = cmd.ExecuteScalar();

        if (response != null)
        {
            logger?.LogInformation(
                "InitializeSqlServerCheckpointSchema: skipped creating Checkpoint schema, already exists.");
        }
        else
        {
            SqlConnection GetConn() => new(connectionString);
            var schema = new Schema(new SqlServerStoreOptions($"{schemaName}").Schema);
            schema.CreateSchema(GetConn).Wait();

            logger?.LogInformation(
                "InitializeSqlServerCheckpointSchema created the Checkpoint schema.");
        }
    }
    catch (Exception ex)
    {
        // try-catch because the NSwag client generation tool fails on build
        logger?.LogError(ex, "InitializeSqlServerCheckpointSchema failed with error:");
    }

    return services;
}

(This is my attempt to make the initialization of the receiving database idempotent, i.e. to create the Checkpoint schema only if it doesn't exist yet. It executes immediately and synchronously (.Wait()), so I can't really see how it would break the subscription.)

Subscriptions can be configured at appsettings level and are each registered in turn (in this testing scenario, I am only configuring 1 Subscription anyway):

foreach (ExternalEntitySubscription subscription in eventSourcingSettings.ExternalEntitySubscriptions)
{
    services.AddSubscription<PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions>(
        subscription.SubscriptionName,
        builder => builder
            // .AddConsumeFilterFirst(new AsyncHandlingFilter(20))
            .AddConsumeFilterLast(new MessageFilter(x =>
            {
                foreach (string filter in subscription.FiltersOnStreamName)
                {
                    if (!x.Stream.ToString().Contains(filter))
                    {
                        return false;
                    }
                }

                return true;
            }))
            .AddEventHandler<MyCustomEventHandler>()
            // Enables parallel processing. The default option uses the message stream name as partition key, see: https://eventuous.dev/docs/subscriptions/pipes/
            .WithPartitioningByStream(subscription.PartitionsCount));
}

(Slightly redacted, but the idea is clear. I have an Event Handler that extends the abstract Eventuous.Subscriptions.EventHandler base class. The AsyncHandlingFilter has been commented out because it completely broke the logic... I don't think it belonged here.)

--> That's by design, as the expectation is that all the events are projected in the right order. If the projecting handler fails to execute an update, the read model will be in an unknown state. By default, the subscription is set to ignore these errors, and it should move the checkpoint regardless. Maybe you can point out if you got any logs from your projectors that indicate the failure.

I understand the design, though I had initially interpreted this somewhat differently, namely in terms of the 'at-least-once processing' guarantee mentioned in the documentation (if we can't guarantee that event 14 was processed, we can't write Checkpoint 14; if the application restarts, it picks up from the last point we are certain was successful).

But this does not appear to be what happens here. I can verify that the entire event stream is processed successfully and in the proper order (the OK/NOK test described above). In other words, the projecting handler definitely does not fail to execute an update. The ONLY thing I see failing is the Checkpoint write (I have not been able to catch it in logging yet, but am still trying). This also tells me that the event update and the Checkpoint update are not part of a transaction (which I expected; the documentation never says they should be). So is what I am missing a retry or failover mechanism in case a Checkpoint update fails, since it is not transactional?

ReinderReinders, Dec 21 '22 10:12

Update: attempts to capture logging have been unsuccessful so far (my cloud application apparently refuses to log when I demand too much of it, such as during a load test), but I have found out that I am exceeding the CPU capacity of my receiving DB server. That would imply a database capacity issue, not an Eventuous issue. I will upgrade and retest.

ReinderReinders, Dec 21 '22 15:12

Update: I'm not sure this adds anything to the issue as already reported, but I now have a slightly different deployment with a MicroService in the cloud (partition count raised to 50), which loses its checkpoint early on (but later than previously seen) during a subscription. In my logging, I see certain Checkpoint commits being executed more than once (e.g. 66, 275, 413, 428) until the commit action simply stops being called (after 428, no more commits, although by now 20,000 events have been handled):

[screenshot of the log output]

There are no Errors in the logging. I have no idea why the checkpoint commit just suddenly stops working.

Edit: a correction, it is a Partitioning Count of 100, not 50.

Edit2: With a partition count of 1, the Checkpoint is not lost. I already suspected that partitioning had something to do with it; now I can reproduce it consistently.

ReinderReinders, Dec 23 '22 13:12

Commits don't happen when the commit handler cannot get a gapless sequence of checkpoints. Each event received by a subscription gets its own monotonically increasing sequence number. When events are partitioned, the sequence gets broken inside each partition. When the event is successfully handled or ignored, the event position and sequence are passed to the checkpoint commit handler. It then tries to re-sequence all the positions linearly.

I can try adding some diagnostics to the gap detection, so you can plug in and see why it gets stuck.

alexeyzimarev, Dec 29 '22 14:12
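The gap detection described above can be illustrated with a minimal, single-threaded sketch. It is not the actual Eventuous `CheckpointCommitHandler`; the names `pending` and `Report`, the `(Sequence, Position)` tuple model, and the assumption that sequences start at 1 are all hypothetical. Handled events arrive out of order from different partitions, and a position only becomes committable once every lower sequence number has also been reported.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model: each handled event reports a (Sequence, Position) pair.
// Sequence is the subscription's own gapless counter; Position is the event's position in the store.
var pending = new SortedSet<(ulong Sequence, ulong Position)>();
ulong lastCommittedSequence = 0; // assumption: the first event gets sequence 1

// Called when a partition has handled (or ignored) an event. Returns the position that can
// now be stored as the checkpoint, or null while a gap (an unfinished earlier sequence) remains.
ulong? Report(ulong sequence, ulong position)
{
    if (sequence <= lastCommittedSequence) return null; // already committed: a duplicate report
    pending.Add((sequence, position));

    ulong? toCommit = null;
    // Advance only while the next expected sequence number is present (no gap).
    while (pending.Count > 0 && pending.Min.Sequence == lastCommittedSequence + 1)
    {
        var next = pending.Min;
        pending.Remove(next);
        lastCommittedSequence = next.Sequence;
        toCommit = next.Position;
    }
    return toCommit;
}

// Two partitions finish sequences 2 and 3 before the partition holding sequence 1.
Console.WriteLine(Report(2, 120)?.ToString() ?? "gap"); // gap
Console.WriteLine(Report(3, 130)?.ToString() ?? "gap"); // gap
Console.WriteLine(Report(1, 110)?.ToString() ?? "gap"); // 130
```

If one sequence number is never reported (or its report is lost), every later position stays pending forever, which matches the symptom of commits silently stopping while event handling continues.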

Some more diagnostics would be most welcome.

ReinderReinders, Jan 02 '23 09:01

Here comes some new diags: https://github.com/Eventuous/eventuous/pull/172

alexeyzimarev, Jan 05 '23 15:01

@alexeyzimarev I heartily approve and I have approved. When will a new version be released? I would like to use this in my tests soon.

edit: oh, I just realised I could check out the dev branch and manually add these projects to my solution to test it.

ReinderReinders, Jan 05 '23 15:01

All the preview versions are on MyGet. It's described in the readme.

alexeyzimarev, Jan 06 '23 10:01

@alexeyzimarev I am not sure what this tells us, but I've done a test run with the new diagnostics. The checkpoint is lost after 10243. In the logging I see no error (failed to store Checkpoint), but I do see a constant repetition of the following:

[screenshot of the repeated log lines]

edit: if you "cut out the middle man" (all the other logging) you can pinpoint exactly where it goes wrong:

[screenshot]

but sadly no explanation as to why.

edit2: the thing I do notice is that, for the first time, these last two values do not increment correctly; it falls back somehow?

[screenshot]

ReinderReinders, Jan 12 '23 15:01

It seems really weird, as it looks the same to me:

Last commit position 10243:10243 is behind latest position 10243:10243

Seems like a bug, the same event was processed twice, but it should move on.

alexeyzimarev, Jan 12 '23 20:01

I changed it to commit the duplicate position; it will raise a warning when this happens. Try the latest preview from MyGet.

alexeyzimarev, Jan 12 '23 21:01

I have upgraded to the latest preview version (0.13.1-alpha.0.4) and have run it a few times with very verbose logging.

I no longer see the same ["eventuous"] log lines I saw yesterday (where it logs the gap/last commit position). Instead, every time I see only this one line. The logging breaks at exactly ID 101 every time:

[screenshot]

Despite the logging breaking, I have not yet seen a checkpoint being lost. So it appears to be a little more stable now (despite my losing the extra diagnostics logging that was added last week).

This is a smaller batch test; next, I am going to retry my large bulk test with this version. That one will take a while to run.

ReinderReinders, Jan 13 '23 09:01

It's a bug, I will fix it now.

alexeyzimarev, Jan 13 '23 10:01

It's in the latest

alexeyzimarev, Jan 13 '23 11:01

Does this help? This is the first time I'm seeing this error.

2023-01-13 13:47:16 [ERR] ["Eventuous.Subscription"] [] [dispatcher_base_dev] Unable to commit position "CommitPosition { Position = 932983, Sequence = 32982, Timestamp = 01/06/2023 03:53:29, Valid = True, LogContext = Eventuous.Subscriptions.Logging.LogContext }"
System.NullReferenceException: Object reference not set to an instance of an object.
   at System.Collections.Generic.SortedSet`1.DoRemove(T item)
   at System.Collections.Generic.SortedSet`1.RemoveWhere(Predicate`1 match)
   at Eventuous.Subscriptions.Checkpoints.CheckpointCommitHandler.CommitInternal(CommitPosition position, CancellationToken cancellationToken)

Strangely enough, the checkpoint that failed to save to the database is the last value that WAS saved:

[screenshot of the stored checkpoint]

So maybe a timeout/error on the return from DB to application code?

ReinderReinders, Jan 13 '23 12:01

I don't think it can be fixed without debugging, as the code is very simple, and I can't see where the null reference can happen:

_positions.RemoveWhere(x => x.Sequence <= position.Sequence);

The elements of _positions are CommitPosition record structs, which can't be null.

alexeyzimarev, Jan 13 '23 13:01

Ok, I made sure that there are no duplicate positions added to the list of pending positions. I would expect the change to fix the issue.

alexeyzimarev, Jan 14 '23 11:01

@alexeyzimarev For clarity, which of these commits fixes the issue? [screenshot] (i.e. has this already been pushed to the latest stable?)

ReinderReinders, Jan 16 '23 09:01

I have retested with both the latest stable (0.13.1) and the latest alpha (0.8). Sadly, I am still sometimes able to lose the Checkpoint.

ReinderReinders, Jan 18 '23 18:01

0.13.1 contains everything. I haven't made any changes since then other than in branches.

I didn't claim that the issue is fixed, as I am unable to reproduce it. The following changes are included:

  • Resolve the "gap" issue when the new and previous commits are the same (your first discovery with additional diags)
  • Attempt to resolve the SortedSet null reference exception by changing the code to add new positions to the set using Union.

alexeyzimarev, Jan 19 '23 09:01
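The two changes listed above (no duplicate pending positions, and adding positions with a union) can be illustrated with plain `SortedSet<T>` operations. This is a hypothetical sketch: pending positions are modelled as `(Sequence, Position)` tuples compared on `Sequence` alone, which may differ from how Eventuous actually compares `CommitPosition` values.

```csharp
using System;
using System.Collections.Generic;

// Order pending positions by Sequence only, so a second report of the same sequence is
// treated as a duplicate instead of being stored twice.
// (Assumption: Sequence alone identifies an event within one subscription run.)
var bySequence = Comparer<(ulong Sequence, ulong Position)>.Create(
    (a, b) => a.Sequence.CompareTo(b.Sequence));
var pending = new SortedSet<(ulong Sequence, ulong Position)>(bySequence);

// UnionWith only adds elements that the comparer considers new.
pending.UnionWith(new[] { (1UL, 110UL), (2UL, 120UL) });
pending.UnionWith(new[] { (2UL, 120UL), (3UL, 130UL) }); // sequence 2 is reported twice

Console.WriteLine(pending.Count); // 3

// After committing up to sequence 2, drop everything at or below it.
int removed = pending.RemoveWhere(x => x.Sequence <= 2);
Console.WriteLine($"{removed} removed, {pending.Count} pending"); // 2 removed, 1 pending
```

These are single-threaded guarantees only: `SortedSet<T>` makes no thread-safety promise for instance members, so deduplication does not protect against `UnionWith` and `RemoveWhere` running concurrently on different threads.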

@alexeyzimarev I have managed to find the error (though not the solution):

[screenshot (ss1) of the exception in the debugger]

  • I have included the relevant Eventuous .csproj files in my solution instead of using the NuGet packages, so I can debug
  • Ran a large bulk subscription (1 million events to process all at once) with a large partition count (100)
  • Let Visual Studio break on any exception

I already suspected that it was the CheckpointCommitHandler (or one of its components) that crashed, since I keep observing the same issue: the application logic (including event processing) continues to work flawlessly, but the checkpoint is no longer updated in the Event Store (I'm storing checkpoints in the Event Store now instead of at the receiving end).

It appears to be a race condition: the CommitPositionSequence contains so many elements that it takes a while to enumerate the collection. During this interval, more checkpoints are added to the sequence, which causes the above exception. The CommitHandler apparently runs on a separate thread (I guess?), since after pressing F5 to continue past the exception, the application keeps running. However, checkpoints are no longer written to the store.

ReinderReinders, Feb 01 '23 10:02

Also, I should add that this exception occurs in the core Eventuous libraries (not in the Postgres implementation), so this issue could occur with any implementation.

ReinderReinders, Feb 01 '23 10:02

As for a suggested solution, hmm... make a copy of the collection and run the enumeration on that? I'm not sure that's a watertight solution.

ReinderReinders, Feb 01 '23 10:02
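`SortedSet<T>` makes no thread-safety guarantee for instance members, so adding positions from several partitions while another thread enumerates or removes from the set can corrupt its internal tree; that is consistent with the exceptions from `SortedSet` seen in this thread. Copying the set before enumerating it does not by itself remove the race, because making the copy also enumerates the set while other threads may still be writing to it. One common alternative is to route every access to the pending set through a single lock. The sketch below is hypothetical (the `(Sequence, Position)` tuples, `Report`, and the assumption that sequences start at 1 are illustrative, not Eventuous code) and simulates many threads reporting events concurrently and out of order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var gate = new object();
var pending = new SortedSet<(ulong Sequence, ulong Position)>();
ulong lastCommittedSequence = 0; // assumption: sequences start at 1
ulong committedPosition = 0;

// Every read and write of `pending` happens under the same lock, so partitions reporting
// concurrently never observe the set while another thread is modifying it.
void Report(ulong sequence, ulong position)
{
    lock (gate)
    {
        // Ignore sequences that are already committed or already pending (duplicates).
        if (sequence <= lastCommittedSequence || !pending.Add((sequence, position))) return;

        // Advance the checkpoint while the next expected sequence is present (no gap).
        while (pending.Count > 0 && pending.Min.Sequence == lastCommittedSequence + 1)
        {
            var next = pending.Min;
            pending.Remove(next);
            lastCommittedSequence = next.Sequence;
            committedPosition = next.Position;
        }
    }
}

// Simulate thread-pool workers reporting 100,000 handled events concurrently, in arbitrary order.
Parallel.For(1, 100_001, i => Report((ulong)i, (ulong)i * 10));

Console.WriteLine($"{lastCommittedSequence} {committedPosition} {pending.Count}"); // 100000 1000000 0
```

In real code the checkpoint store write (I/O) should happen outside the lock, using the value captured inside it; this sketch only tracks the value. Funnelling all reports through a single consumer (for example a `System.Threading.Channels` channel read by one task) is another way to get the same single-writer guarantee without a lock.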

Do you still get the null reference exception?

alexeyzimarev, Feb 01 '23 11:02

No. Edit: not in this test run; this was the first exception that occurred.

ReinderReinders, Feb 01 '23 11:02

@alexeyzimarev Edit: On the next test run, I now get a null reference exception: [screenshot]

[screenshot]

ReinderReinders, Feb 01 '23 11:02

It would be nice if you could check what's inside both collections.

alexeyzimarev, Feb 01 '23 12:02