MetroBus copied to clipboard
Lightweight messaging wrapper of MassTransit
Lightweight messaging wrapper of MassTransit
NuGet Packages
> dotnet add package MetroBus
> dotnet add package MetroBus.Autofac
> dotnet add package MetroBus.Microsoft.Extensions.DependencyInjection
- .NET Standard 2.0
- Currently only supports RabbitMQ transport
- Provides easy way to create Producer and Consumer for Pub/Sub
- Provides easy way to handle Request/Response conversations
- Provides message scheduling
- Includes optional incremental auto retry policy
- Includes optional circuit breaker
- Includes optional rate limiter
- Autofac support
- Microsoft.Extensions.DependencyInjection support
Initializing bus instance for Producer:
// For events
IBusControl bus = MetroBusInitializer.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
// For commands
ISendEndpoint bus = MetroBusInitializer.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
after bus instance initializing then you can use Send or Publish methods.
// For events
await bus.Publish<TEvent>(new
SomeProperty = SomeValue
// For commands
await bus.Send<TCommand>(new
SomeProperty = SomeValue
using Consumer:
static void Main(string[] args)
IBusControl bus = MetroBusInitializer.Instance
.UseRabbitMq(string rabbitMqUri, string rabbitMqUserName, string rabbitMqPassword)
.RegisterConsumer<TCommandConsumer>(string queueName)
.RegisterConsumer<TEventConsumer>(string queueName)
//if you want to stop
TCommandConsumer could like below:
public class TCommandConsumer : IConsumer<TCommand>
public async Task Consume(ConsumeContext<TCommand> context)
var command = context.Message;
//do something...
await Console.Out.WriteAsync($"{command.SomeProperty}");
Initializing bus instance for Request/Response conversation:
IRequestClient<TRequest, TResponse> client = MetroBusInitializer.Instance.UseRabbitMq(string rabbitMqUri, string rabbitMqUserName, string rabbitMqPassword)
.InitializeRequestClient<TRequest, TResponse>(string queueName);
TResponse result = await client.Request(new TRequest
Command = "Say hello!"
and consumer for Request/Response conversation could like below:
public class TCommandConsumer : IConsumer<TRequest>
public async Task Consume(ConsumeContext<TRequest> context)
var command = context.Message;
//do something...
await Console.Out.WriteAsync($"{command.SomeProperty}");
context.Respond(new TRequest
Command = "Hello!"
using Consumer with Microsoft.Extensions.DependencyInjection:
new HostBuilder ()
.ConfigureServices ((hostContext, services) =>
services.AddMetroBus (x =>
services.AddSingleton<IBusControl> (provider => MetroBusInitializer.Instance
.UseRabbitMq (string rabbitMqUri, string rabbitMqUserName, string rabbitMqPassword)
.RegisterConsumer<TCommandConsumer>("foo.command.queue", provider)
.RegisterConsumer<TEventConsumer>("foo.event.queue", provider)
.Build ())
.BuildServiceProvider ();
services.AddHostedService<BusService> ();
.RunConsoleAsync ().Wait ();
public class BusService : IHostedService
private readonly IBusControl _busControl;
public BusService(IBusControl busControl)
_busControl = busControl;
public Task StartAsync(CancellationToken cancellationToken)
return _busControl.StartAsync(cancellationToken);
public Task StopAsync(CancellationToken cancellationToken)
return _busControl.StopAsync(cancellationToken);
PS: Publisher and Consumer services must be used same TCommand or TEvent interfaces. This is important for MassTransit integration. Also one other thing is rabbitMqUri parameter must start with "rabbitmq://" prefix.
There are several options you can set via fluent interface:
.UseRetryPolicy().UseIncrementalRetryPolicy(int retryLimit, TimeSpan? initialIntervalTime, TimeSpan? intervalIncrementTime, params Exception[] retryOnSpecificExceptionType).Then()
.UseCircuitBreaker(int tripThreshold, int activeThreshold, TimeSpan? resetInterval)
.UseRateLimiter(int rateLimit, TimeSpan? interval)
.UseConcurrentConsumerLimit(int concurrencyLimit)