websocket-client icon indicating copy to clipboard operation
websocket-client copied to clipboard

Messages received before Start finished

Open LadislavBohm opened this issue 4 years ago • 3 comments

Hi, I think this is not intended behavior. When you create a client, subscribe to a message observable and then await Start (or StartOrFail) method, you might receive one or more messages before Start has finished. This is especially annoying since protocols like Socket.IO send handshake immediately after connection and you cannot respond to it, because the socket is not ready yet.

Here is an example code on how to simulate this behavior:

using (IWebsocketClient client = new WebsocketClient(new Uri("wss://streamer.cryptocompare.com/socket.io/?EIO=3&transport=websocket")))
{
    Stopwatch sw = Stopwatch.StartNew();
    string received = null;
    var receivedEvent = new ManualResetEvent(false);

    client.MessageReceived.Subscribe(msg =>
    {
        sw.Stop();
        receivedEvent.Set();
    });

    await client.StartOrFail();
    Assert.True(sw.IsRunning);
    sw.Stop();

    receivedEvent.WaitOne(TimeSpan.FromSeconds(30));

    Assert.NotNull(received);
}

LadislavBohm avatar Mar 13 '20 06:03 LadislavBohm

Hello @LadislavBohm ,

thanks for the issue. I wasn't able to reproduce it: image

But anyway, I see your point. In my opinion, this WebsocketClient should be lightweight and straightforward, I'm afraid that any extra hidden features would kill usability for different use-cases.

For example, this situation is easily solvable on the client-side via ManualResetEvent, lock, async-lock or more cleaner by Reactive extensions.

using (IWebsocketClient client = new WebsocketClient(new Uri("wss://streamer.cryptocompare.com/socket.io/?EIO=3&transport=websocket")))
            {
                var startEvent = new ManualResetEvent(false);

                client.MessageReceived.Subscribe(msg =>
                {
                    // wait to start/connect
                    startEvent.WaitOne();

                    // do smth ...
                });

                await client.StartOrFail();
                startEvent.Set();
            }

Or did I miss something?

Marfusios avatar Mar 13 '20 16:03 Marfusios

I am still able to reproduce the issue with both remote server and a server running on localhost. But to my original point. I just think it's unexpected behavior from caller perspective. Because I really did not expect for socket to be running before Start has finished. This way if I subscribe immediately after Start has finished I will forever lose those messages.

However I used following code to work around this in my implementation and hopefully it's ok solution using no locks or having any performance impact after startup is completed.

var disconnects = new List<DisconnectEvent>();
var messages = new List<ResponseMessage>();

//start temporary collecting messages and disconnects to local collection
using IDisposable disconnectSubscription = _socket.DisconnectionHappened
    .Select(info => new DisconnectEvent(info.CloseStatus, info.CloseStatusDescription, info.Exception))
    .Subscribe(disconnects.Add);
using IDisposable messageSubscription = _socket.MessageReceived
    .Where(m => m.MessageType == WebSocketMessageType.Text)
    .Subscribe(messages.Add);

//wait until start finishes
await _socket.StartOrFail();

//start real subscriptions and prepend any temporary collected data to them
_disconnectSubscription = _socket.DisconnectionHappened
    .Select(info => new DisconnectEvent(info.CloseStatus, info.CloseStatusDescription, info.Exception))
    .StartWith(disconnects)
    .Subscribe(Events.DisconnectSubject);
_messageSubscription = _socket.MessageReceived
    .Where(m => m.MessageType == WebSocketMessageType.Text)
    .StartWith(messages)
    .Subscribe(OnTextMessage);

If you consider this behavior as intended feel free to close the issue of course and thanks for cooperation.

LadislavBohm avatar Mar 13 '20 20:03 LadislavBohm

having start not block until it is done without using an async is breaking standard practice, you should have a sperate startAsync function that does what start does currently.

storm37000 avatar Apr 18 '20 22:04 storm37000