
Solace client does not have async support

Open mostofakamal opened this issue 3 years ago • 9 comments

Currently, SolaceSystems.Solclient.Messaging does not offer an async API for C#, e.g.:

  • when publishing: ReturnCode returnCode = session.Send(message);
  • when creating a session: context.CreateSession(sessionProps, null, null)

Is there any plan to provide an async API?

mostofakamal avatar Sep 21 '22 12:09 mostofakamal

Hi @mostofakamal ,

You are correct, there is no async support in the API. However, to prevent the API from blocking in the operations you mentioned, you can set the following SessionProperties flags to false:

ConnectBlocking
SendBlocking
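
As a rough sketch of what that could look like, the two flags named above can be set in the object initializer when the session properties are built. The variables host, vpnname and username are placeholders, not values from this thread. With blocking turned off, the outcome of Connect and Send is reported through return codes and session events rather than by waiting, so treat this as a starting point and check the API reference for the exact behavior.

```csharp
// Sketch only: connection values are placeholders; the two flags are the ones named above.
var sessionProps = new SessionProperties
{
    Host = host,
    VPNName = vpnname,
    UserName = username,
    ConnectBlocking = false,   // Connect() returns without waiting for the session to come up
    SendBlocking = false       // Send() returns without waiting for buffer space
};
```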

TamimiGitHub avatar Sep 22 '22 16:09 TamimiGitHub

⚠️ There are mistakes in this implementation. Leaving it here as a reference, but do not use without reading on first...

Hello @mostofakamal,

A workaround to enable async / await capabilities would be to leverage extension methods. For example:

public static class SolaceExtensions
{
    public static Task<ReturnCode> ConnectAsync(this ISession session)
    {
        return Task.Run(() =>
        {
            var tsc = new TaskCompletionSource<ReturnCode>();
            try
            {
                var result = session.Connect();
                tsc.SetResult(result);
            }
            catch (Exception ex)
            {
                tsc.SetException(ex);
            }
            return tsc.Task;
        });
    }

    public static Task<ReturnCode> SubscribeAsync(this ISession session, ISubscription subscription)
    {
        return Task.Run(() =>
        {
            var tsc = new TaskCompletionSource<ReturnCode>();

            try
            {
                var result = session.Subscribe(subscription, true);
                tsc.SetResult(result);
            }
            catch (Exception ex)
            {
                tsc.SetException(ex);
            }
            return tsc.Task;
        });
    }

    public static Task<ReturnCode> SendAsync(this ISession session, IMessage message)
    {
        return Task.Run(() =>
        {
            var tsc = new TaskCompletionSource<ReturnCode>();
            try
            {
                var result = session.Send(message);
                tsc.SetResult(result);
            }
            catch (Exception ex)
            {
                tsc.SetException(ex);
            }
            return tsc.Task;
        });
    }
}

And then in your code, you can simply:

// Keep ConnectBlocking and SendBlocking set to true

using (var context = ContextFactory.Instance.CreateContext(contextProperties, null))
using (var session = context.CreateSession(sessionProperties, HandleMessage, null))
{
    // Connect to the Solace messaging router
    Console.WriteLine($"Connecting as {username}@{vpnname} on {host}...");

    var connectResult = await session.ConnectAsync();              // <-------- awaitable Connect

    if (connectResult == ReturnCode.SOLCLIENT_OK)
    {
        Console.WriteLine("Session successfully connected.");

        // Create a topic and subscribe to it
        using (var topic = ContextFactory.Instance.CreateTopic("eliding/>"))
        {
            var subscribeResult = await session.SubscribeAsync(topic);              // <-------- awaitable Subscribe
            
            if (subscribeResult == ReturnCode.SOLCLIENT_OK)
            {
                Console.WriteLine("Waiting for a message to be published...");
                // ... more goes here
            }
        }
    }

// the rest of your code
// and sending messages:

// Create a message and the topic to publish it to
using (var message = ContextFactory.Instance.CreateMessage())
using (var topic = ContextFactory.Instance.CreateTopic($"eliding/{appId}/sub/verb/obj/{key}"))
{
    message.Destination = topic;
    message.BinaryAttachment = Encoding.UTF8.GetBytes($"{key}.{count}");
    message.DeliveryMode = MessageDeliveryMode.Direct;
    message.ElidingEligible = true;

    // Publish the message to the topic on the Solace messaging router

    var sendResult = await session.SendAsync(message);              // <-------- awaitable Send
    if (sendResult == ReturnCode.SOLCLIENT_OK)
    {
        Console.WriteLine("Done.");
    }

// the rest of your code

nicholasdgoodman avatar Sep 22 '22 17:09 nicholasdgoodman

@nicholasdgoodman Thanks for the workaround. This is basically a way of calling a synchronous method from an asynchronous context. However, it will still block the main request thread when you await it. Is there an event I can use to make it truly asynchronous, like below:

// Hypothetical: a "Connected" event like this is what I am asking about;
// T stands in for whatever its event-args type would be.
public Task<T> ConnectAsync(ISession session)
{
    TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();

    // will get raised when the session is connected
    session.Connected += (args) =>
    {
        // this notifies the caller of ConnectAsync
        // that the task just completed
        tcs.SetResult(args);
    };

    // start the connect
    session.Connect();

    return tcs.Task;
}

mostofakamal avatar Sep 23 '22 14:09 mostofakamal

Are you sure it will block the main request thread? By your wording, I'm guessing this is an ASP.NET Core request handler (or perhaps something like an Event Hub callback) and not a desktop UI application, correct?

I would have thought that by dispatching the blocking calls onto a thread pool thread (via Task.Run(() => ...)) we would have been in the clear.

Nevertheless, there is an event (actually a whole bunch of events) you can subscribe to on the session, just not in the most intuitive way.

There is a single handler for all session events at session creation time:

using (var session = context.CreateSession(sessionProperties, HandleMessage, HandleSessionEvent))
{
  //...
}

which looks something like:

static void HandleSessionEvent(object source, SessionEventArgs args)
{
    switch (args.Event)
    {
        case SessionEvent.UpNotice:
            // The session is established
            break;
        case SessionEvent.ConnectFailedError:
            // The session failed to connect
            break;
    }
}

And therein lies a bit of a problem, which is how do you scope the factory method argument with an extension method? For that I do not have a great answer other than to suggest creating a custom class that implements and extends ISession and managing it that way.

Later today or this weekend I will attempt to write up such a sample, but will also experiment with these existing extension method suggestions and verify their blocking / non-blocking behaviors.

nicholasdgoodman avatar Sep 23 '22 16:09 nicholasdgoodman

Apologies for the spam of comments, but I just recalled that if this code is being invoked from an ASP.NET Core controller, then the Task.Run(() => {}) calls are superfluous, because all controller callbacks are already invoked on a thread pool thread.

That being said, making a blocking call on it will not necessarily affect other requests initially, until you encounter thread pool starvation; so I will see if there is a good succinct way of making this API "truly async" over the next few days.

nicholasdgoodman avatar Sep 23 '22 17:09 nicholasdgoodman

In general, blocking calls are always a problem. According to https://learn.microsoft.com/en-us/aspnet/core/performance/performance-best-practices?view=aspnetcore-6.0#avoid-blocking-calls we should not use Task.Run in ASP.NET Core because, as you mentioned, it creates unnecessary thread pool scheduling. Since this is an I/O operation, I would really appreciate true async support for this :-). Thanks for your fast response :-)

mostofakamal avatar Sep 26 '22 13:09 mostofakamal

So although it gets a little cumbersome, by writing your own version of a Solace session interface, you can do:

    public interface ISessionEx : IDisposable
    {
        Task<ReturnCode> ConnectAsync();
        event EventHandler<MessageEventArgs> MessageReceived;
    }

    public static class SolaceExtensions
    {
        public static ISessionEx CreateSessionEx(this IContext context, SessionProperties properties)
        {
            return new SessionEx(context, properties);
        }

        private class SessionEx : ISessionEx
        {
            ISession session;
            TaskCompletionSource<ReturnCode> connectTaskCompletion;

            public SessionEx(IContext context, SessionProperties properties)
            {
                properties.BlockWhileConnecting = false; // May want to make a copy instead...
                properties.SubscribeBlocking = false;
                this.session = context.CreateSession(properties, MessageEventHandler, SessionEventHandler);   
            }

            public event EventHandler<MessageEventArgs> MessageReceived;

            public Task<ReturnCode> ConnectAsync()
            {
                try
                {
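                    // Run continuations asynchronously so awaiting code does not execute
                    // inline on the thread that raises the session event
                    this.connectTaskCompletion = new TaskCompletionSource<ReturnCode>(TaskCreationOptions.RunContinuationsAsynchronously);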
                    this.session.Connect();
                }
                catch (Exception ex)
                {
                    this.connectTaskCompletion.SetException(ex);
                }

                return this.connectTaskCompletion.Task;
            }


            private void SessionEventHandler(object sender, SessionEventArgs e)
            {
                switch (e.Event)
                {
                    // TrySetResult avoids an InvalidOperationException if the task was already completed
                    case SessionEvent.UpNotice:
                        this.connectTaskCompletion?.TrySetResult(ReturnCode.SOLCLIENT_OK);
                        break;
                    case SessionEvent.ConnectFailedError:
                        this.connectTaskCompletion?.TrySetResult(ReturnCode.SOLCLIENT_FAIL);
                        break;
                }
            }

            private void MessageEventHandler(object sender, MessageEventArgs e)
            {
                this.MessageReceived?.Invoke(this, e);
            }

            public void Dispose()
            {
                session?.Dispose();
            }
        }
    }

Note the code above is only a "simple illustration" and lacks some important considerations, for example ConnectAsync is not re-entrant and should only be called once; also, depending on the usage, you may want ISessionEx to extend ISession in which case you need to implement the base interface through the session field.
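
To illustrate the re-entrancy caveat, here is a minimal sketch of one way to make repeated ConnectAsync calls share a single connect attempt. It is not part of the Solace API: FakeSession, FakeSessionEvent, FakeReturnCode and ConnectOnce are hypothetical stand-ins invented so the example compiles without the Solace library. The idea is that only the first caller installs the TaskCompletionSource (via Interlocked.CompareExchange) and starts the connect; later callers get the same task back.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the Solace types, so the sketch compiles on its own.
public enum FakeReturnCode { Ok, Fail }
public enum FakeSessionEvent { UpNotice, ConnectFailedError }

public sealed class FakeSession
{
    private readonly Action<FakeSessionEvent> onEvent;
    public int ConnectCalls;

    public FakeSession(Action<FakeSessionEvent> onEvent) => this.onEvent = onEvent;

    // Non-blocking connect: returns immediately and reports the outcome later via the callback.
    public void Connect()
    {
        Interlocked.Increment(ref ConnectCalls);
        Task.Run(() => onEvent(FakeSessionEvent.UpNotice));
    }
}

public sealed class ConnectOnce
{
    private readonly FakeSession session;
    private TaskCompletionSource<FakeReturnCode> connectTcs;

    public ConnectOnce() => session = new FakeSession(OnSessionEvent);

    public int ConnectCalls => session.ConnectCalls;

    public Task<FakeReturnCode> ConnectAsync()
    {
        var fresh = new TaskCompletionSource<FakeReturnCode>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Only the first caller installs its completion source and starts the connect.
        var existing = Interlocked.CompareExchange(ref connectTcs, fresh, null);
        if (existing != null)
        {
            return existing.Task;
        }

        try
        {
            session.Connect();
        }
        catch (Exception ex)
        {
            fresh.TrySetException(ex);
        }
        return fresh.Task;
    }

    private void OnSessionEvent(FakeSessionEvent e)
    {
        var tcs = Volatile.Read(ref connectTcs);
        switch (e)
        {
            case FakeSessionEvent.UpNotice:
                tcs?.TrySetResult(FakeReturnCode.Ok);
                break;
            case FakeSessionEvent.ConnectFailedError:
                tcs?.TrySetResult(FakeReturnCode.Fail);
                break;
        }
    }
}
```

Calling ConnectAsync twice on the same ConnectOnce instance returns the same task and triggers only one underlying Connect. A real wrapper would also need to decide what happens after a disconnect, which this sketch leaves out.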

Also, not knowing the exact use case for an ASP.NET Core web service calling Solace, another alternative would be to keep using blocking Connect and Subscribe calls, but do so during application startup instead of during request handling.

For example:

services.AddSingleton<ISession>(sp =>
{
  // Solace configuration and setup logic...
  
  session.Connect();    // Blocking call happens only once

  // subscriptions can block too...
  return session;
});

And a given controller would be given a connected session:

class SomeController : Controller
{
  ISession solaceSession;
  public SomeController(ISession solaceSession)
  {
    this.solaceSession = solaceSession;
  }
}

We have had some internal discussions about how and when to best extend the existing SDK with out-of-the-box async capabilities, but so far there are no concrete plans to add it.

nicholasdgoodman avatar Sep 26 '22 18:09 nicholasdgoodman

@nicholasdgoodman But the message send will still be synchronous. We could create the session during startup in a blocking way, but that does not work for Send. So for an event-driven system producing a lot of messages, this would be a problem.

mostofakamal avatar Sep 30 '22 15:09 mostofakamal

@mostofakamal Fair observation. I have taken the examples described in this discussion and placed them on my fork of the repo: https://github.com/nicholasdgoodman/solace-samples-dotnet/tree/f68b9c54bcea5305e641cff85b8d267f7b48eaf9

If you examine the file SolaceExtensions.cs you can see an example of performing Send asynchronously in the same manner we did with Connect in the earlier example.
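
For readers who do not want to open the fork, the general shape of an awaitable Send can be sketched as follows. This is not the code from SolaceExtensions.cs. It uses hypothetical stand-ins (FakeNonBlockingSession, SendCode and a CanSend event) that mimic a session with SendBlocking set to false: Send returns a "would block" code when the buffer is full, and an event signals when sending can resume. Which return code and session event the real API uses for this should be verified against the Solace API reference.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum SendCode { Ok, WouldBlock }

// Hypothetical stand-in for a session with non-blocking sends.
public sealed class FakeNonBlockingSession
{
    private int budget;          // number of sends accepted before the "buffer" is full
    public int SendCalls;
    public event Action CanSend; // raised when sending can resume

    public FakeNonBlockingSession(int budget) => this.budget = budget;

    public SendCode Send(string payload)
    {
        Interlocked.Increment(ref SendCalls);
        if (Interlocked.Decrement(ref budget) >= 0)
        {
            return SendCode.Ok;
        }

        // Simulate the buffer draining shortly afterwards, then signal readiness.
        Task.Delay(10).ContinueWith(_ =>
        {
            Interlocked.Exchange(ref budget, 1);
            CanSend?.Invoke();
        });
        return SendCode.WouldBlock;
    }
}

public static class FakeSessionExtensions
{
    // Retries the send each time the session signals that it can accept data again.
    public static async Task<SendCode> SendAsync(this FakeNonBlockingSession session, string payload)
    {
        while (true)
        {
            var ready = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            Action handler = () => ready.TrySetResult(true);

            // Subscribe before sending so the readiness signal cannot be missed.
            session.CanSend += handler;
            try
            {
                if (session.Send(payload) == SendCode.Ok)
                {
                    return SendCode.Ok;
                }
                await ready.Task.ConfigureAwait(false);
            }
            finally
            {
                session.CanSend -= handler;
            }
        }
    }
}
```

The point of the pattern is that no thread is parked while the buffer is full: the caller's continuation is scheduled only once the readiness event fires. A production version would also want a timeout or CancellationToken so a send cannot wait forever.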

nicholasdgoodman avatar Oct 03 '22 22:10 nicholasdgoodman

Closing as stale

Mrc0113 avatar Nov 15 '23 20:11 Mrc0113