Race conditions in MssqlReadModelStore

Open alasdair-stark-screenmedia opened this issue 3 years ago • 1 comments

When a read model is to be created or updated, the UpdateReadModelAsync method is called. This method performs the following steps:

  1. Fetch read model from database if it exists
  2. Create / update read model in memory
  3. Create / update read model in database

One of our read models is a view of a subset of the data within an aggregate. We use a read model locator to determine the id. The id is a combination of the aggregate id and other static data within the aggregate.

When load testing our API we noticed that SQL queries were failing because they attempted to insert duplicate primary keys. Upon further investigation, we discovered that this occurred when parallel requests resulted in events being published against a single aggregate. The read model ids generated by the locator for these two events were the same. From our perspective this is the desired behaviour of the read model locator.

When UpdateReadModelAsync was called by the separate threads, no existing read model was found in the database by either one. Both attempted to create the read model and insert it into the database, resulting in the primary key violation. I can easily see similar issues taking place when multiple threads are attempting to update the same existing read model - one will overwrite the other.
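
The interleaving described above can be reproduced without a database. The following is a minimal sketch, not EventFlow code: a `Dictionary` stands in for the read model table (its `Add` throws on a duplicate key, much like a primary key constraint), and a `Barrier` forces both workers to finish their "fetch" before either one writes. All names here are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CheckThenInsertRace
{
    // Runs two concurrent "updates" for the same read model id and
    // returns how many of them failed with a duplicate key.
    public static int Run()
    {
        var table = new Dictionary<string, int>(); // stand-in for the read model table
        var tableLock = new object();              // each single statement is atomic, the sequence is not
        var primaryKeyViolations = 0;
        const string readModelId = "aggregate-1/subset-a";

        using (var bothHaveRead = new Barrier(2))
        {
            void Update(string id)
            {
                bool exists;
                lock (tableLock) { exists = table.ContainsKey(id); } // step 1: fetch

                bothHaveRead.SignalAndWait(); // force the unlucky interleaving

                try
                {
                    lock (tableLock)
                    {
                        if (exists) table[id] = table[id] + 1; // UPDATE path
                        else table.Add(id, 1);                 // INSERT path, throws on duplicate key
                    }
                }
                catch (ArgumentException)
                {
                    Interlocked.Increment(ref primaryKeyViolations);
                }
            }

            Task.WaitAll(
                Task.Run(() => Update(readModelId)),
                Task.Run(() => Update(readModelId)));
        }

        return primaryKeyViolations;
    }

    public static void Main()
    {
        Console.WriteLine($"Primary key violations: {Run()}");
    }
}
```

Both workers observe "no row" and both take the insert path, so exactly one of them fails. Holding a lock across the whole fetch-then-write sequence, rather than around each statement, is what removes the race.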

There is code in the read model store that attempts to detect concurrency issues and throws an OptimisticConcurrencyException, but we never saw this exception thrown during our load tests. Looking at the conditional check that takes place on line 186, I can't see how it could ever detect the issues we experienced.
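
For comparison, here is a generic sketch of how an optimistic concurrency check is commonly built: the UPDATE only matches when the stored version equals the version that was read, and zero affected rows is treated as a conflict. This is an in-memory illustration with hypothetical names, not the check on line 186 of the EventFlow source. It suggests why such a guard can catch a lost update on an existing row but has no previous version to compare against when both writers take the insert path.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a table that stores a version per read model id.
public class VersionedTable
{
    private readonly Dictionary<string, int> _versions = new Dictionary<string, int>();

    // Mimics "INSERT ..."; throws ArgumentException on a duplicate key.
    public void Insert(string id, int version)
    {
        _versions.Add(id, version);
    }

    // Mimics "UPDATE ... WHERE Id = @id AND Version = @expectedVersion"
    // and returns the number of rows affected.
    public int UpdateIfVersionMatches(string id, int expectedVersion, int newVersion)
    {
        if (_versions.TryGetValue(id, out var stored) && stored == expectedVersion)
        {
            _versions[id] = newVersion;
            return 1;
        }

        return 0;
    }
}

public static class VersionGuardSketch
{
    public static void Main()
    {
        var table = new VersionedTable();
        table.Insert("rm-1", 1);

        // Two writers both read version 1 and then both try to write.
        var first = table.UpdateIfVersionMatches("rm-1", 1, 2);
        var second = table.UpdateIfVersionMatches("rm-1", 1, 2);

        // The second writer matches no row, which is the signal for a conflict.
        Console.WriteLine($"first={first}, second={second}");
    }
}
```

In the scenario from our load tests neither writer found a row, so a guard of this kind never comes into play and the conflict surfaces as a primary key violation instead.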

I suspect there may be simpler ways to demonstrate race conditions in this code, so apologies if my example is overcomplicated. I also appreciate that this may be a bigger-picture issue that we shouldn't expect EventFlow itself to be responsible for addressing. I would be interested to see some discussion on the topic.

In our own application we ended up replacing MssqlReadModelStore with a custom read model store. We wrap the two queries in UpdateReadModelAsync in a transaction and the query to fetch the read model from the database uses the XLOCK, ROWLOCK, HOLDLOCK hints to lock the read model row for the duration of the transaction. This has solved the issues we experienced during load testing with minimal impact on performance but I don't think it's the most elegant solution to the problem - especially with the use of the locking hints!
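
To make the locking hints concrete, this small sketch shows how a table hint clause can be spliced into a SELECT statement just before its WHERE clause. The shape of the SELECT, including the table and column names, is an assumption for illustration and may not match what the SQL generator actually emits; `InjectHints` is a hypothetical helper.

```csharp
using System;

public static class LockingHintSketch
{
    // Inserts "WITH (<hints>) " immediately before the first WHERE keyword.
    public static string InjectHints(string selectSql, string hints)
    {
        var whereIndex = selectSql.IndexOf("WHERE", StringComparison.OrdinalIgnoreCase);
        if (whereIndex < 0)
        {
            throw new ArgumentException("SELECT statement has no WHERE clause", nameof(selectSql));
        }

        return selectSql.Insert(whereIndex, $"WITH ({hints}) ");
    }

    public static void Main()
    {
        // Assumed shape of the generated SELECT; names are illustrative only.
        var sql = "SELECT * FROM [ReadModel-Example] WHERE AggregateId = @EventFlowReadModelId";

        // Prints:
        // SELECT * FROM [ReadModel-Example] WITH (XLOCK, ROWLOCK, HOLDLOCK) WHERE AggregateId = @EventFlowReadModelId
        Console.WriteLine(InjectHints(sql, "XLOCK, ROWLOCK, HOLDLOCK"));
    }
}
```

The hinted SELECT only protects the following write if both statements run on the same connection inside the same transaction, which is why the fetch and the upsert are wrapped together.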

Here's our custom class based on the original. It would be good to know if we've done anything here that might break anything in EventFlow! I might have broken the TransientFaultHandler or something...

// 
// Copyright (c) 2015-2020 Rasmus Mikkelsen
// Copyright (c) 2015-2020 eBay Software Foundation
// https://github.com/eventflow/EventFlow
// 
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
// 
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// 
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using Dapper;
using EventFlow.Aggregates;
using EventFlow.Core;
using EventFlow.Core.RetryStrategies;
using EventFlow.Extensions;
using EventFlow.Logs;
using EventFlow.MsSql;
using EventFlow.MsSql.ReadStores;
using EventFlow.MsSql.ReadStores.Attributes;
using EventFlow.ReadStores;
using EventFlow.Sql.ReadModels;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

#pragma warning disable 618

namespace EventSource.ReadModel.MsSql
{
    public class TransactionalMssqlReadModelStore<TReadModel> :
        ReadModelStore<TReadModel>,
        IMssqlReadModelStore<TReadModel>
        where TReadModel : class, IReadModel
    {
        private readonly IMsSqlConnection _connection;
        private readonly IReadModelSqlGenerator _readModelSqlGenerator;
        private readonly IReadModelFactory<TReadModel> _readModelFactory;
        private readonly ITransientFaultHandler<IOptimisticConcurrencyRetryStrategy> _transientFaultHandler;
        private static readonly Func<TReadModel, int?> GetVersion;
        private static readonly Action<TReadModel, int?> SetVersion;
        private static readonly string ReadModelNameLowerCase = typeof(TReadModel).Name.ToLowerInvariant();

        // XLOCK takes an exclusive lock that is held until the transaction completes
        // ROWLOCK keeps the lock at row granularity, so only access to a single read model row is blocked at a time
        // HOLDLOCK is equivalent to SERIALIZABLE, so the key range stays locked even when the row does not exist yet
        private const string LockingHints = "XLOCK, ROWLOCK, HOLDLOCK";

        static TransactionalMssqlReadModelStore()
        {
            var propertyInfos = typeof(TReadModel)
                .GetProperties(BindingFlags.Instance | BindingFlags.Public);

            var versionPropertyInfo = propertyInfos
                .SingleOrDefault(p => p.GetCustomAttribute<MsSqlReadModelVersionColumnAttribute>() != null);
            if (versionPropertyInfo == null)
            {
                versionPropertyInfo = propertyInfos.SingleOrDefault(p => p.Name == "LastAggregateSequenceNumber");
            }

            if (versionPropertyInfo == null)
            {
                GetVersion = rm => null as int?;
                SetVersion = (rm, v) => { };
            }
            else
            {
                GetVersion = rm => (int?)versionPropertyInfo.GetValue(rm);
                SetVersion = (rm, v) => versionPropertyInfo.SetValue(rm, v);
            }
        }

        public TransactionalMssqlReadModelStore(
            ILog log,
            IMsSqlConnection connection,
            IReadModelSqlGenerator readModelSqlGenerator,
            IReadModelFactory<TReadModel> readModelFactory,
            ITransientFaultHandler<IOptimisticConcurrencyRetryStrategy> transientFaultHandler)
            : base(log)
        {
            _connection = connection;
            _readModelSqlGenerator = readModelSqlGenerator;
            _readModelFactory = readModelFactory;
            _transientFaultHandler = transientFaultHandler;
        }

        public override async Task UpdateAsync(IReadOnlyCollection<ReadModelUpdate> readModelUpdates,
            IReadModelContextFactory readModelContextFactory,
            Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken,
                Task<ReadModelUpdateResult<TReadModel>>> updateReadModel,
            CancellationToken cancellationToken)
        {
            foreach (var readModelUpdate in readModelUpdates)
            {
                await _transientFaultHandler.TryAsync(
                    c => UpdateReadModelAsync(readModelContextFactory, updateReadModel, c, readModelUpdate),
                    Label.Named($"mssql-read-model-update-{ReadModelNameLowerCase}"),
                    cancellationToken)
                    .ConfigureAwait(false);
            }
        }

        private async Task UpdateReadModelAsync(
            IReadModelContextFactory readModelContextFactory,
            Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken, Task<ReadModelUpdateResult<TReadModel>>> updateReadModel,
            CancellationToken cancellationToken,
            ReadModelUpdate readModelUpdate)
        {
            var label = Label.Named("mssql-store-read-model", ReadModelNameLowerCase);

            // This implementation is based on the use of transactions in the SqlConnection class
            // https://github.com/eventflow/EventFlow/blob/develop-v0/Source/EventFlow.Sql/Connections/SqlConnection.cs
            await _connection.WithConnectionAsync<int?>(
                label,
                async (c, ct) =>
                {
                    // Use the default isolation level
                    using (var t = c.BeginTransaction())
                    {
                        try
                        {
                            IMssqlReadModel mssqlReadModel;
                            var readModelId = readModelUpdate.ReadModelId;

                            var readModelEnvelope = await GetAsync(readModelId, ct, c, t, LockingHints).ConfigureAwait(false);
                            var readModel = readModelEnvelope.ReadModel;
                            bool isNew = readModel == null;

                            if (isNew)
                            {
                                readModel = await _readModelFactory.CreateAsync(readModelId, ct).ConfigureAwait(false);
                                mssqlReadModel = readModel as IMssqlReadModel;
                                if (mssqlReadModel != null)
                                {
                                    mssqlReadModel.AggregateId = readModelId;
                                    mssqlReadModel.CreateTime = readModelUpdate.DomainEvents.First().Timestamp;
                                }

                                readModelEnvelope = ReadModelEnvelope<TReadModel>.With(readModelUpdate.ReadModelId, readModel);
                            }

                            var readModelContext = readModelContextFactory.Create(readModelId, isNew);

                            var originalVersion = readModelEnvelope.Version;

                            var readModelUpdateResult = await updateReadModel(
                                    readModelContext,
                                    readModelUpdate.DomainEvents,
                                    readModelEnvelope,
                                    cancellationToken)
                                .ConfigureAwait(false);

                            if (!readModelUpdateResult.IsModified)
                            {
                                // Read model has not been modified so there is nothing more to do
                                t.Commit();
                                return null;
                            }

                            readModelEnvelope = readModelUpdateResult.Envelope;

                            if (readModelContext.IsMarkedForDeletion)
                            {
                                await DeleteAsync(readModelId, ct, c, t).ConfigureAwait(false);

                                // Read model has been deleted so there is nothing more to do
                                t.Commit();
                                return null;
                            }

                            mssqlReadModel = readModel as IMssqlReadModel;
                            if (mssqlReadModel != null)
                            {
                                mssqlReadModel.UpdatedTime = DateTimeOffset.Now;
                                mssqlReadModel.LastAggregateSequenceNumber = (int)readModelEnvelope.Version.GetValueOrDefault();
                            }
                            else
                            {
                                SetVersion(readModel, (int?)readModelEnvelope.Version);
                            }

                            await UpsertAsync(readModel, isNew, originalVersion, ct, c, t).ConfigureAwait(false);

                            t.Commit();

                            Log.Verbose(() => $"Updated MSSQL read model {typeof(TReadModel).PrettyPrint()} with ID '{readModelId}' to version '{readModelEnvelope.Version}'");

                            return null;
                        }
                        catch (Exception e)
                        {
                            t.Rollback();
                            Log.Debug(
                                e,
                                "Exception was thrown while updating read model within a transaction in '{0}'",
                                label);
                            throw;
                        }
                    }
                },
                cancellationToken);
        }

        private async Task<ReadModelEnvelope<TReadModel>> GetAsync(
            string id,
            CancellationToken cancellationToken,
            IDbConnection dbConnection,
            IDbTransaction transaction,
            string lockingHints)
        {
            var readModelType = typeof(TReadModel);
            var sql = _readModelSqlGenerator.CreateSelectSql<TReadModel>();

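            // If locking hints are specified then inject them into the SQL, immediately before the WHERE clause
            if (!string.IsNullOrWhiteSpace(lockingHints))
            {
                var whereIndex = sql.IndexOf("WHERE", StringComparison.OrdinalIgnoreCase);
                if (whereIndex < 0)
                {
                    throw new InvalidOperationException($"Cannot inject locking hints, no WHERE clause found in '{sql}'");
                }

                sql = sql.Insert(whereIndex, $"WITH ({lockingHints}) ");
            }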

            var commandDefinition = new CommandDefinition(sql, new { EventFlowReadModelId = id }, transaction, cancellationToken: cancellationToken);

            var readModels = await dbConnection.QueryAsync<TReadModel>(commandDefinition).ConfigureAwait(false);

            var readModel = readModels.SingleOrDefault();

            if (readModel == null)
            {
                Log.Verbose(() => $"Could not find any MSSQL read model '{readModelType.PrettyPrint()}' with ID '{id}'");
                return ReadModelEnvelope<TReadModel>.Empty(id);
            }

            var readModelVersion = GetVersion(readModel);

            Log.Verbose(() => $"Found MSSQL read model '{readModelType.PrettyPrint()}' with ID '{id}' and version '{readModelVersion}'");

            return ReadModelEnvelope<TReadModel>.With(id, readModel, readModelVersion);
        }

        private async Task DeleteAsync(
            string id,
            CancellationToken cancellationToken,
            IDbConnection dbConnection,
            IDbTransaction transaction)
        {
            var sql = _readModelSqlGenerator.CreateDeleteSql<TReadModel>();

            var commandDefinition = new CommandDefinition(sql, new { EventFlowReadModelId = id }, transaction, cancellationToken: cancellationToken);
            var rowsAffected = await dbConnection.ExecuteAsync(commandDefinition).ConfigureAwait(false);

            if (rowsAffected != 0)
            {
                Log.Verbose($"Deleted read model '{id}' of type '{ReadModelNameLowerCase}'");
            }
        }

        private async Task UpsertAsync(
            TReadModel readModel,
            bool isNew,
            long? originalVersion,
            CancellationToken cancellationToken,
            IDbConnection dbConnection,
            IDbTransaction transaction)
        {
            var sql = isNew
                ? _readModelSqlGenerator.CreateInsertSql<TReadModel>()
                : _readModelSqlGenerator.CreateUpdateSql<TReadModel>();

            var dynamicParameters = new DynamicParameters(readModel);
            if (originalVersion.HasValue)
            {
                dynamicParameters.Add("_PREVIOUS_VERSION", (int)originalVersion.Value);
            }

            var commandDefinition = new CommandDefinition(sql, dynamicParameters, transaction, cancellationToken: cancellationToken);

            await dbConnection.ExecuteAsync(commandDefinition).ConfigureAwait(false);
        }


        public override async Task<ReadModelEnvelope<TReadModel>> GetAsync(string id, CancellationToken cancellationToken)
        {
            var readModelType = typeof(TReadModel);
            var selectSql = _readModelSqlGenerator.CreateSelectSql<TReadModel>();
            var readModels = await _connection.QueryAsync<TReadModel>(
                Label.Named("mssql-fetch-read-model", ReadModelNameLowerCase),
                cancellationToken,
                selectSql,
                new { EventFlowReadModelId = id })
                .ConfigureAwait(false);

            var readModel = readModels.SingleOrDefault();

            if (readModel == null)
            {
                Log.Verbose(() => $"Could not find any MSSQL read model '{readModelType.PrettyPrint()}' with ID '{id}'");
                return ReadModelEnvelope<TReadModel>.Empty(id);
            }

            var readModelVersion = GetVersion(readModel);

            Log.Verbose(() => $"Found MSSQL read model '{readModelType.PrettyPrint()}' with ID '{id}' and version '{readModelVersion}'");

            return ReadModelEnvelope<TReadModel>.With(id, readModel, readModelVersion);
        }

        public override async Task DeleteAsync(
            string id,
            CancellationToken cancellationToken)
        {
            var sql = _readModelSqlGenerator.CreateDeleteSql<TReadModel>();

            var rowsAffected = await _connection.ExecuteAsync(
                Label.Named("mssql-delete-read-model", ReadModelNameLowerCase),
                cancellationToken,
                sql,
                new { EventFlowReadModelId = id })
                .ConfigureAwait(false);

            if (rowsAffected != 0)
            {
                Log.Verbose($"Deleted read model '{id}' of type '{ReadModelNameLowerCase}'");
            }
        }

        public override async Task DeleteAllAsync(CancellationToken cancellationToken)
        {
            var sql = _readModelSqlGenerator.CreatePurgeSql<TReadModel>();
            var readModelName = typeof(TReadModel).Name;

            var rowsAffected = await _connection.ExecuteAsync(
                Label.Named("mssql-purge-read-model", readModelName),
                cancellationToken,
                sql)
                .ConfigureAwait(false);

            Log.Verbose(
                "Purge {0} read models of type '{1}'",
                rowsAffected,
                readModelName);
        }
    }
}

Hello there!

We hope you are doing well. We noticed that this issue has not seen any activity in the past 90 days. We consider this issue to be stale and will be closing it within the next seven days.

If you still require assistance with this issue, please feel free to reopen it or create a new issue.

Thank you for your understanding and cooperation.

Best regards, EventFlow

github-actions[bot] • Apr 08 '23 13:04

Hello there!

This issue has been closed due to inactivity for seven days. If you believe this issue still needs attention, please feel free to open a new issue or comment on this one to request its reopening.

Thank you for your contribution to this repository.

Best regards, EventFlow

github-actions[bot] • Apr 16 '23 09:04