MassTransit.EventStoreIntegration icon indicating copy to clipboard operation
MassTransit.EventStoreIntegration copied to clipboard

EventStore persistence for MassTransit


EventStore audit store for MassTransit



The library is published on

Install using the dotnet CLI command or your IDE:

dotnet add package MassTransit.EventStoreIntegration


This library provides two types of persistence:

  • Audit persistence
  • Event-sourced saga persistence


To use EventStore as audit store in MassTransit, use the following code:

// initialise ES connection
var eventStoreConnection = EventStoreConnection.Create(connectionString);
await eventStoreConnection.ConnectAsync();

// initialise the store
var store = new EventStoreMessageAudit(eventStoreConnection, "audit-stream-name");

// configure the bus
var bus = ....;

await bus.StartAsync();

Hence that ConnectSendAuditObservers connects both sending and publishing observers.

Event-Sourced Saga

You can make your saga instances event-sourced and use EventStore to persist them. This will allow you to see the full history of your processed, which is great for tracking and tracing, also gives you a complete audit log for your workflows.

First, you need to make your instances event-sourced. This requires that all state changes are done as event handlers. Here is an example of such instance class:

public class SampleInstance : EventSourcedSagaInstance, SagaStateMachineInstance
    public SampleInstance(Guid correlationId) : this()
        CorrelationId = correlationId;

    private SampleInstance()
        Register<ProcessStarted>(x => OrderId = x.OrderId);
        Register<OrderStatusChanged>(x => OrderStatus = x.OrderStatus);

    public string OrderStatus { get; private set; }
    public string OrderId { get; private set; }

As you can see, at least two things need to be done:

  • Inherit your instance from the EventSourcedSagaInstance abstract class
  • Register event handlers for all state changes

The base class provides the default InitialState property of type string, so you do not need to add it.

Sample state machine, which uses this instance, looks like this:

public class SampleStateMachine : MassTransitStateMachine<SampleInstance>
    public SampleStateMachine()
        InstanceState(x => x.CurrentState);

        Event(() => Started,
            x => x.CorrelateById(e => e.Message.CorrelationId).SelectId(e => e.Message.CorrelationId));
        Event(() => Stopped, x => x.CorrelateById(e => e.Message.CorrelationId));
        Event(() => StatusChanged, x => x.CorrelateById(e => e.Message.CorrelationId));

                .Then(c => c.Instance.Apply(c.Data))

                .Then(c => c.Instance.Apply(c.Data)),

    public State Running { get; private set; }
    public State Done { get; private set; }
    public Event<ProcessStarted> Started { get; private set; }
    public Event<ProcessStopped> Stopped { get; private set; }
    public Event<OrderStatusChanged> StatusChanged { get; private set; }

You can see that in each When we need to call the ApplyChange method to trigger the instance event handler, which changes the instance state. All changes are then saved as separate events.

Keep in mind that state transitions are recorded implicitly as separate events of type SagaInstanceTransitioned. Those events don't need to be explicitly handled, it happens automatically.

The last step is to tell MassTransit to use the EventStore repository:

var connection = EventStoreConnection.Create(connectionString);
var repository = new EventStoreSagaRepository<SampleInstance>(connection);

var bus = Bus.Factory.CreateUsingRabbitMq(c =>
        c.Host(new Uri("rabbitmq://localhost"), h =>

        var machine = new SampleStateMachine();
            ep => ep.StateMachineSaga(machine, repository));

Working sample is available in this repository, in the MassTransit.EventStoreIntegration.Sample project.

The full lifecycle of this saga is then looks like this in the EventStore:

alt text