
Crash resilience

Open rasmus opened this issue 6 years ago • 44 comments

If the application using EventFlow shuts down unexpectedly, e.g. due to a power failure, EventFlow will lose consistency between what is committed to the event store and what is applied to read models and subscribers as any in-memory updates will be lost.

While any commands that have been published (passed to ICommandBus.PublishAsync) but haven't yet resulted in committed events to the event store are also lost, they are considered out of scope for this issue, as EventFlow doesn't lose consistency.

Possible solutions

  • Transaction log: any events committed to the event store could be written to a log and removed once read models and subscribers have been successfully updated (see the sketch after this list)
  • others... ?
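
A minimal sketch of what such a transaction log interface could look like (ITransactionLog and its members are illustrative, not existing EventFlow API):

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;

// Hypothetical interface, only to make the transaction log idea concrete
public interface ITransactionLog
{
    // called in the same logical commit as the event store write
    Task AddPendingAsync(IReadOnlyCollection<IDomainEvent> domainEvents, CancellationToken cancellationToken);

    // called once read models and subscribers have all been updated successfully
    Task RemovePendingAsync(IReadOnlyCollection<IDomainEvent> domainEvents, CancellationToken cancellationToken);

    // used on startup/recovery to re-publish anything that never completed
    Task<IReadOnlyCollection<IDomainEvent>> LoadPendingAsync(CancellationToken cancellationToken);
}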

Mitigation

Steps can be taken to minimize the impact of an unexpected shutdown:

  • Always drain servers of activity, e.g. web requests, before doing an application shutdown or upgrade
  • Use asynchronous subscribers, i.e., let Hangfire (or whatever scheduler you have configured for EventFlow) handle subscriber updates (see the sketch below)
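
For the asynchronous subscriber route, a sketch using EventFlow's ISubscribeAsynchronousTo (the domain types below are hypothetical):

using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Core;
using EventFlow.Subscribers;

// Hypothetical domain types, only here to make the sketch self-contained
public class UserId : Identity<UserId> { public UserId(string value) : base(value) { } }
public class UserAggregate : AggregateRoot<UserAggregate, UserId>
{
    public UserAggregate(UserId id) : base(id) { }
}
public class UserCreatedEvent : AggregateEvent<UserAggregate, UserId> { }

// Runs via the configured job scheduler (e.g. Hangfire), so a crash before
// completion leaves a job to be retried rather than a silently lost update
public class SendWelcomeEmailSubscriber
    : ISubscribeAsynchronousTo<UserAggregate, UserId, UserCreatedEvent>
{
    public Task HandleAsync(
        IDomainEvent<UserAggregate, UserId, UserCreatedEvent> domainEvent,
        CancellationToken cancellationToken)
    {
        // send the welcome e-mail here; a failure surfaces as a failed job
        return Task.CompletedTask;
    }
}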

rasmus avatar Mar 06 '18 04:03 rasmus

My team is also looking into situations like this and I believe it would be pretty hard to ensure for every possible crash situation. Would using asynchronous subscribers not lose the order in which the subscribers are called? Right now we are sure that the ReadModel subscribers have been called before any others.

By transaction log, do you mean that each subscriber has to ACK that it has processed the event? And that the DomainEventPublisher somehow retries after recovery for events that have not been processed by all registered subscribers?

Has this never been an issue in production @rasmus ?

wgtmpeters avatar Mar 06 '18 20:03 wgtmpeters

@wgtmpeters We haven't detected any abnormalities in our production setup that could be traced back to this. However, our production setup is rather stable and not prone to unexpected host crashes or application termination and we carefully drain all servers of web requests before upgrading.

I do think the issue is important, as it will allow me to sleep better at night 😄 (and it makes sense)

For the transaction log, I thought merely to write one entry per committed batch of events and, when the DomainEventPublisher finishes successfully, remove the entry. In case of a crash, the events could be picked up and retried. Events would be pushed to read models and subscribers more than once in case of a crash, which would need to be handled.

Even though our production servers are stable, our network infrastructure isn't, and in addition we use RabbitMQ quite a lot. This means that our applications are (usually) written to handle at-least-once event delivery, which is what I thought could be done for read models and subscribers.

rasmus avatar Mar 06 '18 21:03 rasmus

Persistence of the event and the transaction log entry would then have to be in the same logical commit, right?

And you would need to extend the existing event store implementations to also be able to handle this. It could be as simple as a flag per event indicating whether it has been published.

But how would this be handled in the EventStore event store package? As a side Q: do you consider the EventStore event store package production ready? We are looking at using it.

wgtmpeters avatar Mar 06 '18 22:03 wgtmpeters

Are there still plans to do something with this issue? We are going to create something for this as well. When we have a concept ready I will update this issue. We will try to come up with a solution that fits into the EventFlow core.

wgtmpeters avatar May 08 '18 12:05 wgtmpeters

@wgtmpeters Currently no plans, any suggestions and/or PRs would be greatly appreciated

rasmus avatar May 08 '18 19:05 rasmus

@wgtmpeters @rasmus Any progress on a fix for this? I'm currently trying to pick a CQRS+ES framework, and while this one is pretty feature rich and actively developed, this particular issue is a scary one for me.

If using MSSQL (which I am), would wrapping each command in a transaction scope ensure the read/write models stay consistent? Not sure of the viability of this given the code structure, but the possibility is mentioned here in the docs (with a recommendation not to do it!) https://eventflow.readthedocs.io/FAQ.html
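
For reference, a rough sketch of that idea (the helper below is a placeholder, not EventFlow API, and again: the FAQ recommends against this):

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;

// Placeholder helper: wraps a command publish in an ambient transaction so that
// event store and read model writes against the same MSSQL database commit or
// roll back together
public static class TransactionalPublish
{
    public static async Task PublishInTransactionAsync(
        Func<CancellationToken, Task> publish, // e.g. ct => commandBus.PublishAsync(command, ct)
        CancellationToken cancellationToken)
    {
        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            await publish(cancellationToken);
            scope.Complete(); // nothing is committed if we crash before this line
        }
    }
}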

bedej avatar Aug 02 '18 22:08 bedej

We are working on it internally but it is not ready yet. At the moment we are working on being able to recover from a crash anywhere within the process of the DomainEventPublisher. Each handler (read model update, subscriber update, saga update) will be a small transaction which is flagged as handled somewhere for each DomainEvent. That way, when processing stops somewhere, some other instance of your service can pick up where it stopped. For now we see no use for wrapping the commands, because those already have a retry mechanism depending on how you send them.

For us it is either based on an event from Kafka, which will retry when the command fails because the commit on the Kafka message is not done, or the command is coming from some subscriber (which is already in a transaction, as mentioned before), so that will also be retried eventually.

Some things to consider in this way of working is that you should be able to handle a single command being sent multiple times.

wgtmpeters avatar Aug 07 '18 20:08 wgtmpeters

@wgtmpeters will it be possible to map that functionality to the core package or is it a new DomainEventPublisher that's installed?

rasmus avatar Aug 08 '18 05:08 rasmus

@rasmus so far it seems more likely that we will have to replace the existing one. It is also not only the DomainEventPublisher but also the IDispatchToSagas and IDispatchToSubscribers implementations.

We will first create a fully working POC and then we will see if we can clean it up.

wgtmpeters avatar Aug 08 '18 07:08 wgtmpeters

The catch-up subscriptions solution is quite elegant: https://eventstore.org/blog/20130306/getting-started-part-3-subscriptions/

sitepodmatt avatar Aug 24 '18 11:08 sitepodmatt

@sitepodmatt we still need some mechanism for stamping the domain events that have been successfully handled by subscribers, and as the handling is asynchronous, tracking completion might not be trivial. E.g. events 1 and 3 might have been handled successfully, but event 2 took longer and a crash happened in the meantime.

rasmus avatar Aug 25 '18 10:08 rasmus

@wgtmpeters @rasmus Hello! Do you have any updates on fix for this issue? Thanks!

kokhans avatar Oct 31 '18 23:10 kokhans

Nothing yet; no solution that integrates nicely into the framework has been proposed so far.

rasmus avatar Nov 01 '18 15:11 rasmus

Sadly we still have not continued work on it. But if we do not get to it soon, I will make time for it.

wgtmpeters avatar Nov 01 '18 18:11 wgtmpeters

@kokhans, colleague of @wgtmpeters here: we have started working on this again for our project. Currently we're focusing on getting the solution to work for our needs first, which means taking some shortcuts as to the required infrastructure. After this we will start working on getting the pull request ready for @rasmus to take a crack at. Our aim is still for this code to end up in the EventFlow repository so it can be maintained along with the rest.

ajeckmans avatar Nov 05 '18 12:11 ajeckmans

What approach are you taking?

The approach I've seen other event sourcing frameworks use is that when events are committed to the store table, each event also creates an entry in another table for "pending event delivery". This is all done within a single transaction. Then there's a worker that will deliver the events and remove them once they've been dispatched (this could be after they're confirmed queued with Hangfire or immediate job runner has dispatched them etc, though it does beg the question of how to define that the event has been dispatched successfully, if there are multiple simultaneous handlers).

I think this would require cooperation between the implementation of the IAggregateStore and the IDomainEventPublisher at least, and may thus require additional interfaces on one/both?
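
Roughly, the commit side could look like this (a sketch only; the table names and method are made up, not EventFlow API):

using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;

// Sketch: event rows and their "pending delivery" rows are written in a single
// transaction, so a crash can never leave committed events that the delivery
// worker doesn't know about
public static class PendingDeliverySketch
{
    public static async Task CommitWithPendingDeliveryAsync(
        string connectionString,
        IReadOnlyCollection<string> serializedEvents,
        CancellationToken cancellationToken)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync(cancellationToken);
            using (var transaction = connection.BeginTransaction())
            {
                foreach (var serializedEvent in serializedEvents)
                {
                    using (var command = new SqlCommand(
                        "INSERT INTO EventStore (Data) VALUES (@data); " +
                        "INSERT INTO PendingEventDelivery (Data) VALUES (@data)",
                        connection, transaction))
                    {
                        command.Parameters.AddWithValue("@data", serializedEvent);
                        await command.ExecuteNonQueryAsync(cancellationToken);
                    }
                }
                transaction.Commit(); // both tables or neither
            }
        }
    }
}

A separate worker then reads PendingEventDelivery, dispatches to read models and subscribers, and deletes each row only after successful dispatch.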

bedej avatar Nov 07 '18 03:11 bedej

The scenario you describe, Bede, seems the most practical for EventFlow, keeping the dependency on one database, although I've moved on to Akka Persistence. Committing to a pending event delivery table in the same transaction then gives you at-least-once delivery semantics, presuming the "worker that will deliver the results" does the delivery/publishing and then removes the events.


sitepodmatt avatar Nov 07 '18 06:11 sitepodmatt

I would like to point out the importance of this issue and come up with a solution.

I see this issue as a dealbreaker. I would go as far as to suggest not using EventFlow in production until this data consistency issue is addressed. I find a car airbag a good metaphor for this: sure, you don't want your car to crash, but if it crashes you want an airbag to save your life. Same here: you don't want your application to crash, but if it crashes you want your data to be consistent. If your hands are full fixing the bug that caused the application to crash in the first place, dealing with weird data inconsistencies is the last thing you want to do...

Anyway, as I see it, it's pretty straightforward:

  1. add a ConfirmEvent() method to the event store
  2. add a ListUnconfirmedEvents() method to the event store
  3. after saving events and after all event handlers have successfully finished, call the ConfirmEvent() method on the event store
  4. in the application start-up procedure, list unconfirmed events from the event store, process all event handlers, and subsequently confirm the events

Consequences:

  1. event handlers must change their expectation from at-most-once delivery to at-least-once delivery (which actually means that they must be able to gracefully handle an event even if it is delivered multiple times)

Possible event store implementations:

  1. add an indexed bool column Published to the events table. Create events with Published = false, have the ConfirmEvent() method set Published = true, and have ListUnconfirmedEvents() filter by Published = false
  2. Outbox pattern: while saving events, in the same transaction also save a copy of the events (or just their ids) into an Outbox table. In the ConfirmEvent() method, delete the event from the Outbox table
  3. event-sourced write-only version (e.g. for blockchain or other non-modifiable storage): save events as usual; in the ConfirmEvent() method, log the event id to a separate table kept just for that purpose. ListUnconfirmedEvents() returns all events except those in the confirmed log

mbican avatar Mar 13 '19 16:03 mbican

Another thing to consider when designing the event store interface is #610, the dead letter queue. Ideally we should store event subscriber failure information atomically with the event-published confirmation.

mbican avatar Mar 16 '19 10:03 mbican

@rasmus I've looked into this as well and would like to contribute my findings.

First of all, I would like to thank you for this awesome library. It's also the most starred event sourcing library for C# and I hope I can help with making it even better.

As pointed out in this issue, the 'weakest' (hope I don't offend anyone) part of the library is that of how read models and subscribers are updated.

There are various implementations thinkable that make read model updates crash resilient, but the one I have seen used most is a cursor-based subscription.

Much like how Kafka operates, you take a stream and begin reading from offset 0. You update the read model and transactionally store the state of the read model alongside the offset.
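
As a rough illustration of that loop (all types below are made up, neither EventFlow nor StreamsDb API):

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical store that persists the read model and its cursor atomically
public interface IProjectionStore
{
    Task<long> LoadOffsetAsync(CancellationToken ct); // 0 on the first run
    Task SaveAsync(object readModel, long nextOffset, CancellationToken ct); // one transaction
}

public sealed class CursorBasedProjector
{
    private readonly IProjectionStore _store;

    public CursorBasedProjector(IProjectionStore store) => _store = store;

    public async Task RunAsync(
        IAsyncEnumerable<(long Offset, object Event)> stream,
        CancellationToken ct)
    {
        // resume from the last transactionally persisted offset
        var from = await _store.LoadOffsetAsync(ct);
        await foreach (var (offset, @event) in stream.WithCancellation(ct))
        {
            if (offset < from) continue; // already applied before the crash
            var readModel = Project(@event); // apply the event to the read model
            await _store.SaveAsync(readModel, offset + 1, ct); // model + cursor together
        }
    }

    private static object Project(object @event) => @event; // placeholder projection
}

Because the read model and the offset are written in one transaction, a crash at any point just means the projector replays from the stored offset; no event is skipped and none is applied twice to the read model.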

There are two interesting streams to update read models from:

  1. the $all stream
  2. group streams

Most event sourcing databases provide an $all stream to read from. This stream contains all events that have occurred and can be used to update read models, albeit the projectors (projecting the read models) will skip most of the events. This can be improved by introducing group streams.

Group streams are streams that contain all the events from a specific aggregate type. The event store StreamsDb has laid out such a concept here: https://streamsdb.io/docs/core-concepts#groups. It can also be implemented using EventStore by creating a projection with the same logic.

I've implemented a working ReadStoreManager here: https://github.com/promontis/EventFlow/blob/master/Source/EventFlow.EventStores.StreamsDb/ReadStores/SubscriptionBasedReadStoreManager.cs

Note line 110 where I skip the in-process update from EventFlow, as the only source for updating the read model should be the stream.

Now this implementation resides within the StreamsDb project, but I would like to transfer it to a more generic place.

If you are open to updating read models this way, I would recommend the following API changes:

  • do not update read models in-process. Eliminate the call to UpdateReadStoresAsync
  • replace the current ReadStoreManager classes with subscription-based ones (I've already implemented those, but they are not yet in the PR branch).

If possible I would also like to address the various use cases when updating read models. EventFlow implements two:

  • update read model with id from single aggregate
  • update read model with id from multi aggregates

In my current code base, I have added a third:

  • update read model without id from multi aggregates

Since EventFlow doesn't allow read models without an id, I'm currently using the id null (singleton was also discussed). So for example I have a read model named projects-null.

While it doesn't clash with the current architecture of EventFlow, I think it would be nice if we could have first-class support for it, allowing read models to be named e.g. projects.

Finally, I'm also in the process of implementing subscribers this way. They have the same problem of being updated in-process from EventFlow. I've currently added a different interface, but the code is really similar to that of ISubscribeSynchronousTo and DispatchToEventSubscribers.

The use case I see here is subscribing to events from other bounded contexts; I'm also subscribing to the group stream of other bounded contexts (could also be the $all stream), and push the events to a handler similar to ISubscribeSynchronousTo. In that handler I'm reacting to that event and storing a new event using an EventBus.

using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Core;

public interface IEventBus
{
    Task ExecuteEventAsync<TAggregate, TIdentity>(
        IAggregateEvent<TAggregate, TIdentity> aggregateEvent,
        TIdentity id,
        CancellationToken cancellationToken)
        where TAggregate : AggregateRoot<TAggregate, TIdentity>
        where TIdentity : IIdentity;
}

public class EventBus : IEventBus
{
    private readonly IAggregateFactory _aggregateFactory;

    public EventBus(IAggregateFactory aggregateFactory)
    {
        _aggregateFactory = aggregateFactory;
    }

    public async Task ExecuteEventAsync<TAggregate, TIdentity>(
        IAggregateEvent<TAggregate, TIdentity> aggregateEvent,
        TIdentity id,
        CancellationToken cancellationToken)
        where TAggregate : AggregateRoot<TAggregate, TIdentity>
        where TIdentity : IIdentity
    {
        // create (or rehydrate) the aggregate and emit the incoming event on it
        var aggregate = await _aggregateFactory.CreateNewAggregateAsync<TAggregate, TIdentity>(id);
        aggregate.Emit(aggregateEvent);
    }
}

Note the last line... Emit is normally protected, but I've changed it to public: is this ok to do and are you ok with me creating a PR for it?
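
Usage from such a subscriber could then look like this (ProjectRenamedEvent, ProjectId and the surrounding variables are hypothetical):

// react to an event from another bounded context by emitting a new
// domain event through the EventBus above
await _eventBus.ExecuteEventAsync(
    new ProjectRenamedEvent(externalEvent.NewName),
    ProjectId.With(externalEvent.ProjectId),
    cancellationToken);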

promontis avatar Nov 04 '19 11:11 promontis

I finally came around to answering this, sorry for the delay.

@promontis You do not offend, this is indeed a weak point with the current EventFlow implementation, one I would like to have implemented before putting a "1.0" stamp on the project.

Although EventFlow is yet to ship a 1.0 release, I'm very reluctant to make large API changes without good cause, but I do see this as one such cause. That being said, I would very much like to see an example of how this might be done, even a crude one.

rasmus avatar Dec 29 '19 23:12 rasmus

I have been thinking. We are dealing with the Two Generals' Problem (https://en.wikipedia.org/wiki/Two_Generals'_Problem, https://www.youtube.com/watch?v=IP-rGJKSZ3s), which has been proven unsolvable. The way around it is the outbox pattern: https://medium.com/engineering-varo/event-driven-architecture-and-the-outbox-pattern-569e6fba7216. The price that we need to pay for it is to attempt to redeliver events ad infinitum.

The first step is to extend the IEventPersistence interface to be able to:

  1. list undelivered events
  2. mark events as delivered (see the suggested extension of the interface: https://github.com/mbican/EventFlow/commit/97204b9f30f62a05cccd4d1c4bc03265f316150d)

All IEventPersistence implementations need to implement this natively, because it needs to be atomic together with event persistence.

It is not only about read models; it is about all receivers of events from the DomainEventPublisher (read stores, subscribers, sagas). Also see #609 for the DomainEventPublisher modifications.

New event delivery implementation in a somewhat backwards compatible way:

// unchanged, commit events
IEventPersistence.CommitEventsAsync(id, ...)

// load undelivered events for the aggregate, because some previous events may also still be undelivered
var events = IReliableEventPersistence.LoadUndeliveredEventsAsync(id, ...)

// unchanged, publish events
IDomainEventPublisher.PublishAsync(events)

// mark events as delivered (published)
IReliableEventPersistence.MarkEventsDeliveredAsync(id, events)

NOTE: We can stop after IEventPersistence.CommitEventsAsync() and let events be delivered asynchronously by the background worker.

and/or deliver events in a background worker in an infinite loop:

while (true)
{
    var events = IReliableEventPersistence.LoadAllUndeliveredEvents()
    IDomainEventPublisher.PublishAsync(events)
    IReliableEventPersistence.MarkEventsDeliveredAsync(events)
}

So I think most of the work will be in the various event persistence implementations; the change in the EventFlow core should be relatively simple. I can try to provide a prototype of the core change.

And yes, it can happen that an event will be delivered multiple times (the unsolvable Two Generals' Problem). The receiver just has to deal with that, e.g. by remembering the ids of processed events, as sketched below. I would even go as far as recommending calling IDomainEventPublisher.PublishAsync(events) twice in Debug builds, so all code is well tested for duplicate event delivery.
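
A minimal sketch of such a receiver (hypothetical names; a real implementation would persist the seen ids, ideally in the same transaction as the handler's own state change):

using System.Collections.Generic;

// Deduplicating receiver for at-least-once delivery: redelivered events are
// recognized by id and ignored
public sealed class IdempotentReceiver
{
    private readonly HashSet<string> _processedEventIds = new HashSet<string>();

    public void Handle(string eventId, object @event)
    {
        if (!_processedEventIds.Add(eventId))
            return; // duplicate delivery, safely ignored

        // ... actual handling here ...
    }
}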

mbican avatar Jan 09 '20 21:01 mbican

@mbican you are correct! We need to either implement the outbox pattern or update read models and event persistence in the same transaction. Neither is currently really supported.

The event store I'm using has transactional support for multiple streams, thus persistence of events and read models (I have a read model stream) is written as one transaction.

If the event store (like that of Greg Young) doesn't support it, you'll probably need to use the outbox pattern. The most used implementation is that of https://github.com/dotnetcore/CAP.

It would be wonderful if EventFlow could support both.

promontis avatar Jan 09 '20 22:01 promontis

Here is some code so we can iterate on the solution. @rasmus, is this the general direction you want this feature to go in?

mbican avatar Jan 13 '20 08:01 mbican

A different idea, let's call it "EventLogAggregateRoot": a solution at a higher level of abstraction. Instead of changing the low-level IEventPersistence interface, let's create a helper EventLogAggregateRoot aggregate with events:

  • EventToBeCommittedEvent { AggregateId, SequenceNumber }
  • EventConfirmedEvent { AggregateId, SequenceNumber }

The event delivery would then look like:

// first log that the event is about to be committed
IEventPersistence.CommitEventsAsync(logId, new[] { new EventToBeCommittedEvent(id, sequenceNumber) })

// unchanged, commit events
IEventPersistence.CommitEventsAsync(id, ...)

// load undelivered events for the aggregate, because some previous events may also still be unconfirmed
var events = // TODO: somehow load events that have not been confirmed yet

// unchanged, publish events
IDomainEventPublisher.PublishAsync(events)

// confirm the event has been published
IEventPersistence.CommitEventsAsync(logId, new[] { new EventConfirmedEvent(id, sequenceNumber) })

The recovery process/background worker would use the EventLogAggregateRoot to get the list of unconfirmed events (aggregate id + sequence number), publish them, and confirm them.

The nice thing about this solution is that it doesn't require modification of the event persistence adapters. The disadvantage is that the event log would become a system bottleneck. We would need to rotate several streams with different EventLogIds to avoid concurrency conflicts and also to avoid a stream having too many events, but that could of course be solved by a snapshot.

mbican avatar Jan 13 '20 21:01 mbican

Something to consider.

If using the basic MSSQL event persistence, events are inserted into the table using a single INSERT statement and always at the very "end" of the table, as the GlobalSequenceNumber is the primary clustered key and an IDENTITY(1,1) (always increasing, though it might jump). This is done without any explicit transactions, and introducing any would (I guess) add significant overhead as the table will be locked.

rasmus avatar Jan 21 '20 18:01 rasmus

Does anyone know which of these two options is the fastest for an MSSQL database with a very high amount of traffic?

  1. Inserting a row and then updating it later
  2. Inserting a row and then deleting it later

Both given you have some production SAN with SSDs underneath.

rasmus avatar Jan 21 '20 18:01 rasmus

@rasmus Could you please provide us with a roadmap on this one? We are now in the prototype phase for our new EventFlow application, and we need to make a decision. AFAIK, we have these options:

A. Implement read models ourselves in separate services (non-EventFlow), similar to https://github.com/eventflow/EventFlow/issues/734#issuecomment-584505269. Good, but costly.
B. Use EventFlow read models, but with the risk of losing data.
C. Use EventFlow read models, but decorate command handling with a Unit of Work (bye-bye performance).

Questions:

  1. Have you decided about best approach to fix it in Eventflow?
  2. If yes, what's the plan for this? Will you go with some kind of #728? What's the next step on this?
  3. Could we help in making this happen, i.e. by contributing developer's time?

P.S. Thanks for EventFlow

leotsarev avatar Apr 15 '20 15:04 leotsarev

Hi @leotsarev, I understand your frustration, but there isn't any roadmap for this, mainly because no-one has come up with a really good solution yet. There have been several suggestions, but they all had some flaw (please correct me if I remember wrong).

rasmus avatar Apr 15 '20 17:04 rasmus

@rasmus People have already solved this problem. There are two options:

  1. implement the outbox pattern
  2. update read models and event persistence in the same transaction

I've been using a streaming db that can update multiple streams in the same transaction. Think ACID streams. So you actually don't have to do a lot to make option 2 work. However, only a few databases support it. Though, by storing events AND read models in SQL Server, this should already work. EventStore doesn't support it, though, nor does Kafka (but you shouldn't really be using Kafka for event sourcing either way).

Btw, I don't think you have to make an explicit choice. It is easier to only support option 2, but as a framework, you probably also would like to support option 1 for people who are not using such an ACID (streaming) database.

To support both options, EventFlow probably has to provide the two strategies.

For option 1, Microsoft already has a good framework, namely https://github.com/dotnetcore/CAP. Perhaps we could use it to provide a strategy implementation for option 1.

For option 2, we need to provide an array of events that participate in the transaction. The event store is currently not designed this way; this is probably one of the hardest 'rewrites'.

I would love to implement option 2. I think @mbican would love to implement option 1, but you have to ask him :)

We cannot really move forward without your blessing though :)

promontis avatar Apr 15 '20 18:04 promontis