Outbox pattern support

Open kewinbrand opened this issue 5 years ago • 60 comments

Hey

Have you ever considered implementing outbox pattern in Rebus? I'm willing to help if necessary =)

Cheers

kewinbrand avatar Aug 02 '19 18:08 kewinbrand

It's definitely something I've considered, yes. 😄

Do you need it?

mookid8000 avatar Aug 02 '19 21:08 mookid8000

https://vimeo.com/111998645 This video gives a good overview of things to take into account with the outbox pattern and the level of durability it can provide for messages. The ability to ensure business-critical messages are queued and consumed, and consumed only the expected number of times, is very powerful. I am in the process of evaluating Rebus, MassTransit, and NServiceBus, and I am still at the point of not knowing what I don't know, so it's very possible I don't need an outbox and there are other ways to accomplish the level of reliability we will need. On first impression, though, it feels like something I might need.

I would be curious to know how people currently handle mission-critical messages and ensure everything works as expected. One of the workloads we are looking to move to messaging (not the first one we plan to use messaging for, but one on the list) has serious financial ramifications for our customers and the company if we lose messages or consume messages multiple times.

MattMinke avatar Aug 02 '19 22:08 MattMinke

This is definitely one of the patterns I'd love to see in this library. Rebus' standpoint on transactions is of course very valid (idempotency all the way), but idempotency is not always possible or trivial. A client I'm at right now has a service handing out unique keys, which is a hassle to make idempotent, especially since the keys need to be tracked. The outbox pattern would help a great deal to get this done without having to change the model.

Tsjunne avatar Aug 10 '19 03:08 Tsjunne

Just a thought: I don't think adding outbox support to Rebus would be so hard, and I don't even think it would require any kind of changes to any of the existing libraries.

An outbox for Rebus could work by

  • decorating Rebus' ITransport, saving all of the outgoing messages in the outbox (possibly using the message ID of the message currently being handled as a key, or simply in a keyless store-and-forward fashion)
  • adding an incoming step in the receive pipeline that checks the outbox for messages with a key matching the received message ID

and possibly more.

These two hooks – decorating ITransport and adding an incoming pipeline step – are well-documented extension points, and they're fairly easy to understand and use.
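
To make the two hooks concrete, here is a minimal in-memory sketch. It does not use Rebus' real ITransport or pipeline step types: ISimpleTransport, InMemoryOutbox, OutboxTransportDecorator, and OutboxIncomingStep are all hypothetical names made up for illustration. Treating "leftover messages for this ID" as "the handler already ran" is one possible reading of the second hook, not something the comment above specifies.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a transport. This is NOT Rebus' real ITransport.
public interface ISimpleTransport
{
    Task Send(string destination, string body);
}

public sealed record OutgoingMessage(string Destination, string Body);

// Hypothetical in-memory outbox, keyed by the ID of the message being handled.
// A real outbox would be a table in the same database as the business data.
public sealed class InMemoryOutbox
{
    private readonly Dictionary<string, List<OutgoingMessage>> _pending = new();

    public void Save(string incomingMessageId, OutgoingMessage message)
    {
        if (!_pending.TryGetValue(incomingMessageId, out var list))
        {
            list = new List<OutgoingMessage>();
            _pending[incomingMessageId] = list;
        }
        list.Add(message);
    }

    public IReadOnlyList<OutgoingMessage> Find(string incomingMessageId) =>
        _pending.TryGetValue(incomingMessageId, out var list)
            ? list.ToArray()
            : Array.Empty<OutgoingMessage>();

    public void Clear(string incomingMessageId) => _pending.Remove(incomingMessageId);
}

// Hook 1: the decorator stores outgoing messages instead of sending them.
public sealed class OutboxTransportDecorator : ISimpleTransport
{
    private readonly ISimpleTransport _inner;
    private readonly InMemoryOutbox _outbox;
    private readonly string _incomingMessageId;

    public OutboxTransportDecorator(ISimpleTransport inner, InMemoryOutbox outbox, string incomingMessageId)
    {
        _inner = inner;
        _outbox = outbox;
        _incomingMessageId = incomingMessageId;
    }

    public Task Send(string destination, string body)
    {
        _outbox.Save(_incomingMessageId, new OutgoingMessage(destination, body));
        return Task.CompletedTask;
    }

    // Forwards everything stored for this incoming message, then clears it.
    public async Task Flush()
    {
        foreach (var message in _outbox.Find(_incomingMessageId))
        {
            await _inner.Send(message.Destination, message.Body);
        }
        _outbox.Clear(_incomingMessageId);
    }
}

// Hook 2: the incoming step checks the outbox for the received message ID.
public static class OutboxIncomingStep
{
    public static async Task Process(
        string incomingMessageId,
        ISimpleTransport realTransport,
        InMemoryOutbox outbox,
        Func<ISimpleTransport, Task> handler)
    {
        var transport = new OutboxTransportDecorator(realTransport, outbox, incomingMessageId);

        // Messages left over from an earlier attempt mean the handler already ran.
        if (outbox.Find(incomingMessageId).Count == 0)
        {
            await handler(transport);
        }

        await transport.Flush();
    }
}
```

The sketch leaves out the part that matters most in practice: the Save call has to happen in the same database transaction as the handler's own work. It also does not remember IDs once their messages have been flushed, so a redelivery after a fully successful run would execute the handler again.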

mookid8000 avatar Nov 08 '19 12:11 mookid8000

I would also love to see this Outbox feature.

In my mind it's similar to the IdempotentSaga feature (https://github.com/rebus-org/Rebus/wiki/Idempotence), where the IdempotentSaga acts as the Outbox. So implementing Outbox can be a copy/paste/slimmed down version of Sagas+IdempotentSaga I think.

Thinking further on this. If Outbox and Saga were persisted transactionally then a separate IdempotentSaga concept would no longer be needed. Also not having to load the entire saga state just for checking if the message is a redelivery could benefit performance.

jr01 avatar Dec 16 '19 09:12 jr01

Could this potentially be implemented as a decorated ITransport that just sends out a deferred message with the current timestamp using an ITimeoutManager? Something like:

    public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
    {
        if (message.Headers.ContainsKey(Headers.DeferredUntil))
        {
            message.Headers.Remove(Headers.DeferredUntil);
            message.Headers.Remove(Headers.DeferredRecipient);
            return internalTransport.Send(destinationAddress, message, context);
        }
        else
        {
            var deliveryTime = DateTimeOffset.UtcNow;
            message.SetDeferHeaders(deliveryTime, destinationAddress);
            return timeoutManager.Defer(deliveryTime, message.Headers, message.Body);
        }
    }

I was hoping this would be trivial to implement and I could leverage existing supported database timeout storage and leverage the existing timeout manager pipeline step to do the real outbound transport send. The only hitch I ran into has been that it doesn't seem possible to easily have the timeout manager participate/share the database connection and transaction with the same connection/transaction that my handlers are using.

dtabuenc avatar Apr 21 '20 06:04 dtabuenc

Definitely an interesting idea.... but, if the outbox feature should be good, I think it will be necessary to design it from scratch.

One of the most important things to consider, which you've also discovered, is how the user's database connection + transaction is shared with the outbox. This part should be super-neat, but it should also be extensible – it should at least be possible for SQL Server, Postgres, and Oracle to be used as outboxes, but it would also be neat to be able to use RavenDB and MongoDB (which has had multi-document ACID transactions since version 4).

mookid8000 avatar Apr 21 '20 07:04 mookid8000

I spent some time a while ago trying to find the necessary extension points. I think this would need some changes in the base package. From what I can see, the implementation shouldn't be much different from the idempotent saga implementation, with the difference being that the outbox should be a separate persistence model, instead of embedding pending messages in the saga store.

In that respect, a message being handled with outbox enabled would have some kind of system generic saga instance that would, in effect, be the outbox record.

Tsjunne avatar Apr 21 '20 14:04 Tsjunne

@mookid8000 So in the post you made in November you mentioned a two-part solution: a decorated transport that stores the outgoing messages, plus an incoming step. I assume the incoming step with the message ID check is for incoming message deduplication (which I see as a separate feature from outbox store-and-forward). If I just wanted to do outbox store-and-forward, I imagine there would be a background worker thread that just reads messages from storage and forwards them to the underlying real transport. Where do you imagine is the best place to launch or manage this background thread? Or do you think there might be a better way to go about this?

dtabuenc avatar Apr 21 '20 16:04 dtabuenc

I still think there is a case to be made that there should be some way to make timeout managers share connection and transaction with transports that share underlying transactional technologies. In the case of outbox pattern, deferred messages also need to participate in the outbox transaction so that they don't go out if underlying business transaction fails. I'm not sure how I would implement this requirement without duplicating a lot of timeout manager code. Essentially if a timeout manager could share connection and transaction, then outbox would simply become a defer with immediate send time.

dtabuenc avatar Apr 21 '20 16:04 dtabuenc

When you

    await bus.Defer(delay, message);

the message does not get saved immediately to the timeout storage – it gets the appropriate headers, and then it gets sent to "the timeout manager".

By default, "the timeout manager" will be the sender's own input queue, but it could also be an external timeout manager (if one is configured).

Therefore, if an outbox was implemented, timeouts would be subject to the same outgoing messages guarantees as all other outgoing messages.

mookid8000 avatar Apr 22 '20 11:04 mookid8000

In writing my background task that forwards outbox->transport, I'm running into the issue of how to create a TransactionContext. I'm using DispatchIncomingMessageStep as an example, and there it simply news one up. However, I cannot do the same because the class is internal, so I cannot new it up in my code. Is there a reason why the TransactionContext class cannot be made public, or alternatively, is there a way to get an ITransactionContext from the container? In the meantime I've just copied the class into my project, but would love to get some thoughts on this.

dtabuenc avatar Apr 22 '20 18:04 dtabuenc

You can create a scoped transaction context like this:

    using (var scope = new RebusTransactionScope())
    {
        //          here it is 👇
        var context = scope.TransactionContext;

        await scope.CompleteAsync();
    }

mookid8000 avatar Apr 22 '20 18:04 mookid8000

I don't get this ... Transactional outbox is a pattern for making a data change and an event send one atomic operation. Isn't that what you get by using the RebusTransactionScope, or by configuring and coding a solution where the Rebus transport and the DB back-end share connection and transaction (i.e. TransactionScope for SQL Server)? Why would you need something in addition?

Can someone please enlighten me as I must have misunderstood some concepts in Rebus.

pi3k14 avatar Apr 28 '20 07:04 pi3k14

What is Rebus' transaction scope good for?

RebusTransactionScope helps you bundle and delay bus operations:

    using (var scope = new RebusTransactionScope())
    {
        await bus.Send(someCommand);

        await bus.Send(anotherCommand);

        // time goes by - nothing has actually been sent yet
        //
        // more time goes by
        //
        // NOW it will be sent
        await scope.CompleteAsync();
    }

For instance, if you're in a web application, you can use a RebusTransactionScope in your OWIN/MVC Core pipeline to delay bus operations until AFTER you've successfully committed your database work and generated a response for the client.

Since bus operations are generally MUCH less likely to fail, this design is really great in many scenarios.

Also, many transports don't support any kind of atomic transaction around multiple send/publish operations. In fact, I think only MSMQ can do that... and then of course the RDBMS-based transports, if you're using SQL Server, PostgreSQL, or Oracle as the transport.

How can it integrate with System.Transactions.TransactionScope?

It can't.

Not even if you are using SQL Server as the transport.

At best, it would be able to enlist in a distributed transaction, which your db work transaction would also be part of, but the transactions would be distinct and would come with all of the annoying issues that distributed transactions bring with them.

How would an outbox for Rebus work?

By somehow configuring Rebus to use an outbox:

    Configure.With(...)
        .(...)
        .Outbox(o => o.(...))
        .Start();

BUT the hard part would be to provide a sensible API for sharing a database connection and its ongoing transaction between your code and Rebus' transport.

The usual Rebus way would be something like

    Configure.With(...)
        .(...)
        .Outbox(o => o.UseSqlServer(what?))
        .Start();

or

    Configure.With(...)
        .(...)
        .Outbox(o => o.UseEntityFramework(what?))
        .Start();

but my imagination doesn't know what to put in what?'s place above, which would enable this.

Maybe it would be a requirement that you use Rebus.UnitOfWork?

Because then the configuration API could force you to somehow hook up the necessary connection and transaction stuff with Rebus in a way that makes it available both to Rebus' outbox and to your application, most likely via handler injection.

mookid8000 avatar Apr 28 '20 12:04 mookid8000

@mookid8000 Thank you for your detailed explanation, I got mixed up by Rebus.Transport.RebusTransactionScope class and Rebus.TransactionScope namespace :)

But still, I think the solution should be based around System.Transactions.TransactionScope and maybe some injected connection factory.

Outbox has to be implemented by a transaction-based store used by both transport and data. In my imagination :) there shouldn't be an outbox configuration, but a variation of the transport configuration. If you need some other kind of message delivery, it should be configured as a duplicated message bus that reads from the transaction-based store and publishes to RabbitMQ or whatever.

pi3k14 avatar Apr 28 '20 13:04 pi3k14

Currently, we have a high need for an outbox to be able to retrofit an application that used to use MSMQ and distributed transactions with Rebus, a custom Kafka transport, and an outbox. The way we wrote it was something like:

    .Transport(t =>
    {
        t.UseKafka("txtest", kafkaConfig);
        t.WithOutbox();
        t.WithPostgresOutboxStore("outbox", someConnectionProvider);
    })

We pass a connection provider to the "PostgresOutbox" and use HandleMessageInsideTransactionScope() to handle the shared transactions between our handlers and the outbox send.

The outbox transport just decorates the underlying transport, replacing the sends with a save to the outbox and passing through receives as-is. We get the connection from our container (Autofac) but it's sort of a hack since we just look it up from the rebus transaction context by the hard-coded key that the autofac container adapter is using to store it in. If we are not in a rebus transaction, we just new up a new connection. Now that you mention unit of work, it may be possible to expose the container scope through that?

The last issue I had is that we want sagas to participate in the same database transaction (Postgres) as our outbox and our application code. It seems like it would be easy to do if we just told the saga code not to do anything with transactions and let System.Transactions.TransactionScope deal with it. The CustomPostgresConnectionProvider included in Rebus.PostgreSql has the option to disable autoStartTransactions, but it is useless because the underlying PostgresConnection will throw if not given a transaction. Is this by design? It's weird because the code doesn't make sense:

        public async Task<PostgresConnection> GetConnection()
        {
            var connection = await _provideConnection();
            var transaction = _autoStartTransactions ? connection.BeginTransaction(IsolationLevel.ReadCommitted) : null;
            return new PostgresConnection(connection, transaction);
        }

and then:

        public PostgresConnection(NpgsqlConnection currentConnection, NpgsqlTransaction currentTransaction)
        {
            _currentConnection = currentConnection ?? throw new ArgumentNullException(nameof(currentConnection));
            _currentTransaction = currentTransaction ?? throw new ArgumentNullException(nameof(currentTransaction));
        }

Would it be possible to change this to have it not manage transactions and just let a pipeline step wrap that in a system TransactionScope??

dtabuenc avatar Apr 28 '20 16:04 dtabuenc

I've built an outbox before for a producer that published messages to RabbitMQ, but I used SQLite, rather than an out-of-process database like Postgres or SQL Server.

You won't get quite the same guarantees of course, since if the producer's disk is borked then the outboxed messages are lost.

But a high-availability setup for outboxed messages is overkill for most scenarios (and what's the chance of the broker being down and the consumer disk breaking at the same time?), and would come with a hefty performance penalty, whereas the penalty is much smaller with something like SQLite.

cocowalla avatar Jul 05 '20 13:07 cocowalla

@cocowalla What you are describing is simple store-and-forward. The outbox pattern is a more specialized version of this whose purpose is not so much protecting us if the broker is down, but rather providing transactional guarantees. It is a replacement for, or alternative to, distributed transactions. Thus the outbox pattern relies on persisting the message in the same transaction and database that the message handler uses to transact the business logic.
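
As a rough illustration of "same transaction and database", the sketch below writes a business row and an outbox row inside one local ADO.NET transaction, using only the provider-neutral System.Data interfaces. The table names (Orders, Outbox), the column names, and the OutboxWriter class are assumptions made up for this example; parameter syntax (@id) also varies by provider.

```csharp
using System;
using System.Data;

public static class OutboxWriter
{
    // Writes the business change and the outgoing message in ONE local transaction.
    // Assumes 'connection' is already open. Table and column names are hypothetical.
    public static void PlaceOrder(IDbConnection connection, Guid orderId, string eventJson)
    {
        using var transaction = connection.BeginTransaction();
        try
        {
            Execute(connection, transaction,
                "INSERT INTO Orders (Id) VALUES (@id)",
                ("@id", orderId.ToString()));

            Execute(connection, transaction,
                "INSERT INTO Outbox (Id, Body) VALUES (@id, @body)",
                ("@id", Guid.NewGuid().ToString()),
                ("@body", eventJson));

            // Both rows are committed together, or not at all.
            transaction.Commit();
        }
        catch
        {
            transaction.Rollback();
            throw;
        }
    }

    private static void Execute(
        IDbConnection connection,
        IDbTransaction transaction,
        string sql,
        params (string Name, object Value)[] parameters)
    {
        using var command = connection.CreateCommand();
        command.Transaction = transaction;
        command.CommandText = sql;

        foreach (var (name, value) in parameters)
        {
            var parameter = command.CreateParameter();
            parameter.ParameterName = name;
            parameter.Value = value;
            command.Parameters.Add(parameter);
        }

        command.ExecuteNonQuery();
    }
}
```

A separate relay would later read rows from the Outbox table and hand them to the broker. That step can fail and be retried without ever leaving the order and its event out of sync, which is the guarantee plain store-and-forward does not give.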

dtabuenc avatar Jul 05 '20 14:07 dtabuenc

Following up on my earlier comments, outbox with SQL server for a publish only service can be done like this:

    // create some connection accessor
    public interface ISqlConnectionAccessor
    {
        SqlConnection Connection { get; }
    }

    // create a connection factory for Rebus, where the connection is managed externally
    Task<IDbConnection> connectionFactory() => Task.FromResult<IDbConnection>(new DbConnectionWrapper(sqlConnectionAccessor.Connection, null, true));

    // configure bus
    obj.AddRebus(configure => configure
        .Transport(t => t.UseSqlServerAsOneWayClient(new SqlServerTransportOptions(connectionFactory)))
        .Subscriptions(s => s.StoreInSqlServer(connectionFactory, subscriptionTableName, isCentralized: true, automaticallyCreateTables: false)));

    // create a System.Transactions.TransactionScope
    using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

    // alter your data
    SqlCommand command = sqlConnectionAccessor.Connection.CreateCommand();
    command.CommandText = "whatever";
    command.ExecuteNonQuery();

    // publish message
    await bus.Advanced.Topics.Publish(topic, msg);

    // complete transaction
    scope.Complete();

Not completely tested, but looking good. A pub/sub with leased transport gets more complicated, but we are looking into it :)

pi3k14 avatar Jul 20 '20 08:07 pi3k14

Hey, I have tried @pi3k14's solution and it worked for me. I have successfully "pushed" the connection and transaction from our application. Now I need to do the reverse: reuse the SqlConnection, initiated by Rebus, in our app. I can access the TransactionContext and from there the DbConnectionWrapper. The problem is that I cannot access the underlying connection of the driver. For now I have inherited from DbConnectionWrapper and exposed the connection, but it is a hack after all :( Is there a more clever solution?

agerchev avatar Sep 03 '20 10:09 agerchev

Hi @agerchev, our take on this is to implement our own version of IDbConnectionProvider that also returns the connection. The implementation gets or adds the actual SqlConnection to rebus AmbientTransactionContext.Current.Items.

pi3k14 avatar Sep 03 '20 10:09 pi3k14

Yes, I have implemented IDbConnectionProvider. The point is that I have to either reimplement DbConnectionWrapper too or do this:

    public class RebusConnectionWrapper : DbConnectionWrapper
    {
        public SqlConnection Connection { get; private set; }
        public SqlTransaction Transaction { get; private set; }

        public RebusConnectionWrapper(SqlConnection connection, SqlTransaction currentTransaction, bool managedExternally) :
            base(connection, currentTransaction, managedExternally)
        {
            Connection = connection;
            Transaction = currentTransaction;
        }
    }

so I can access the SqlConnection again.

agerchev avatar Sep 03 '20 10:09 agerchev

As long as your IDbConnectionProvider implementation caches its connection in AmbientTransactionContext.Current.Items you don't have to do that. Your implementation would have public Task<IDbConnection> GetConnection(), which is for Rebus (returning a DbConnectionWrapper), and public SqlConnection CreateConnection(), which is for you. If either of these finds a cached connection, it just returns that one instead of creating a new one. You would set the managedExternally flag in the wrapper based on who created the connection.

pi3k14 avatar Sep 03 '20 11:09 pi3k14

The problem of sharing a common transaction context by reusing the same connection instance is a typical one. I used a couple of helper classes for that at work (DbConnectionScope and LocalTransactionScope) and after reading this thread decided to share them before submitting a PR for an Outbox abstraction for Rebus. Feel free to use as is or modify to your needs.

LocalTransactionScope.

rsivanov avatar Sep 03 '20 13:09 rsivanov

I made a draft of an outbox abstraction for Rebus in a separate repository to have some starting point for later improvement.

IOutboxStorage defines methods to store and retrieve messages. A possible implementation for Sql Server should take a Func<Task<IDbConnection>> connectionFactory as a configuration parameter the same way as SqlServer transport. To share a common connection instance in an application we could use something like DbConnectionScope to implement that connectionFactory function.

OutboxConfigurationExtensions contains an extension method, Outbox, that lets you configure whether to run a background task for sending stored outbox messages. It could be turned off to let a separate host do centralized processing of outbox messages, or to use some other means of delivering outbox messages (e.g. Debezium).

Ideally an outbox abstraction should be included in the Rebus repository to simplify dependency management. That would make it possible to implement IOutboxStorage in the Rebus.SqlServer repository.
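
For readers who want a picture of the moving parts, here is a minimal sketch of a storage contract plus the optional background forwarder. It is not the draft's actual IOutboxStorage: IOutboxStore, StoredMessage, OutboxForwarder, and all method names are hypothetical, and the send delegate stands in for whatever hands a message to the real transport.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed record StoredMessage(long Id, string Destination, string Body);

// Hypothetical storage contract; the draft's real IOutboxStorage may look different.
public interface IOutboxStore
{
    Task<IReadOnlyList<StoredMessage>> GetPending(int maxCount);
    Task MarkSent(long id);
}

public sealed class OutboxForwarder
{
    private readonly IOutboxStore _store;
    private readonly Func<StoredMessage, Task> _send; // e.g. a call into the real transport
    private readonly TimeSpan _idleDelay;

    public OutboxForwarder(IOutboxStore store, Func<StoredMessage, Task> send, TimeSpan idleDelay)
    {
        _store = store;
        _send = send;
        _idleDelay = idleDelay;
    }

    // Forwards one batch and returns how many messages were sent.
    public async Task<int> ForwardBatch(int maxCount = 100)
    {
        var batch = await _store.GetPending(maxCount);
        foreach (var message in batch)
        {
            // A crash between these two calls causes a resend later: at-least-once delivery.
            await _send(message);
            await _store.MarkSent(message.Id);
        }
        return batch.Count;
    }

    // Background loop. Leave it unstarted when another host or a CDC tool relays the outbox.
    public async Task Run(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var sent = await ForwardBatch();
            if (sent > 0) continue;

            try
            {
                await Task.Delay(_idleDelay, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}
```

Keeping ForwardBatch separate from Run is what makes the "turn the background task off" option cheap: a central relay host can call ForwardBatch on its own schedule without the loop.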

rsivanov avatar Sep 04 '20 07:09 rsivanov

I haven't dug deep into this for a while, so hopefully I'm not writing nonsense.

Sharing a transaction between Rebus and the actual work performed is the "easy" solution but isn't truly the Outbox pattern. In this mode, consuming the incoming message, queuing new outgoing messages and performing work all happen atomically, which is nice and avoids problems altogether. (go for it if you can)

The outbox pattern is meant for cases where sharing this transaction is not possible, e.g. heterogeneous DBs without distributed transactions, file system access, etc. It requires saving new outgoing messages in a temporary Outbox, which should be (somehow) transactional with the actual business work. When an incoming message is handled a second time, the process should be idempotent and associated messages in Outbox should be sent.

There's a fair bit of book-keeping so I agree @rsivanov that in an ideal world Rebus would handle most of it. There are design decisions to be taken (e.g. where is the outbox persisted? who handles duplicate messages?). I think the design should favor ease of consumption (i.e. Rebus should do as much of the heavy-lifting as possible).

Here's a proposal that requires the introduction of two new APIs for the handler:

  1. (New) The handler must determine a unique id for the business of handling this message. It is a string, so that anything meaningful for the business can be used here: an entity id, multiple serialized attributes, a GUID.
  2. (New) This id is communicated with a call to BeginOutbox(id: "something-repeatable").
  3. The handler performs some work, which should be idempotent.
     a. If the message was a duplicate, the id above should have been the same and nothing should actually be done.
     b. If the message was new, work is performed, including some Send(), which Rebus would persist in the Outbox, associated with the handler id.
  4. When the handler is done, it should commit its work.
     a. (New) First it should call CommitOutbox(), which does what the name says: it persists the outbox messages inside the Rebus storage.
     b. Then it should commit its own work and return.
  5. Having successfully returned, Rebus will mark the incoming message as handled and send outbox messages atomically. Outbox messages are identified by the handler id. For a new message they have just been created; for a duplicate message they were already there from a previous execution.

Possible crashes:

  • Before 4.a. nothing is committed, everything will be redone.
  • Before 4.b. only outbox messages have been committed. Rebus should clean them up on crash. (Here there's some hidden complexity: what if it can't, e.g. the storage is not accessible anymore? These messages imperatively need to be cleaned up on recovery, so that they won't be duplicated by the next retry. There are several designs possible here, the simplest could be passing a second duplicate: true parameter to BeginOutbox.)
  • Before 5: Rebus will retry message processing and because everything was committed, nothing will be done except sending the messages already in outbox from previous execution.
  • After 5: everything was done and persisted, we don't care!
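
The steps and crash points above can be modelled in a few lines of in-memory code. BeginOutbox and CommitOutbox are the names proposed here, but nothing below exists in Rebus: OutboxSession, PaymentHandler, TakeMessagesToSend, and the dictionary standing in for "the Rebus storage" are hypothetical, and the duplicate flag is the cleanup variant mentioned in the second crash bullet.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of the proposed API; none of this exists in Rebus.
public sealed class OutboxSession
{
    private readonly Dictionary<string, List<string>> _storage; // stands in for "the Rebus storage"
    private readonly List<string> _buffer = new();
    private string _id = string.Empty;

    public OutboxSession(Dictionary<string, List<string>> storage) => _storage = storage;

    // Step 2. A non-duplicate run discards leftovers of an attempt
    // that died between 4.a and 4.b, so they are not sent twice.
    public void BeginOutbox(string id, bool duplicate)
    {
        _id = id;
        _buffer.Clear();
        if (!duplicate) _storage.Remove(id);
    }

    // Step 3.b: messages are buffered, not sent.
    public void Send(string body) => _buffer.Add(body);

    // Step 4.a: persist the buffered messages under the handler id.
    public void CommitOutbox()
    {
        if (_buffer.Count > 0) _storage[_id] = new List<string>(_buffer);
    }

    // Step 5: what the bus would send once the handler has returned successfully.
    public IReadOnlyList<string> TakeMessagesToSend()
    {
        if (!_storage.TryGetValue(_id, out var messages)) return Array.Empty<string>();
        _storage.Remove(_id);
        return messages;
    }
}

public static class PaymentHandler
{
    public static void Handle(string paymentId, OutboxSession outbox, HashSet<string> committedPayments)
    {
        var duplicate = committedPayments.Contains(paymentId);       // the handler's own idempotency check
        outbox.BeginOutbox($"payment-{paymentId}", duplicate);       // steps 1 and 2
        if (!duplicate) outbox.Send($"PaymentAccepted:{paymentId}"); // step 3.b (3.a: do nothing)
        outbox.CommitOutbox();                                       // step 4.a
        committedPayments.Add(paymentId);                            // step 4.b: commit business work
    }
}
```

Running Handle twice for the same payment without calling TakeMessagesToSend in between models the "before 5" crash: the second run does no work and leaves the stored message in place, so exactly one message is sent when step 5 finally happens.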

As for storage, maybe we don't need new interfaces at all? Couldn't those messages be saved in the existing queues, with a different status? Or maybe they could be put in a special outbox queue, similar to the poison or audit queues design?

jods4 avatar Sep 06 '20 22:09 jods4

@jods4, I think you put too much into what a Transactional outbox "is allowed to be". You can find a definition here, the main point being: do not use distributed transactions. If using Sql server both for entity storage and Rebus transport it would be "stupid" not fuse that into a single operation instead of adding an extra Outbox to your entity storage and an extra service to relay between Outbox and bus. In addition, an Outbox is only necessary for pure publishing. Inside event handling it is not necessary, as you would rely on "at-least-once" delivery and idempotent operations. You don't need an Outbox for reading queued events.

pi3k14 avatar Sep 07 '20 05:09 pi3k14

"If using Sql server both for entity storage and Rebus transport it would be "stupid" not fuse that into a single operation instead of adding an extra Outbox to your entity storage and an extra service to relay between Outbox and bus."

That's right, when you use Rebus with SqlServer transport, you don't need an Outbox, you just need to share a local transaction between SqlServer transport and your business logic. That's what Func<Task<SqlConnection>> connectionFactory allows you to do.

You need to use Outbox only when your transport doesn't support a local transaction with your code - such as RabbitMq for example. In that case you need a local outbox storage in the same database that your code works with, and that's what IOutboxStorage abstraction is all about.

rsivanov avatar Sep 07 '20 08:09 rsivanov

I think that @jods4 may be right about the interfaces. After all, we all need a transactional commit together with the rest of our application data. This can be done as a characteristic of the transport, as @pi3k14 has shown. Unfortunately, not all of the DB transports support it. It would be very helpful if they could follow the same pattern as the MSSQL transport (or if there were some generic implementation where you just plug in the command creation for the specific commands: create queue, send, receive ...). Then only the forwarding from one transport (the DB, acting as the outbox in this case) to the other (RabbitMQ or other software) needs to be implemented. In this case every transport can be used as an outbox store if it is appropriate and the transaction/connection can be reused, of course.

@pi3k14 Why cache the connection when this is already done by the transport? I think it is much simpler just to access it; it is already there and it is supposed to be accessed (as described in the comment on the constant in the SQL transport class).

agerchev avatar Sep 07 '20 08:09 agerchev