ably-dotnet
Async Subscribe handler
Pretty much exactly what was raised in #94, but that one seems to have been closed without any resolution to one part of the issue:
Currently there is no Func<Message, Task> handler support in IRealtimeChannel.Subscribe/Unsubscribe.
I'm fine with there not being any async API for (Un)Subscribe if doing "sync over async" is OK (read below), but I wonder if it's simply an oversight, as there is one for IEventEmitter.On/Off.
I'd like to fetch some data from my backend based on the message coming in. In my particular case the message will basically either tell me the latest state of some backend model I'm "watching", or it will tell me "too much changed, you're better off to just fetch again". The fetching of course does some I/O, thus it's async.
Is an alternative solution to just call GetAwaiter().GetResult() in the Subscribe handler to make it synchronous? I'm not a friend of "sync over async" (or the reverse), but I'm pretty sure no user of Ably's .NET API ever needs a Subscribe handler to synchronize back onto the context Ably is calling their handler from, so we should be fine with Solution A from Stephen Cleary's answer in all cases.
I'm primarily worried that I might accidentally block something on Ably's side for as long as my async I/O runs in the handler. Additionally, I'd like to avoid having to manage my own Task queue for these Subscribe handler tasks: I'd have to link them to each Subscribe subscription so that Unsubscribe also cancels the (potentially still queued or already running) tasks.
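For illustration, the kind of per-subscription queue described above could look roughly like the following. This is only a minimal sketch and not part of ably-dotnet: OrderedAsyncHandler and all of its members are hypothetical names, and it assumes System.Threading.Channels is available in the project.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not an ably-dotnet type): accepts items from a
// synchronous callback and runs an async handler on them one at a time,
// in arrival order.
public sealed class OrderedAsyncHandler<T> : IDisposable
{
    private readonly Channel<T> _queue = Channel.CreateUnbounded<T>(
        new UnboundedChannelOptions { SingleReader = true });
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Func<T, CancellationToken, Task> _handler;

    public OrderedAsyncHandler(Func<T, CancellationToken, Task> handler)
    {
        _handler = handler;
        Completion = Task.Run(PumpAsync);
    }

    // Completes when the queue has been drained or processing was cancelled.
    // Faults if the handler throws anything other than a cancellation.
    public Task Completion { get; }

    // Synchronous and non-blocking, so it can be called from a sync handler.
    public void Enqueue(T item) => _queue.Writer.TryWrite(item);

    // Stop accepting new items but let the already queued ones finish.
    public void Complete() => _queue.Writer.TryComplete();

    // Stop accepting new items and cancel queued or running work.
    public void Dispose()
    {
        _queue.Writer.TryComplete();
        _cts.Cancel();
    }

    private async Task PumpAsync()
    {
        try
        {
            while (await _queue.Reader.WaitToReadAsync(_cts.Token))
            {
                while (!_cts.IsCancellationRequested && _queue.Reader.TryRead(out var item))
                {
                    // Awaiting here is what keeps the items strictly ordered.
                    await _handler(item, _cts.Token);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected after Dispose(); remaining items are dropped.
        }
    }
}
```

With something like this, the Subscribe handler would only call Enqueue(m) and return immediately, and the code that calls Unsubscribe would also call Dispose() on the matching queue. The cost is exactly the bookkeeping mentioned above: one queue per subscription, owned by the caller.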
Internal conv: Internal https://ably-real-time.slack.com/archives/C030C5YLY/p1705494514068369
That's a good point. I hadn't thought about adding async handler support for subscribing to Presence and Channel updates, even though there is one in the event emitter.
Currently, blocking inside the Subscribe handler is OK: the websocket runs on a separate thread, so incoming updates will still be processed, but they will wait before the Subscribe handlers are called.
If you don't care about order (which I think you do), you can just fire and forget the task like this:
_ = LongRunningAsyncJob(); inside the sync handler.
If you care about the order, it is OK to use Solution A from Stephen Cleary's answer and block the execution to process what you need to do.
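To make the difference between the two options concrete, here is a small self-contained sketch that does not use Ably at all. HandlerStyles and FetchAsync are hypothetical names, and the foreach loop only stands in for a library calling a synchronous handler once per message, in order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class HandlerStyles
{
    // Hypothetical stand-in for the real I/O: message 1 is much slower
    // than the rest, so it finishes last unless something waits for it.
    private static async Task FetchAsync(int id, ConcurrentQueue<int> done)
    {
        await Task.Delay(id == 1 ? 200 : 10).ConfigureAwait(false);
        done.Enqueue(id);
    }

    // Blocking style: each handler call returns only after its async work
    // has finished, so completion order equals arrival order.
    public static int[] RunBlocking(int[] messages)
    {
        var done = new ConcurrentQueue<int>();
        Action<int> handler = id => FetchAsync(id, done).GetAwaiter().GetResult();
        foreach (var m in messages)
        {
            handler(m); // simulated sequential dispatch
        }
        return done.ToArray();
    }

    // Fire-and-forget style: the handler returns immediately, so the async
    // work of several messages overlaps and may finish in any order.
    public static async Task<int[]> RunFireAndForgetAsync(int[] messages)
    {
        var done = new ConcurrentQueue<int>();
        // Real fire-and-forget would just discard the task (_ = ...).
        // The tasks are kept here only so the demo can wait for the results.
        var started = new List<Task>();
        Action<int> handler = id => started.Add(FetchAsync(id, done));
        foreach (var m in messages)
        {
            handler(m);
        }
        await Task.WhenAll(started);
        return done.ToArray();
    }
}
```

RunBlocking returns the ids in the order they were passed in, while RunFireAndForgetAsync will typically report message 1 last because of its longer delay. Note also that a discarded task hides any exception thrown by the async work, which is another reason to prefer blocking (or an explicit queue) when the result matters.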
Just to be on the safe side I added the following test to make sure messages are processed correctly with a long blocking handler.
public async Task WhenSubscribeHandlerBlocks_ShouldBehaveOk(Protocol protocol)
{
    var client = await GetRealtimeClient(protocol);
    var channel = client.Channels.Get("test".AddRandomSuffix());
    int processedMessaged = 0;
    List<int> order = new List<int>();
    var taskAwaiter = new TaskCompletionAwaiter(20000);

    channel.Subscribe(m =>
    {
        // Block for 15 seconds on the first message only.
        if (processedMessaged <= 0)
        {
            Task.Delay(15000).WaitAndUnwrapException();
        }

        // Record the order in which the handler saw the messages.
        order.Add(int.Parse(m.Data.ToString()));
        processedMessaged++;
        if (processedMessaged == 3)
        {
            taskAwaiter.SetCompleted();
        }
    });

    await channel.AttachAsync();
    channel.Publish("test", "1");
    channel.Publish("test", "2");
    channel.Publish("test", "3");

    // Wait until all three messages have gone through the handler.
    await taskAwaiter.Task;
    order.Should().HaveCount(3);
    order.Should().BeInAscendingOrder();
}
I'm going to add it to the TODO list for the library.
Thanks for the fast response, and an even bigger thanks for that test as the confirmation! I'll continue with the blocking calls as I do indeed care about the order.
In case you are not using issues here on GitHub for the TODOs, feel free to close this issue.
Thanks @bddckr. I'll just leave this issue open until we get it resolved :)