Client hangs if I send recursively

Open glassfishrobot opened this issue 10 years ago • 7 comments

Server side:

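import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

@ServerEndpoint("/tests/1client")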
public class Tests1Client
{

    @OnOpen
    public void open(Session client) {
        System.out.println("/websocket - open");
    }

    @OnMessage
    public void shout(String text, Session client) throws InterruptedException, IOException
    {
        System.out.println("/websocket -> " + text);
        client.getBasicRemote().sendText(text + "!!!");
    }

    @OnClose
    public void close(Session client) {
        System.out.println("/websocket - close");
    }
}

Case 1: the client sends 1000 messages and receives 1000 responses. The actual result matches the expected one. Client that sends the messages in a loop:

import org.glassfish.tyrus.client.ClientManager;

import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.*;

public class Client1
{
    private static final int MESSAGES_COUNT = 1000;
    public static final String URL = "ws://localhost:8080/jsr356/tests/1client";
    private static final String MESSAGE_50_SYMBOLS = "HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld";

    private long startTime;

    private final CountDownLatch messageLatch = new CountDownLatch(MESSAGES_COUNT);
    private ExecutorService exec;

    public void start()
    {
        try
        {
            exec = Executors.newFixedThreadPool(3);
            startTime = System.currentTimeMillis();

            ClientManager client = ClientManager.createClient();
            client.connectToServer(new Endpoint()
            {
                @Override
                public void onOpen(final Session session, EndpointConfig config)
                {
                    System.out.println("Client connected in time " + ((System.currentTimeMillis() - startTime) / 1000) + "s");

                    session.addMessageHandler(new MessageHandler.Whole<String>()
                    {
                        @Override
                        public void onMessage(String message)
                        {
                            System.out.println(" Client received message " + message);

                            messageLatch.countDown();
                        }
                    });
                    sendMessage(session);
                }

                @Override
                public void onClose(Session session, CloseReason closeReason)
                {
                    System.out.println("Client disconnected");
                }

                @Override
                public void onError(Session session, Throwable thr)
                {
                    System.out.println("Client received error");
                }
            }, ClientEndpointConfig.Builder.create().build(), new URI(URL));
            messageLatch.await();
            if (messageLatch.getCount() != 0)
            {
                System.out.println("Must be sent " + MESSAGES_COUNT + " but send only " + (MESSAGES_COUNT - messageLatch.getCount()));
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    private void sendMessage(Session session)
    {
        final RemoteEndpoint.Async async = session.getAsyncRemote();
        for (int i = 0; i < MESSAGES_COUNT; i++)
        {
            final int index = i;
            exec.submit(new Callable<Object>()
            {
                @Override
                public Future<Void> call() throws Exception
                {
                    System.out.println("Try to send message " + " " + index + " " + MESSAGE_50_SYMBOLS);
                    async.sendText(index + " Message " + MESSAGE_50_SYMBOLS);
                    return null;
                }
            });
        }
    }

    public static void main(String[] args)
    {
        new Client1().start();
    }
}

Case 2: the client should send a message and receive a response 1000 times. The algorithm is recursive: the client sends the next message from its onMessage handler. Client that sends the messages in this recursive "ping-pong" fashion:

import org.glassfish.tyrus.client.ClientManager;

import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.*;

public class Client1_recursive
{
    private static final int MESSAGES_COUNT = 1000;
    public static final String URL = "ws://localhost:8080/jsr356/tests/1client";
    private static final String MESSAGE_50_SYMBOLS = "HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld";

    private long startTime;

    private final CountDownLatch messageLatch = new CountDownLatch(MESSAGES_COUNT);
    private ExecutorService exec;

    public void start()
    {
        try
        {
            exec = Executors.newFixedThreadPool(3);
            startTime = System.currentTimeMillis();

            ClientManager client = ClientManager.createClient();
            client.connectToServer(new Endpoint()
            {
                @Override
                public void onOpen(final Session session, EndpointConfig config)
                {
                    System.out.println("Client connected in time " + ((System.currentTimeMillis() - startTime) / 1000) + "s");

                    session.addMessageHandler(new MessageHandler.Whole<String>()
                    {
                        @Override
                        public void onMessage(String message)
                        {
                            System.out.println(" Client received message " + message);

                            messageLatch.countDown();

                            sendMessage(session);
                        }
                    });
                    sendMessage(session);
                }

                @Override
                public void onClose(Session session, CloseReason closeReason)
                {
                    System.out.println("Client disconnected");
                }

                @Override
                public void onError(Session session, Throwable thr)
                {
                    System.out.println("Client received error");
                }
            }, ClientEndpointConfig.Builder.create().build(), new URI(URL));
            messageLatch.await();
            if (messageLatch.getCount() != 0)
            {
                System.out.println("Must be sent " + MESSAGES_COUNT + " but send only " + (MESSAGES_COUNT - messageLatch.getCount()));
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    private int i = 0;

    private void sendMessage(Session session)
    {
        final int index = i++;

        final RemoteEndpoint.Async async = session.getAsyncRemote();
        exec.submit(new Callable<Object>()
        {
            @Override
            public Future<Void> call() throws Exception
            {
                System.out.println("Try to send message " + " " + index + " " + MESSAGE_50_SYMBOLS);
                async.sendText(index + " Message " + MESSAGE_50_SYMBOLS);
                return null;
            }
        });
    }

    public static void main(String[] args)
    {
        new Client1_recursive().start();
    }
}

With the recursive algorithm only about 25 (or 74, or 90...) messages were sent before the client hung. The logs look like this, for example:

Try to send message  21 HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld
 Client received message 21 Message HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld!!!
Try to send message  22 HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld
 Client received message 22 Message HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld!!!
Try to send message  23 HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld
 Client received message 23 Message HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld!!!
Try to send message  24 HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld

As shown, the client sends message 24 and the server should receive it, but the server logs show that the message never arrived, and the client hung. Why?
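One detail of both clients is worth noting: `messageLatch.await()` has no timeout, so when responses stop arriving it never returns and the count check after it never runs. Below is a minimal sketch of a timed wait that reports how many responses are missing instead of blocking forever. It uses only the JDK; the class name `TimedLatchSketch`, the helper `awaitOrReport`, and the 200 ms timeout are hypothetical and not part of the original report.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedLatchSketch {

    /**
     * Waits for the latch with a deadline and returns how many expected
     * events are still missing (0 means the latch reached zero in time).
     */
    public static long awaitOrReport(CountDownLatch latch, long timeout, TimeUnit unit)
            throws InterruptedException {
        boolean completed = latch.await(timeout, unit);
        return completed ? 0 : latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        int expected = 1000;
        CountDownLatch latch = new CountDownLatch(expected);

        // Simulate a run that stalls after 24 responses (messages 0 to 23).
        for (int i = 0; i < 24; i++) {
            latch.countDown();
        }

        long missing = awaitOrReport(latch, 200, TimeUnit.MILLISECONDS);
        System.out.println("Received " + (expected - missing) + " of " + expected + " responses");
    }
}
```

In the clients above, the same idea would mean replacing `messageLatch.await()` with a timed `await` and printing the remaining count when it returns `false`.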

glassfishrobot avatar Jul 27 '15 07:07 glassfishrobot

  • Issue Imported From: https://github.com/tyrus-project/tyrus/issues/581
  • Original Issue Raised By: @glassfishrobot
  • Original Issue Assigned To: @pavelbucek

glassfishrobot avatar Feb 10 '18 20:02 glassfishrobot

@glassfishrobot Commented Reported by goleon

glassfishrobot avatar Jul 27 '15 07:07 glassfishrobot

@glassfishrobot Commented @pavelbucek said: This could be a bug, but for a different reason.

The latter use case should not work at all: onMessage() shouldn't be invoked in parallel for the same client. Tyrus should make sure that the next onMessage() is not invoked before the previous one has finished; otherwise we cannot ensure in-order message delivery.
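As a rough analogy for the ordering argument above, the sketch below hands numbered messages to an executor and records the order in which they are handled. With a single worker thread each message is handled only after the previous one has finished, so the order is preserved; with several workers it may not be. This is plain JDK code and only an illustration: it is not how Tyrus dispatches messages, and `SerialDispatchSketch` and `dispatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SerialDispatchSketch {

    /**
     * Hands `count` numbered messages to `pool`, waits for the pool to drain,
     * and returns the order in which the messages were handled.
     */
    public static List<Integer> dispatch(ExecutorService pool, int count)
            throws InterruptedException {
        List<Integer> handled = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int index = i;
            pool.execute(() -> handled.add(index));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        // One worker: messages are handled strictly one after another.
        List<Integer> serial = dispatch(Executors.newSingleThreadExecutor(), 10);
        System.out.println("single thread: " + serial);

        // Three workers (as in the clients above): the order is not guaranteed.
        List<Integer> parallel = dispatch(Executors.newFixedThreadPool(3), 10);
        System.out.println("three threads: " + parallel);
    }
}
```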

glassfishrobot avatar Jul 27 '15 07:07 glassfishrobot

@glassfishrobot Commented goleon said: Thanks, could you explain this restriction, please? Does it come from the WebSocket transport, from the JSR 356 standard, or from the server-side or client-side JSR 356 implementation?

glassfishrobot avatar Jul 27 '15 08:07 glassfishrobot

@glassfishrobot Commented goleon said: I also removed the concurrent task executor, but the problem still exists...

glassfishrobot avatar Jul 27 '15 08:07 glassfishrobot

@glassfishrobot Commented @pavelbucek said: The "restriction" comes from JSR 356 itself: as I already mentioned, we cannot guarantee in-order message delivery otherwise. Client or server side does not matter, since both peers are equal after the WebSocket handshake.

I overlooked that you are sending the message from an ExecutorService. In that case it should work; leave it there, run jstack when it "hangs", and post the output here. We might be able to find something unusual.
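The usual way to get the requested dump is to run the JDK's `jstack <pid>` against the hung client process. If attaching a tool is inconvenient, a similar (less detailed) dump can be produced from inside the JVM. The sketch below uses only `Thread.getAllStackTraces()` from the JDK; the class name `ThreadDumpSketch` and the output layout are assumptions for illustration, and unlike jstack it does not report lock ownership.

```java
import java.util.Map;

public class ThreadDumpSketch {

    /** Builds a text dump of the name, state and stack of every live thread. */
    public static String dumpAllThreads() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            out.append('"').append(thread.getName()).append('"')
               .append(" state=").append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}
```

In the recursive client, this method could be called from a watchdog thread once no response has arrived for a few seconds, so the dump shows where the sender and receiver threads are blocked.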

glassfishrobot avatar Jul 27 '15 08:07 glassfishrobot

@glassfishrobot Commented This issue was imported from java.net JIRA TYRUS-404

glassfishrobot avatar Apr 25 '17 03:04 glassfishrobot