rabbitmq-stream-dotnet-client icon indicating copy to clipboard operation
rabbitmq-stream-dotnet-client copied to clipboard

Work in progress Single Active Consumer

Open Gsantomaggio opened this issue 2 years ago • 2 comments

Work on this feature with @Zerpet

Signed-off-by: Gabriele Santomaggio [email protected]

Closes https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/146

Note: SacTests will fail until version 3.11 is released.

Gsantomaggio avatar Jul 19 '22 13:07 Gsantomaggio

Codecov Report

Base: 92.45% // Head: 92.42% // Decreases project coverage by -0.03% :warning:

Coverage data is based on head (7a1b408) compared to base (dae7e12). Patch coverage: 92.61% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #152      +/-   ##
==========================================
- Coverage   92.45%   92.42%   -0.04%     
==========================================
  Files          78       89      +11     
  Lines        6220     7138     +918     
  Branches      385      445      +60     
==========================================
+ Hits         5751     6597     +846     
- Misses        380      439      +59     
- Partials       89      102      +13     
Impacted Files Coverage Δ
RabbitMQ.Stream.Client/Reliable/ReliableBase.cs 100.00% <ø> (ø)
RabbitMQ.Stream.Client/Hash/Extensions.cs 31.25% <31.25%> (ø)
Tests/Utils.cs 72.82% <58.33%> (-4.54%) :arrow_down:
RabbitMQ.Stream.Client/Producer.cs 79.31% <75.00%> (+1.36%) :arrow_up:
RabbitMQ.Stream.Client/StreamSystem.cs 89.37% <75.86%> (-2.39%) :arrow_down:
...bitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs 85.00% <85.00%> (ø)
RabbitMQ.Stream.Client/Subscribe.cs 78.94% <85.71%> (+13.72%) :arrow_up:
RabbitMQ.Stream.Client/Consumer.cs 90.47% <88.37%> (-1.42%) :arrow_down:
RabbitMQ.Stream.Client/Client.cs 90.84% <90.00%> (-1.03%) :arrow_down:
RabbitMQ.Stream.Client/PartitionsQueryResponse.cs 91.30% <91.30%> (ø)
... and 18 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

:umbrella: View full report at Codecov.
:loudspeaker: Do you have feedback about the report comment? Let us know in this issue.

codecov[bot] avatar Jul 19 '22 14:07 codecov[bot]

We still need to add tests for ReliableConsumer and some examples.

Gsantomaggio avatar Aug 25 '22 12:08 Gsantomaggio

How to test the standard consumers:

/// <summary>
/// Sample for the standard consumers: creates a stream, publishes 100 messages,
/// then attaches two consumers that share the same reference with
/// <c>IsSingleActiveConsumer</c> enabled. After a key press the first consumer
/// is closed, the method waits three seconds and finally deletes the stream.
/// </summary>
/// <returns>A task that completes once the stream has been deleted.</returns>
public static async Task Start()
    {
        var config = new StreamSystemConfig();
        var system = await StreamSystem.Create(config);
        var streamName = Guid.NewGuid().ToString();
        await system.CreateStream(new StreamSpec(streamName));

        // The application name (consumer reference) must be the same for all consumers.
        const string applicationName = "myApplication";

        var producer = await system.CreateProducer(new ProducerConfig() {Stream = streamName});
        for (var i = 0; i < 100; i++)
        {
            var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
            await producer.Send(Convert.ToUInt64(i), message);
        }

        var c1 = await system.CreateConsumer(new ConsumerConfig()
        {
            Stream = streamName,
            Reference = applicationName,
            // First consumer: it starts from the beginning of the stream.
            OffsetSpec = new OffsetTypeFirst(),
            // Enabling Single Active Consumer is mandatory for this scenario.
            IsSingleActiveConsumer = true,
            MessageHandler = async (c, context, arg3) =>
            {
                // Store the offset so another consumer can query it later via QueryOffset.
                await c.StoreOffset(context.Offset);
                // The payload was written as UTF-8 above, so decode it as UTF-8 explicitly.
                Console.WriteLine(
                    $" first received message: {Encoding.UTF8.GetString(arg3.Data.Contents)} offset {context.Offset}");
            }
        });
        // ONLY the first consumer c1 is active and will receive messages.

        var c2 = await system.CreateConsumer(new ConsumerConfig()
        {
            Stream = streamName,
            Reference = applicationName,
            IsSingleActiveConsumer = true,
            // ConsumerUpdateListener is implemented,
            // so the client will use ConsumerUpdateListener to decide the start offset.
            ConsumerUpdateListener = async (reference, stream, isActive) =>
            {
                Console.WriteLine($"{reference} is {(isActive ? "active" : "inactive")}");
                // Restart from the offset stored under the shared reference.
                var o = await system.QueryOffset(applicationName, stream);
                return new OffsetTypeOffset(o);
            },
            MessageHandler = async (c, context, arg3) =>
            {
                await c.StoreOffset(context.Offset);
                Console.WriteLine(
                    $" second received message: {Encoding.UTF8.GetString(arg3.Data.Contents)} offset {context.Offset}");
            }
        });

        // When the first consumer is closed, the second consumer will become active.
        // In this case the second consumer will start from the last stored offset:
        // system.QueryOffset(applicationName, stream)

        Console.WriteLine("Press any key to close the consumer 1");
        Console.ReadKey();
        await c1.Close();
        // c2 is now active; wait without blocking the thread before tearing down.
        await Task.Delay(3_000);
        await system.DeleteStream(streamName);
    }

RSingleActiveConsumer:

  /// <summary>
  /// Sample for <c>ReliableConsumer</c> with Single Active Consumer: creates a stream,
  /// publishes 100 messages, then attaches two reliable consumers ("first" and "second")
  /// that share the same reference. After a key press the first consumer is closed,
  /// the method waits two seconds, closes the second consumer and deletes the stream.
  /// Any exception raised during the shutdown sequence is written to the console.
  /// </summary>
  /// <returns>A task that completes once the shutdown sequence has finished.</returns>
  public static async Task Start()
    {
        var config = new StreamSystemConfig();
        var system = await StreamSystem.Create(config);
        var streamName = Guid.NewGuid().ToString();
        await system.CreateStream(new StreamSpec(streamName));
        // Both consumers must use the same reference.
        const string applicationName = "myApplication";
        var producer = await system.CreateProducer(new ProducerConfig() {Stream = streamName});
        for (var i = 0; i < 100; i++)
        {
            var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
            await producer.Send(Convert.ToUInt64(i), message);
        }

        var c1 = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
        {
            StreamSystem = system,
            Stream = streamName,
            Reference = applicationName,
            ClientProvidedName = "first",
            OffsetSpec = new OffsetTypeFirst(),
            // Every consumer sharing the reference has to opt in to Single Active Consumer,
            // otherwise this one is an ordinary consumer and no hand-over to c2 is demonstrated.
            IsSingleActiveConsumer = true,
            MessageHandler = async (c, context, arg3) =>
            {
                // Small artificial delay per message; awaited so the thread is not blocked.
                await Task.Delay(1);
                await c.StoreOffset(context.Offset);
                // The payload was written as UTF-8 above, so decode it as UTF-8 explicitly.
                Console.WriteLine(
                    $"first Consumer received message: {Encoding.UTF8.GetString(arg3.Data.Contents)} offset {context.Offset}");
            }
        });

        var c2 = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
        {
            StreamSystem = system,
            Stream = streamName,
            Reference = applicationName,
            ClientProvidedName = "second",
            IsSingleActiveConsumer = true,
            OffsetSpec = new OffsetTypeFirst(),
            ConsumerUpdateListener = async (reference, stream, isActive) =>
            {
                Console.WriteLine($"{reference} is {(isActive ? "active" : "inactive")}");
                // Restart from the offset stored under the shared reference.
                var o = await system.QueryOffset(applicationName, streamName);
                return new OffsetTypeOffset(o);
            },
            MessageHandler = async (c, context, arg3) =>
            {
                await c.StoreOffset(context.Offset);
                Console.WriteLine($" second received message: {Encoding.UTF8.GetString(arg3.Data.Contents)} offset {context.Offset}");
            }
        });

        try
        {
            Console.ReadKey();
            await c1.Close();

            // Leave the second consumer some time to run after the first one is closed.
            await Task.Delay(2000);

            Console.WriteLine("closing");
            // Close c2 exactly once, then remove the stream.
            await c2.Close();
            await system.DeleteStream(streamName);
        }
        catch (Exception e)
        {
            // Sample code: report the failure instead of crashing the demo.
            Console.WriteLine(e);
        }
    }

Gsantomaggio avatar Oct 03 '22 07:10 Gsantomaggio