Solace client does not have async support
Currently SolaceSystems.Solclient.Messaging does not offer an async API for C#. For example:
- when publishing:
ReturnCode returnCode = session.Send(message);
- when creating a session:
context.CreateSession(sessionProps, null, null)
Is there any plan to provide an async API?
Hi @mostofakamal ,
You are correct, there is no async support in the API. However, to prevent the API from blocking in the operations you mentioned, you can set the following SessionProperties flags to false:
ConnectBlocking
SendBlocking
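As a rough sketch of that suggestion, the two flags would be set on the SessionProperties instance before it is passed to CreateSession. The host value is only a placeholder, and the comments describe my understanding of what each flag does rather than anything confirmed in this thread.

```csharp
using SolaceSystems.Solclient.Messaging;

// Placeholder connection details; replace with your own.
var sessionProps = new SessionProperties
{
    Host = "tcp://localhost:55555",
    // Connect() returns without waiting for the session to come up.
    ConnectBlocking = false,
    // Send() returns without waiting when the message cannot be written immediately.
    SendBlocking = false
};
```

With both flags off, the return code of a call no longer tells you that the operation finished, so the outcome has to be picked up from the session event handler supplied at session creation.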
⚠️ There are mistakes in this implementation. Leaving it here as a reference, but do not use without reading on first...
Hello @mostofakamal,
A workaround to enable async / await capabilities would be to leverage extension methods. For example:
public static class SolaceExtensions
{
    public static Task<ReturnCode> ConnectAsync(this ISession session)
    {
        return Task.Run(() =>
        {
            var tsc = new TaskCompletionSource<ReturnCode>();
            try
            {
                var result = session.Connect();
                tsc.SetResult(result);
            }
            catch (Exception ex)
            {
                tsc.SetException(ex);
            }
            return tsc.Task;
        });
    }

    public static Task<ReturnCode> SubscribeAsync(this ISession session, ISubscription subscription)
    {
        return Task.Run(() =>
        {
            var tsc = new TaskCompletionSource<ReturnCode>();
            try
            {
                var result = session.Subscribe(subscription, true);
                tsc.SetResult(result);
            }
            catch (Exception ex)
            {
                tsc.SetException(ex);
            }
            return tsc.Task;
        });
    }

    public static Task<ReturnCode> SendAsync(this ISession session, IMessage message)
    {
        return Task.Run(() =>
        {
            var tsc = new TaskCompletionSource<ReturnCode>();
            try
            {
                var result = session.Send(message);
                tsc.SetResult(result);
            }
            catch (Exception ex)
            {
                tsc.SetException(ex);
            }
            return tsc.Task;
        });
    }
}
And then in your code, you can simply write:
// Keep ConnectBlocking and SendBlocking set to true
using (var context = ContextFactory.Instance.CreateContext(contextProperties, null))
using (var session = context.CreateSession(sessionProperties, HandleMessage, null))
{
    // Connect to the Solace messaging router
    Console.WriteLine($"Connecting as {username}@{vpnname} on {host}...");
    var connectResult = await session.ConnectAsync(); // <-------- awaitable Connect
    if (connectResult == ReturnCode.SOLCLIENT_OK)
    {
        Console.WriteLine("Session successfully connected.");
        // Create a topic and subscribe to it
        using (var topic = ContextFactory.Instance.CreateTopic("eliding/>"))
        {
            var subscribeResult = await session.SubscribeAsync(topic); // <-------- awaitable Subscribe
            if (subscribeResult == ReturnCode.SOLCLIENT_OK)
            {
                Console.WriteLine("Waiting for a message to be published...");
                // ... more goes here
            }
        }
    }
    // the rest of your code
    // and sending messages:
    // Create a message and the topic to publish it to
    using (var message = ContextFactory.Instance.CreateMessage())
    using (var topic = ContextFactory.Instance.CreateTopic($"eliding/{appId}/sub/verb/obj/{key}"))
    {
        message.Destination = topic;
        message.BinaryAttachment = Encoding.UTF8.GetBytes($"{key}.{count}");
        message.DeliveryMode = MessageDeliveryMode.Direct;
        message.ElidingEligible = true;
        // Publish the message to the topic on the Solace messaging router
        var sendResult = await session.SendAsync(message); // <-------- awaitable Send
        if (sendResult == ReturnCode.SOLCLIENT_OK)
        {
            Console.WriteLine("Done.");
        }
        // the rest of your code
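For readers wondering what the warning at the top of this comment is getting at, here is a small self-contained sketch. It assumes nothing about Solace: BlockingConnect is a hypothetical stand-in for a blocking call such as session.Connect(). It shows that the TaskCompletionSource inside Task.Run adds nothing, because Task.Run already captures the result or exception, and that the blocking work still occupies a thread pool thread for its whole duration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingWrapperDemo
{
    // Hypothetical stand-in for a blocking call such as session.Connect().
    public static int BlockingConnect()
    {
        Thread.Sleep(200); // simulates waiting on the network
        return 0;
    }

    // Behaves like the extension methods above without the TaskCompletionSource:
    // the returned task completes with the result, or faults if the call throws.
    public static Task<int> ConnectAsync() => Task.Run(() => BlockingConnect());

    public static async Task Main()
    {
        // The awaiting caller is released, but a pool thread sleeps for 200 ms.
        int result = await ConnectAsync();
        Console.WriteLine($"Connect finished with code {result}");
    }
}
```

So the wrapper moves the blocking wait to another thread rather than removing it, which is the distinction the rest of this thread turns on.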
@nicholasdgoodman Thanks for the workaround. This is basically a way of calling a synchronous method from an asynchronous context. However, it will still block the main request thread when you await it. Is there an event available that I can use to make it truly asynchronous, as in the sketch below?
public Task<T> ConnectAsync(ISession session)
{
    TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
    // will get raised, when it is connected
    session.Connected += (args) =>
    {
        // this will notify the caller
        // of the SomeApiWrapper that
        // the task just completed
        tcs.SetResult(args);
    };
    // start the connect
    session.Connect();
    return tcs.Task;
}
Are you sure it will block the main request thread? By your wording, I'm guessing this is ASP.NET Core request handler (or perhaps something like an Event Hub callback) and not a desktop UI application, correct?
I would have thought that by dispatching the blocking calls onto a thread pool thread (via Task.Run(() => ...)) we would have been in the clear.
Nevertheless, there is an event (actually a whole bunch of events) you can subscribe to on the session, just not in the most intuitive way.
There is a single handler for all session events at session creation time:
using (var session = context.CreateSession(sessionProperties, HandleMessage, HandleSessionEvent))
{
    //...
}
which looks something like:
static void HandleSessionEvent(object source, SessionEventArgs args)
{
    switch (args.Event)
    {
        case SessionEvent.UpNotice:
            // The session is established
            break;
        case SessionEvent.ConnectFailedError:
            // The session failed to connect
            break;
    }
}
And therein lies a bit of a problem: how do you scope the handler passed to the factory method from within an extension method? For that I do not have a great answer other than to suggest creating a custom class that implements and extends ISession and managing it that way.
Later today or this weekend I will attempt to write up such a sample, but will also experiment with these existing extension method suggestions and verify their blocking / non-blocking behaviors.
Apologies for the spam of comments, but I just recalled that if this code is being invoked from an ASP.NET Core controller, then the Task.Run(() => {}) calls are superfluous, because all controller callbacks are already invoked on a thread pool thread.
That being said, making a blocking call on it will not necessarily affect other requests initially, until you encounter thread pool starvation; so I will see if there is a good succinct way of making this API "truly async" over the next few days.
In general, blocking calls are always a problem. According to https://learn.microsoft.com/en-us/aspnet/core/performance/performance-best-practices?view=aspnetcore-6.0#avoid-blocking-calls we should not use Task.Run in ASP.NET Core because, as you mentioned, it would create unnecessary thread pool scheduling. Since this is an I/O operation, I would really appreciate true async support for this :-) Thanks for your fast response :-)
So although it gets a little cumbersome, by writing your own version of a Solace session interface, you can do:
public interface ISessionEx : IDisposable
{
    Task<ReturnCode> ConnectAsync();
    event EventHandler<MessageEventArgs> MessageReceived;
}

public static class SolaceExtensions
{
    public static ISessionEx CreateSessionEx(this IContext context, SessionProperties properties)
    {
        return new SessionEx(context, properties);
    }

    private class SessionEx : ISessionEx
    {
        ISession session;
        TaskCompletionSource<ReturnCode> connectTaskCompletion;

        public SessionEx(IContext context, SessionProperties properties)
        {
            properties.BlockWhileConnecting = false; // May want to make a copy instead...
            properties.SubscribeBlocking = false;
            this.session = context.CreateSession(properties, MessageEventHandler, SessionEventHandler);
        }

        public event EventHandler<MessageEventArgs> MessageReceived;

        public Task<ReturnCode> ConnectAsync()
        {
            try
            {
                this.connectTaskCompletion = new TaskCompletionSource<ReturnCode>();
                this.session.Connect();
            }
            catch (Exception ex)
            {
                this.connectTaskCompletion.SetException(ex);
            }
            return this.connectTaskCompletion.Task;
        }

        private void SessionEventHandler(object sender, SessionEventArgs e)
        {
            switch (e.Event)
            {
                case SessionEvent.UpNotice:
                    this.connectTaskCompletion?.SetResult(ReturnCode.SOLCLIENT_OK);
                    break;
                case SessionEvent.ConnectFailedError:
                    this.connectTaskCompletion?.SetResult(ReturnCode.SOLCLIENT_FAIL);
                    break;
            }
        }

        private void MessageEventHandler(object sender, MessageEventArgs e)
        {
            this.MessageReceived?.Invoke(this, e);
        }

        public void Dispose()
        {
            session?.Dispose();
        }
    }
}
Note the code above is only a "simple illustration" and lacks some important considerations, for example ConnectAsync is not re-entrant and should only be called once; also, depending on the usage, you may want ISessionEx to extend ISession in which case you need to implement the base interface through the session field.
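One possible way to address the re-entrancy caveat is sketched below. It is self-contained and deliberately avoids the Solace types: FakeSession and SingleConnect are hypothetical names, with FakeSession standing in for a session that reports its connect result through a callback. The idea is that the first caller installs the TaskCompletionSource and later callers share it, while TrySetResult tolerates a duplicate or late event instead of throwing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a session that reports its connect result via an event.
public sealed class FakeSession
{
    public event Action<bool> ConnectCompleted;

    // Returns immediately; the outcome is raised later on a pool thread.
    public void Connect() =>
        ThreadPool.QueueUserWorkItem(_ => ConnectCompleted?.Invoke(true));
}

public sealed class SingleConnect
{
    private readonly FakeSession session;
    private TaskCompletionSource<bool> completion;

    public SingleConnect(FakeSession session)
    {
        this.session = session;
        // TrySetResult ignores events that arrive after the task has completed.
        session.ConnectCompleted += ok => Volatile.Read(ref completion)?.TrySetResult(ok);
    }

    public Task<bool> ConnectAsync()
    {
        // Continuations run asynchronously so awaiting code does not execute
        // inline on the thread that raises the event.
        var created = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Only the first caller installs its completion source and starts the connect.
        var existing = Interlocked.CompareExchange(ref completion, created, null);
        if (existing != null)
        {
            return existing.Task; // later callers share the first attempt
        }

        try
        {
            session.Connect();
        }
        catch (Exception ex)
        {
            created.TrySetException(ex);
        }
        return created.Task;
    }
}
```

Calling ConnectAsync twice on the same SingleConnect returns the same task both times. Whether sharing the first attempt is the right policy, as opposed to throwing on a second call or allowing a retry after failure, depends on the application.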
Also, not knowing the exact use case for an ASP.NET Core web service calling Solace, another alternative would be to keep using blocking Connect and Subscribe calls, but to do so during application startup instead of during request handling.
For example:
services.AddSingleton<ISession>(sp =>
{
    // Solace configuration and setup logic...
    session.Connect(); // Blocking call happens only once
    // subscriptions can block too...
    return session;
});
And a given controller would be given a connected session:
class SomeController : Controller
{
    ISession solaceSession;

    public SomeController(ISession solaceSession)
    {
        this.solaceSession = solaceSession;
    }
}
We have had some internal discussions about how and when to best extend the existing SDK with out-of-the-box async capabilities, but so far there are no concrete plans to add them.
@nicholasdgoodman But the message send will still be synchronous. We could create the session during startup in a blocking way, but that does not work for Send. So for an event-driven system producing a lot of messages, this would be a problem.
@mostofakamal Fair observation. I have taken the examples described in this discussion and placed them on my fork of the repo: https://github.com/nicholasdgoodman/solace-samples-dotnet/tree/f68b9c54bcea5305e641cff85b8d267f7b48eaf9
If you examine the file SolaceExtensions.cs you can see an example of performing Send asynchronously in the same manner we did with Connect in the earlier example.
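The following is not the code from that fork. It is a generic, self-contained sketch of how a per-message completion could be tracked, under the assumption that the API reports each message's outcome through the session event handler together with a correlation key set on the message. As far as I understand, that applies to guaranteed messages only, since direct messages receive no such confirmation. PendingSends and its method names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical tracker mapping a per-message correlation key to a pending task.
public sealed class PendingSends
{
    private readonly ConcurrentDictionary<object, TaskCompletionSource<bool>> pending =
        new ConcurrentDictionary<object, TaskCompletionSource<bool>>();

    // Call before handing the message to the non-blocking send.
    public Task<bool> Register(object correlationKey)
    {
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (!pending.TryAdd(correlationKey, tcs))
        {
            throw new InvalidOperationException("Correlation key already in use.");
        }
        return tcs.Task;
    }

    // Call from the session event handler when the message is accepted or rejected.
    public void Complete(object correlationKey, bool accepted)
    {
        if (pending.TryRemove(correlationKey, out var tcs))
        {
            tcs.TrySetResult(accepted);
        }
    }

    // Call if the send itself throws or returns a failure code.
    public void Fail(object correlationKey, Exception error)
    {
        if (pending.TryRemove(correlationKey, out var tcs))
        {
            tcs.TrySetException(error);
        }
    }
}
```

A SendAsync wrapper built on this would register the key, call the non-blocking send, and return the registered task, with the event handler calling Complete when the matching event arrives.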
Closing as stale