rabbitmq-stream-dotnet-client
RabbitMQ client for the stream protocol
Table of Contents
- Overview
- Installing via NuGet
- Getting started
- Connect
- Multi Host
- Load Balancer
- Manage Streams
- Producer
- Publish Messages
- Deduplication
- Consume Messages
- Offset Types
- Track Offset
- Handle Close
- Handle Metadata Update
- Heartbeat
- Reliable Producer
- Reliable Consumer
- Build from source
- Project Status
- Release Process
Dotnet client for RabbitMQ Stream Queues
Installing via NuGet
The client is distributed via NuGet.
Getting started
A quick example to get started:
var config = new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/"
};
// Connect to the broker
var system = await StreamSystem.Create(config);

const string stream = "my_first_stream";

// Create the stream. It is important to set a retention policy;
// in this case the limit is 200000 bytes.
await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 200000,
});

var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Reference = Guid.NewGuid().ToString(),
        Stream = stream,
        // Here you receive the message confirmations:
        // a confirmation means the message is stored on the server
        ConfirmHandler = conf =>
        {
            Console.WriteLine($"message: {conf.PublishingId} - confirmed");
        }
    });

// Publish the messages and set the publishingId, which
// should be sequential
for (ulong i = 0; i < 100; i++)
{
    var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
    await producer.Send(i, message);
}

// Not mandatory. A short pause just to show the confirmations
await Task.Delay(TimeSpan.FromSeconds(1));

// Create a consumer
var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = Guid.NewGuid().ToString(),
        Stream = stream,
        // Consume the stream from the beginning
        // See also the other OffsetSpec types
        OffsetSpec = new OffsetTypeFirst(),
        // Receive the messages
        MessageHandler = async (consumer, ctx, message) =>
        {
            Console.WriteLine($"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())} - consumed");
            await Task.CompletedTask;
        }
    });

Console.WriteLine("Press Enter to stop");
Console.ReadLine();

await producer.Close();
await consumer.Close();
await system.DeleteStream(stream);
await system.Close();
Connect
var config = new StreamSystemConfig
{
    UserName = "myuser",
    Password = "mypassword",
    VirtualHost = "myhost",
    Endpoints = new List<EndPoint> {new IPEndPoint(IPAddress.Parse("<<brokerip>>"), 5552)},
};
Multi Host
var config = new StreamSystemConfig
{
    UserName = "myuser",
    Password = "mypassword",
    VirtualHost = "myhost",
    Endpoints = new List<EndPoint>
    {
        new IPEndPoint(IPAddress.Parse("<<brokerip1>>"), 5552),
        new IPEndPoint(IPAddress.Parse("<<brokerip2>>"), 5552),
        new IPEndPoint(IPAddress.Parse("<<brokerip3>>"), 5552)
    },
};
To connect over TLS, enable the Ssl option:
var config = new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/",
    Ssl = new SslOption()
    {
        Enabled = true
    },
};
Load Balancer
var lbAddressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse("<<loadBalancerIP>>"), 5552));
var config = new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/",
    AddressResolver = lbAddressResolver,
    // Use the resolver declared above as the only endpoint
    Endpoints = new List<EndPoint> {lbAddressResolver.EndPoint},
};
Manage Streams
To create a stream:
await system.CreateStream(new StreamSpec(stream));
system.CreateStream is idempotent: trying to re-create a stream with the same name and the same properties (e.g. maximum size) will not throw an exception.
In other words, you can be sure the stream has been created once system.CreateStream returns.
Note that it is not possible to create a stream with the same name as an existing stream but with different properties. Such a request results in an exception.
It is possible to set up the retention policy when creating the stream, based on size or time:
await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 200000,
    MaxAge = TimeSpan.FromHours(8),
});
Setting a retention policy is highly recommended.
RabbitMQ does not store the whole stream in a single file, but splits it into segment files.
Segments are also the unit of truncation: when a stream reaches its maximum size, an entire segment file is deleted. For this reason MaxLengthBytes (the maximum size of the entire stream) is usually significantly higher than MaxSegmentSizeBytes (the maximum size of a single segment file).
RabbitMQ enforces the retention policy when the current segment has reached its maximum size and is closed in favor of a new one.
await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 20_000,
    MaxSegmentSizeBytes = 1000
});
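To make the relation between the two limits concrete, here is a small arithmetic sketch using the values from the example above. `RetentionMath` is a hypothetical helper written for this README, not part of the client, and the result is only a rough estimate: the broker decides when segments are actually rolled and deleted.

```csharp
using System;

// Values taken from the CreateStream example in this section.
const long maxLengthBytes = 20_000;
const long maxSegmentSizeBytes = 1_000;

// Roughly how many full segment files fit under the stream size limit.
Console.WriteLine(RetentionMath.ApproxFullSegments(maxLengthBytes, maxSegmentSizeBytes)); // 20

// Hypothetical helper, not part of RabbitMQ.Stream.Client.
static class RetentionMath
{
    public static long ApproxFullSegments(long maxLengthBytes, long maxSegmentSizeBytes)
    {
        if (maxSegmentSizeBytes <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxSegmentSizeBytes));

        // Integer division: a partially filled segment is not counted.
        return maxLengthBytes / maxSegmentSizeBytes;
    }
}
```

Because truncation removes a whole segment at a time, a small ratio between the two values makes each deletion drop a large share of the stream.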
Producer
A Producer instance is created from the StreamSystem:
var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Stream = "my_stream",
    });
Consider a Producer instance a long-lived object; do not create one to send just one message.
| Parameter | Description | Default |
| --- | --- | --- |
| Stream | The stream to publish to. | No default, mandatory setting. |
| Reference | The logical name of the producer. | null (no deduplication) |
| ClientProvidedName | Set the TCP Client Name | dotnet-stream-producer |
| ConfirmHandler | Handler with confirmed messages | It is an event |
| ConnectionClosedHandler | Event when the client is disconnected | It is an event |
| MaxInFlight | Max Number of messages before send | 1000 |
A Producer with a reference name stores the sequence id on the server.
It is possible to retrieve the id using producer.GetLastPublishingId() or, more generically, system.QuerySequence("reference", "my_stream").
Publish Messages
Standard publish
ulong publishingId = 0; // Send expects an unsigned 64-bit publishing id
var message = new Message(Encoding.UTF8.GetBytes("hello"));
await producer.Send(publishingId, message);
publishingId must be incremented for each send.
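One way to keep publishing ids sequential when several tasks share a producer is to hand them out from a single counter. The sketch below is only an illustration: `PublishingIdSequence` is a hypothetical helper, not part of the client, and it assumes the first id to use is known up front (0 here, as in the example above).

```csharp
using System;
using System.Threading;

var sequence = new PublishingIdSequence(first: 0);
Console.WriteLine(sequence.Next()); // 0
Console.WriteLine(sequence.Next()); // 1
Console.WriteLine(sequence.Next()); // 2

// Hypothetical helper, not part of RabbitMQ.Stream.Client.
sealed class PublishingIdSequence
{
    // Holds the id that the next call to Next() will return.
    private long _next;

    public PublishingIdSequence(ulong first) => _next = checked((long)first);

    // Returns the current id and advances the counter atomically,
    // so concurrent callers never receive the same id.
    public ulong Next() => (ulong)(Interlocked.Increment(ref _next) - 1);
}
```

Each value returned by `Next()` would then be passed as the first argument of `producer.Send`.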
Standard Batch publish
Batch send is a synchronous operation. It allows you to pre-aggregate messages and send them in a single synchronous call.
var messages = new List<(ulong, Message)>();
for (ulong i = 0; i < 30; i++)
{
    messages.Add((i, new Message(Encoding.UTF8.GetBytes($"batch {i}"))));
}
await producer.BatchSend(messages);
In most cases, the standard Send is easier to use and is sufficient.
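When batching is used, the messages have to be grouped and numbered before the call. The following sketch shows one possible way to split a list into fixed-size batches with sequential publishing ids. `BatchPlanner` and its parameters are hypothetical names, and plain strings stand in for `Message` so the example runs without the client library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var payloads = Enumerable.Range(0, 7).Select(i => $"batch {i}").ToList();
var batches = BatchPlanner.Plan(payloads, batchSize: 3, firstPublishingId: 0);

Console.WriteLine(string.Join(",", batches.Select(b => b.Count))); // 3,3,1
Console.WriteLine(batches[2][0].PublishingId);                     // 6

// Hypothetical helper, not part of RabbitMQ.Stream.Client.
static class BatchPlanner
{
    public static List<List<(ulong PublishingId, T Item)>> Plan<T>(
        IEnumerable<T> items, int batchSize, ulong firstPublishingId)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batches = new List<List<(ulong PublishingId, T Item)>>();
        var current = new List<(ulong PublishingId, T Item)>(batchSize);
        var nextId = firstPublishingId;

        foreach (var item in items)
        {
            // Ids keep increasing across batch boundaries.
            current.Add((nextId++, item));
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<(ulong PublishingId, T Item)>(batchSize);
            }
        }

        // Keep the last, possibly smaller, batch.
        if (current.Count > 0)
            batches.Add(current);

        return batches;
    }
}
```

With real messages, each inner list has the same shape as the `List<(ulong, Message)>` passed to `BatchSend` above.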
Sub Entries Batching
A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.
var subEntryMessages = new List<Message>();
for (var i = 1; i <= 500; i++)
{
    var message = new Message(Encoding.UTF8.GetBytes($"SubBatchMessage_{i}"));
    subEntryMessages.Add(message);
}
ulong publishingId = 1;
await producer.Send(publishingId, subEntryMessages, CompressionType.Gzip);
Not all compression codecs are implemented by default, to avoid too many dependencies. See the table:
| Compression | Description | Provided by client |
| --- | --- | --- |
| CompressionType.None | No compression | Yes |
| CompressionType.Gzip | GZip | Yes |
| CompressionType.Lz4 | Lz4 | No |
| CompressionType.Snappy | Snappy | No |
| CompressionType.Zstd | Zstd | No |
You can add missing codecs with StreamCompressionCodecs.RegisterCodec.
See Examples/CompressCodecs for Lz4 and Zstd.
Deduplication
Set a producer reference to enable deduplication:
var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Reference = "my_producer",
        Stream = "my_stream",
    });
ulong publishingId = 0;
var message = new Message(Encoding.UTF8.GetBytes($"my deduplicate message {publishingId}"));
await producer.Send(publishingId, message);
Consume Messages
Define a consumer:
var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "my_consumer",
        Stream = stream,
        MessageHandler = async (consumer, ctx, message) =>
        {
            Console.WriteLine(
                $"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())}");
            await Task.CompletedTask;
        }
    });
Offset Types
There are five offset types. The offset type is set through the ConsumerConfig.OffsetSpec property when creating the consumer. This example uses OffsetTypeFirst:
var consumerOffsetTypeFirst = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "my_consumer_offset_first",
        Stream = stream,
        OffsetSpec = new OffsetTypeFirst(),
        MessageHandler = async (consumer, ctx, message) =>
        {
            await Task.CompletedTask;
        }
    });
The five types are:
- First: it takes messages from the first message of the stream.
var offsetTypeFirst = new OffsetTypeFirst();
- Last: it takes messages from the last chunk of the stream, i.e. it doesn’t start from the last message, but from the last “group” of messages.
var offsetTypeLast = new OffsetTypeLast();
- Next: it takes messages published after the consumer connection.
var offsetTypeNext = new OffsetTypeNext();
- Offset: it takes messages starting from the message whose offset equals the passed value. If the value is lower than the offset of the first message in the stream, it starts from the first message (i.e. if you pass 0 but the stream starts from 10, it starts from 10). If the message with that offset hasn’t been published yet, the consumer waits until the offset is reached.
ulong startOffset = 10;
var offsetTypeOffset = new OffsetTypeOffset(startOffset);
- Timestamp: it takes messages starting from the first message with a timestamp greater than the one passed.
var anHourAgo = (long)DateTime.UtcNow.AddHours(-1).Subtract(new DateTime(1970, 1, 1)).TotalSeconds;
var offsetTypeTimestamp = new OffsetTypeTimestamp(anHourAgo);
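The manual epoch arithmetic in the Timestamp example can also be written with `DateTimeOffset.ToUnixTimeSeconds()`, which is part of .NET. The sketch below uses a fixed reference time so the two computations can be compared. It keeps seconds as the unit only because the snippet above does; it is not a statement about which unit the server expects.

```csharp
using System;

// Fixed reference time so the result is reproducible.
var reference = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);

// Manual computation, as in the Timestamp example above.
long manual = (long)reference.AddHours(-1).Subtract(new DateTime(1970, 1, 1)).TotalSeconds;

// Built-in equivalent.
long builtIn = new DateTimeOffset(reference).AddHours(-1).ToUnixTimeSeconds();

Console.WriteLine(manual == builtIn); // True
Console.WriteLine(builtIn);           // 1704106800
```

In application code `DateTimeOffset.UtcNow.AddHours(-1).ToUnixTimeSeconds()` would produce the same kind of value for the current time.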
Track Offset
The server can store the current delivered offset for a given consumer with StoreOffset:
var messagesConsumed = 0;
var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "my_consumer",
        Stream = stream,
        MessageHandler = async (consumer, ctx, message) =>
        {
            // Store the offset once every 1000 messages
            if (++messagesConsumed % 1000 == 0)
            {
                await consumer.StoreOffset(ctx.Offset);
            }
        }
    });
Note: avoid storing the offset for every single message, as it can reduce performance.
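The "store every N messages" rule can be kept out of the message handler with a small wrapper. This is a minimal sketch: `OffsetCheckpoint` is a hypothetical class, not part of the client, and the `store` delegate is a stand-in for a call such as `consumer.StoreOffset(offset)`. It is not thread-safe and assumes messages are handled one at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var stored = new List<ulong>();

// In real code the delegate would call consumer.StoreOffset(storedOffset).
var checkpoint = new OffsetCheckpoint(every: 3, store: storedOffset =>
{
    stored.Add(storedOffset);
    return Task.CompletedTask;
});

for (ulong offset = 0; offset < 10; offset++)
    await checkpoint.OnMessage(offset);

Console.WriteLine(string.Join(",", stored)); // 2,5,8

// Hypothetical helper, not part of RabbitMQ.Stream.Client.
sealed class OffsetCheckpoint
{
    private readonly int _every;
    private readonly Func<ulong, Task> _store;
    private int _sinceLastStore;

    public OffsetCheckpoint(int every, Func<ulong, Task> store)
    {
        if (every <= 0) throw new ArgumentOutOfRangeException(nameof(every));
        _every = every;
        _store = store ?? throw new ArgumentNullException(nameof(store));
    }

    // Call once per consumed message; stores only every N-th offset.
    public async Task OnMessage(ulong offset)
    {
        if (++_sinceLastStore < _every) return;
        _sinceLastStore = 0;
        await _store(offset);
    }
}
```

A larger `every` value means fewer round trips to the server but more messages to re-read after a restart.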
It is possible to retrieve the offset with QueryOffset:
var trackedOffset = await system.QueryOffset("my_consumer", stream);
var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "my_consumer",
        Stream = stream,
        OffsetSpec = new OffsetTypeOffset(trackedOffset),
    });
Note: if you try to query an offset that has not been stored yet for the consumer's reference on the stream, you will get an OffsetNotFoundException.
Handle Close
Producers/Consumers raise an event when the client is disconnected:
new ProducerConfig/ConsumerConfig
{
    ConnectionClosedHandler = s =>
    {
        Console.WriteLine($"Connection Closed: {s}");
        return Task.CompletedTask;
    },
}
Handle Metadata Update
A stream metadata update is raised when the stream topology changes or the stream is deleted.
You can use MetadataHandler to handle it:
new ProducerConfig/ConsumerConfig
{
    MetadataHandler = update =>
    {
        // handle the metadata update here
    },
}
Heartbeat
It is possible to configure the heartbeat using:
var config = new StreamSystemConfig()
{
    Heartbeat = TimeSpan.FromSeconds(30),
};
- 60 (TimeSpan.FromSeconds(60)) seconds is the default value
- 0 (TimeSpan.FromSeconds(0)) will advise the server to disable heartbeat
The heartbeat value shouldn't be too low.
- Reliable Producer
- Reliable Consumer
See the directory Examples/Reliable for code examples.
Reliable Producer
Reliable Producer is a smart layer built on top of the standard Producer.
The idea is to let the user choose between the standard and the reliable producer.
The main features are:
- Provide publishingID automatically
- Auto-Reconnect in case of disconnection
- Trace sent and received messages
- Invalidate messages
- Handle the metadata Update
Provide publishingID automatically
Reliable Producer retrieves the last publishingID given the producer name.
Zero(0) is the default value in case there is no publishingID for given producer reference.
Auto-Reconnect in case of disconnection
Reliable Producer restores the TCP connection if the Producer is disconnected for some reason. During the reconnection it continues to store the messages in a local list. The user will receive back the confirmed or unconfirmed messages. See Reconnection Strategy.
Trace sent and received messages
Reliable Producer keeps each sent message in memory and removes it when the message is confirmed or times out.
ConfirmationHandler receives the messages back with their status.
confirmation.Status can have different values, but in general ConfirmationStatus.Confirmed means the message is stored on the server. Other statuses mean that there was a problem with the message/messages under the given publishing id.
ConfirmationHandler = confirmation =>
{
    if (confirmation.Status == ConfirmationStatus.Confirmed)
    {
        // OK
    }
    else
    {
        // Some problem
    }
}
Currently defined confirmation statuses
| Status | Description | Source |
| --- | --- | --- |
| Confirmed | Message has been confirmed by the server and written to disk. | Server |
| ClientTimeoutError | Client gave up waiting for the message (see Invalidate messages). | Client |
| StreamNotAvailable | Stream was deleted or otherwise became unavailable. | Server |
| InternalError | | Server |
| AccessRefused | Provided credentials are invalid or you lack permissions for specific vhost/etc. | Server |
| PreconditionFailed | Catch-all for validation on server (e.g. requested to create stream with different parameters but same name). | Server |
| PublisherDoesNotExist | | Server |
| UndefinedError | Catch-all for any new status that is not yet handled in the library. | Server |
Invalidate messages
If the client doesn't receive a confirmation within the configured timeout (3 seconds by default), Reliable Producer removes the message from its internal message cache.
The user will receive ConfirmationStatus.ClientTimeoutError in the ConfirmationHandler.
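The invalidation described above can be pictured as a cache keyed by publishing id, from which entries leave either on confirmation or on timeout. The sketch below is a simplified model written for illustration, not the library's actual implementation. `PendingConfirmations` and its methods are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var pending = new PendingConfirmations(TimeSpan.FromSeconds(3));
var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);

pending.Track(1, t0);
pending.Track(2, t0.AddSeconds(2));
pending.Confirm(1); // confirmed in time, removed from the cache

// 4 seconds after t0: id 2 has waited only 2 seconds, nothing expires.
Console.WriteLine(pending.RemoveExpired(t0.AddSeconds(4)).Count); // 0

// 6 seconds after t0: id 2 has waited 4 seconds, which exceeds the timeout.
Console.WriteLine(string.Join(",", pending.RemoveExpired(t0.AddSeconds(6)))); // 2

// Hypothetical model, not part of RabbitMQ.Stream.Client.
sealed class PendingConfirmations
{
    private readonly TimeSpan _timeout;
    private readonly Dictionary<ulong, DateTime> _sentAt = new();

    public PendingConfirmations(TimeSpan timeout) => _timeout = timeout;

    // Remember when a message was sent.
    public void Track(ulong publishingId, DateTime sentAtUtc) => _sentAt[publishingId] = sentAtUtc;

    // Returns true if the message was still pending.
    public bool Confirm(ulong publishingId) => _sentAt.Remove(publishingId);

    // Removes and returns the ids that waited at least the timeout.
    public IReadOnlyList<ulong> RemoveExpired(DateTime nowUtc)
    {
        var expired = _sentAt
            .Where(entry => nowUtc - entry.Value >= _timeout)
            .Select(entry => entry.Key)
            .OrderBy(id => id)
            .ToList();

        foreach (var id in expired)
            _sentAt.Remove(id);

        return expired;
    }
}
```

In this model the ids returned by `RemoveExpired` correspond to the messages reported with ConfirmationStatus.ClientTimeoutError.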
Send API
Reliable Producer implements two Send(..) overloads:
Send(Message message) // standard
Send(List<Message> messages, CompressionType compressionType) // sub-batching with compression
Reliable Consumer
Reliable Consumer is a smart layer built on top of the standard Consumer.
The idea is to let the user decide which to use: the standard or the reliable Consumer.
The main features are:
- Auto-Reconnect in case of disconnection
- Auto restart consuming from the last offset
- Handle the metadata Update
Reliable Consumer restores the TCP connection if the Consumer is disconnected for some reason. Reliable Consumer will restart consuming from the last stored offset. See Reconnection Strategy.
Reconnection Strategy
By default, Reliable Producer/Consumer uses a BackOffReconnectStrategy to reconnect the client.
You can customize the behaviour by implementing the IReconnectStrategy interface:
bool WhenDisconnected(string connectionInfo);
void WhenConnected(string connectionInfo);
If WhenDisconnected returns true, the Producer/Consumer will be reconnected; otherwise it will be closed.
connectionInfo adds information about the connection.
You can use it:
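var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
    // ... other settings
    // MyReconnectStrategy is your own IReconnectStrategy implementation
    ReconnectStrategy = new MyReconnectStrategy()
});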
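As an illustration of a custom strategy, the sketch below gives up after a fixed number of consecutive disconnections. To keep the example runnable by itself, `IReconnectStrategy` is redeclared locally with the two methods listed above; in a real project the interface from the client library would be used instead. `LimitedAttemptsReconnectStrategy` and `maxAttempts` are hypothetical names, and the sketch assumes WhenDisconnected is called once per disconnection.

```csharp
using System;

var strategy = new LimitedAttemptsReconnectStrategy(maxAttempts: 2);
Console.WriteLine(strategy.WhenDisconnected("demo")); // True
Console.WriteLine(strategy.WhenDisconnected("demo")); // True
Console.WriteLine(strategy.WhenDisconnected("demo")); // False

strategy.WhenConnected("demo");                       // a successful connection resets the counter
Console.WriteLine(strategy.WhenDisconnected("demo")); // True

// Local stand-in mirroring the two methods shown in this section.
// In real code, implement the library's IReconnectStrategy instead.
interface IReconnectStrategy
{
    bool WhenDisconnected(string connectionInfo);
    void WhenConnected(string connectionInfo);
}

// Hypothetical strategy: allow at most maxAttempts consecutive reconnections.
sealed class LimitedAttemptsReconnectStrategy : IReconnectStrategy
{
    private readonly int _maxAttempts;
    private int _attempts;

    public LimitedAttemptsReconnectStrategy(int maxAttempts) => _maxAttempts = maxAttempts;

    // true: try to reconnect; false: close the producer/consumer.
    public bool WhenDisconnected(string connectionInfo)
    {
        _attempts++;
        return _attempts <= _maxAttempts;
    }

    public void WhenConnected(string connectionInfo) => _attempts = 0;
}
```

A production strategy would probably also wait between attempts, as the name of the default BackOffReconnectStrategy suggests.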
Reliable handle metadata update
If the stream topology changes (e.g. the stream is deleted or a follower is added/removed), the client receives a MetadataUpdate event.
Reliable Producer detects the event and tries to reconnect the producer if the stream still exists; otherwise it closes the producer/consumer.
Build from source
make build
make test
Run the tests in Docker:
make run-test-in-docker
Project Status
The client is a work in progress. The API(s) could change prior to version 1.0.0.
Release Process
- Ensure builds are green: link
- Tag the branch using your GPG key: git tag -a -s -u GPG_KEY_ID -m 'rabbitmq-stream-dotnet-client v1.0.0-beta.6' 'v1.0.0-beta.6' && git push && git push --tags
- Ensure the build for the tag passes: link
- Check for the new version on NuGet: link
- Best practice is to download the new package and inspect the contents using NuGetPackageExplorer
- Create the new release on GitHub: link
- Announce the new release on the mailing list: link