stan.net icon indicating copy to clipboard operation
stan.net copied to clipboard

make AsyncSubscriber with await.

Open alexpantyukhin opened this issue 5 years ago • 4 comments

I reviewed the Subscribe method usages: Subscribe(string subject, StanSubscriptionOptions options, EventHandler<StanMsgHandlerArgs> handler);

I think we can have some kind more improvements. Please see the following code:

var cf = new StanConnectionFactory();
using (var c = cf.CreateConnection("test-cluster", "subscriber", opt))
{
    var asyncSubscriber = c.AsyncSubscriber(request.Topic)
    ConsumedResult<byte[]> consumedResult = null;
    do
    {
            var consumedResult = await asyncSubscriber.Consume();
            // handle consumed result
    }
    while (consumedResult.Message != null);
    asyncSubscriber.Unsubscribe();
}

alexpantyukhin avatar Jul 26 '19 11:07 alexpantyukhin

I do like that pattern. Would you want to make a contribution?

ColinSullivan1 avatar Aug 15 '19 15:08 ColinSullivan1

If I will have time soon I would like to do it. But also I don't mind if it will be implemented by someone else :)

alexpantyukhin avatar Aug 20 '19 13:08 alexpantyukhin

C#8 now provides Asynchronous Streams which may be an even more natural fit for this, though you'd only be able to expose it to .Net Standard 2.1 & .Net Core 3 targets.

RichiCoder1 avatar Sep 25 '19 03:09 RichiCoder1

Here is another approach using channels.

var channel = Channel.CreateUnbounded<StanMsg>();
using var connection = new StanConnectionFactory().CreateConnection("test-cluster", "test-client");
using var subscription = connection.Subscribe("topic", (sender, args) =>
{            
    while (!channel.Writer.TryWrite(args.Message));
});

await foreach (var msg in channel.Reader.ReadAllAsync())
{
    // do work
}

https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/

johnsusi avatar Jan 15 '20 17:01 johnsusi