rabbitmq-stream-dotnet-client
Work in progress Single Active Consumer
Work on this feature with @Zerpet
Signed-off-by: Gabriele Santomaggio [email protected]
Closes https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/146
Note: SacTests will fail until RabbitMQ 3.11 is released.
Codecov Report
Base: 92.45% // Head: 92.42% // Decreases project coverage by -0.03% :warning:
Coverage data is based on head (7a1b408) compared to base (dae7e12). Patch coverage: 92.61% of modified lines in pull request are covered.
Additional details and impacted files
@@ Coverage Diff @@
## main #152 +/- ##
==========================================
- Coverage 92.45% 92.42% -0.04%
==========================================
Files 78 89 +11
Lines 6220 7138 +918
Branches 385 445 +60
==========================================
+ Hits 5751 6597 +846
- Misses 380 439 +59
- Partials 89 102 +13
| Impacted Files | Coverage Δ | |
|---|---|---|
| RabbitMQ.Stream.Client/Reliable/ReliableBase.cs | 100.00% <ø> (ø) | |
| RabbitMQ.Stream.Client/Hash/Extensions.cs | 31.25% <31.25%> (ø) | |
| Tests/Utils.cs | 72.82% <58.33%> (-4.54%) | :arrow_down: |
| RabbitMQ.Stream.Client/Producer.cs | 79.31% <75.00%> (+1.36%) | :arrow_up: |
| RabbitMQ.Stream.Client/StreamSystem.cs | 89.37% <75.86%> (-2.39%) | :arrow_down: |
| ...bitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs | 85.00% <85.00%> (ø) | |
| RabbitMQ.Stream.Client/Subscribe.cs | 78.94% <85.71%> (+13.72%) | :arrow_up: |
| RabbitMQ.Stream.Client/Consumer.cs | 90.47% <88.37%> (-1.42%) | :arrow_down: |
| RabbitMQ.Stream.Client/Client.cs | 90.84% <90.00%> (-1.03%) | :arrow_down: |
| RabbitMQ.Stream.Client/PartitionsQueryResponse.cs | 91.30% <91.30%> (ø) | |
| ... and 18 more | | |
We need to add tests for ReliableConsumer and some examples.
How to test the standard consumers:
public static async Task Start()
{
var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config);
var streamName = Guid.NewGuid().ToString();
await system.CreateStream(new StreamSpec(streamName));
// The application name must be the same for all consumers
const string applicationName = "myApplication";
var producer = await system.CreateProducer(new ProducerConfig() {Stream = streamName});
for (var i = 0; i < 100; i++)
{
var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
await producer.Send(Convert.ToUInt64(i), message);
}
var c1 = await system.CreateConsumer(new ConsumerConfig()
{
Stream = streamName,
Reference = applicationName,
// First consumer, it will use OffsetTypeFirst
OffsetSpec = new OffsetTypeFirst(),
// IsSingleActiveConsumer = true is mandatory to enable Single Active Consumer
IsSingleActiveConsumer = true,
MessageHandler = async (c, context, arg3) =>
{
await c.StoreOffset(context.Offset);
Console.WriteLine(
$" first received message: {Encoding.UTF8.GetString(arg3.Data.Contents)} offset {context.Offset}");
}
});
// ONLY the first consumer c1 is active and will receive messages
var c2 = await system.CreateConsumer(new ConsumerConfig()
{
Stream = streamName,
Reference = applicationName,
IsSingleActiveConsumer = true,
// ConsumerUpdateListener is set, so the client calls it
// to decide which offset the consumer starts from
ConsumerUpdateListener = async (reference, stream, isActive) =>
{
Console.WriteLine($"{reference} is {(isActive ? "active" : "inactive")}");
var o = await system.QueryOffset(applicationName, stream);
return new OffsetTypeOffset(o);
},
MessageHandler = async (c, context, arg3) =>
{
await c.StoreOffset(context.Offset);
Console.WriteLine(
$" second received message: {Encoding.UTF8.GetString(arg3.Data.Contents)} offset {context.Offset}");
}
});
// When the first consumer is closed, the second consumer becomes active.
// In this case the second consumer starts from the last stored offset: system.QueryOffset(applicationName, stream)
Console.WriteLine("Press any key to close the consumer 1");
Console.ReadKey();
await c1.Close();
// c2 is now active
await Task.Delay(3_000);
await system.DeleteStream(streamName);
}
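The hand-over described in the comments above (only one consumer per reference is active, and the next one resumes from the last stored offset) can be sketched without a broker. This is a toy in-memory model, not the client's or the broker's implementation: `ToyConsumer`, `SacGroup`, and `SacDemo` are hypothetical names invented for illustration, and the promotion rule (the oldest remaining member becomes active) is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy types: they only model the behaviour described above
// and are NOT part of RabbitMQ.Stream.Client.
public sealed class ToyConsumer
{
    public string Name { get; }
    public bool IsActive { get; internal set; }
    public ToyConsumer(string name) => Name = name;
}

public sealed class SacGroup
{
    // Members share one reference (the "application name"), in join order.
    private readonly List<ToyConsumer> _members = new List<ToyConsumer>();
    private ulong? _storedOffset; // last offset stored under the shared reference

    public ToyConsumer Join(string name)
    {
        var consumer = new ToyConsumer(name);
        _members.Add(consumer);
        // Only the first member of the group starts as active.
        consumer.IsActive = _members.Count == 1;
        return consumer;
    }

    public void StoreOffset(ToyConsumer consumer, ulong offset)
    {
        if (!consumer.IsActive)
            throw new InvalidOperationException($"{consumer.Name} is not active");
        _storedOffset = offset;
    }

    public ulong? QueryOffset() => _storedOffset;

    public void Leave(ToyConsumer consumer)
    {
        _members.Remove(consumer);
        var wasActive = consumer.IsActive;
        consumer.IsActive = false;
        // Assumption: the oldest remaining member is promoted.
        if (wasActive && _members.Count > 0)
            _members[0].IsActive = true;
    }
}

public static class SacDemo
{
    public static string Run()
    {
        var group = new SacGroup();
        var c1 = group.Join("c1");
        var c2 = group.Join("c2"); // joins as inactive
        for (ulong offset = 0; offset < 100; offset++)
            group.StoreOffset(c1, offset);
        group.Leave(c1); // c2 is promoted
        return $"{c2.Name} active={c2.IsActive}, resumes from offset {group.QueryOffset()}";
    }
}
```

Calling `SacDemo.Run()` mirrors the example above: `c1` stores offsets 0 to 99 and leaves, then `c2` becomes active and reads the stored offset, which is the value a `ConsumerUpdateListener` would wrap in `OffsetTypeOffset`.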
How to test the ReliableConsumer (RSingleActiveConsumer):
public static async Task Start()
{
var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config);
var streamName = Guid.NewGuid().ToString();
await system.CreateStream(new StreamSpec(streamName));
const string applicationName = "myApplication";
var producer = await system.CreateProducer(new ProducerConfig() {Stream = streamName});
for (var i = 0; i < 100; i++)
{
var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
await producer.Send(Convert.ToUInt64(i), message);
}
var c1 = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
{
StreamSystem = system,
Stream = streamName,
Reference = applicationName,
ClientProvidedName = "first",
OffsetSpec = new OffsetTypeFirst(),
IsSingleActiveConsumer = true,
MessageHandler = async (c, context, arg3) =>
{
await Task.Delay(1);
await c.StoreOffset(context.Offset);
Console.WriteLine(
$" first received message: {Encoding.UTF8.GetString(arg3.Data.Contents)} offset {context.Offset}");
}
});
var c2 = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
{
StreamSystem = system,
Stream = streamName,
Reference = applicationName,
ClientProvidedName = "second",
IsSingleActiveConsumer = true,
OffsetSpec = new OffsetTypeFirst(),
ConsumerUpdateListener = async (reference, stream, isActive) =>
{
Console.WriteLine($"{reference} is {(isActive ? "active" : "inactive")}");
var o = await system.QueryOffset(applicationName, streamName);
return new OffsetTypeOffset(o);
},
MessageHandler = async (c, context, arg3) =>
{
await c.StoreOffset(context.Offset);
Console.WriteLine($" second received message: {Encoding.UTF8.GetString(arg3.Data.Contents)} offset {context.Offset}");
}
});
try
{
Console.ReadKey();
await c1.Close();
await Task.Delay(2000);
await c2.Close();
Console.WriteLine("closing");
await Task.Delay(10);
await c2.Close();
await system.DeleteStream(streamName);
}
catch (Exception e)
{
Console.WriteLine(e);
}
}