stan.net
stan.net copied to clipboard
make AsyncSubscriber with await.
I reviewed the Subscribe
method usages: Subscribe(string subject, StanSubscriptionOptions options, EventHandler<StanMsgHandlerArgs> handler);
I think we can have some kind more improvements. Please see the following code:
var cf = new StanConnectionFactory();
using (var c = cf.CreateConnection("test-cluster", "subscriber", opt))
{
var asyncSubscriber = c.AsyncSubscriber(request.Topic)
ConsumedResult<byte[]> consumedResult = null;
do
{
var consumedResult = await asyncSubscriber.Consume();
// handle consumed result
}
while (consumedResult.Message != null);
asyncSubscriber.Unsubscribe();
}
I do like that pattern. Would you want to make a contribution?
If I will have time soon I would like to do it. But also I don't mind if it will be implemented by someone else :)
C#8 now provides Asynchronous Streams
which may be an even more natural fit for this, though you'd only be able to expose it to .Net Standard 2.1 & .Net Core 3 targets.
Here is another approach using channels.
var channel = Channel.CreateUnbounded<StanMsg>();
using var connection = new StanConnectionFactory().CreateConnection("test-cluster", "test-client");
using var subscription = connection.Subscribe("topic", (sender, args) =>
{
while (!channel.Writer.TryWrite(args.Message));
});
await foreach (var msg in channel.Reader.ReadAllAsync())
{
// do work
}
https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/