rsocket-rpc-java icon indicating copy to clipboard operation
rsocket-rpc-java copied to clipboard

client never gets notified when server goes away

Open apbthere opened this issue 7 years ago • 2 comments

The client never gets notified when server goes away. It just waits indefinitely. The following unit test creates a simple server that streams 200 messages to the client. Client will call Object.wait() on server socket effectively hanging the server after receiving 2 messages and then wait for 20 more messages or it will time out in 10 seconds. The client doesn't throw any exceptions nor signals onError.


package com.example.demo;

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.poc.protobuf.UnitRequest;
import com.poc.protobuf.UnitResponse;
import com.poc.protobuf.UnitService;
import com.poc.protobuf.UnitServiceClient;
import com.poc.protobuf.UnitServiceServer;

import io.netty.buffer.ByteBuf;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultSimpleServiceTests {

class DefaultSimpleService implements UnitService {

@Override
public Flux<UnitResponse> requestStream(UnitRequest unitRequest, ByteBuf metadata) {
	String command = unitRequest.getRequestCommandMessage();
	
	return Flux.range(1, 200)
			.map(i -> UnitResponse.newBuilder()
					.setMessageNumber(i)
					.setResponseMessage(i + " Srever is processsing " + command + " command")
					.build());
}
}

  @Test
  public void test1() throws Exception {
	  UnitServiceServer serviceServer = new UnitServiceServer(new DefaultSimpleService(), Optional.empty(), Optional.empty());
	  
	    NettyContextCloseable serverSocket = RSocketFactory.receive()
	        .acceptor(
	            (setup, sendingSocket) ->
	                Mono.just(new RequestHandlingRSocket(serviceServer)))
	        .transport(TcpServerTransport.create(8801))
	        .start()
	        .block();
	    
	    RSocket rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(8801)).start().block();

	    UnitServiceClient client = new UnitServiceClient(rSocket);
	    
	    	CountDownLatch latch = new CountDownLatch(22);
	    	
	    	client.requestStream(UnitRequest.newBuilder().setRequestCommandMessage("Give me some data!").build())
	    			.subscribe(new Subscriber<UnitResponse>() {

				private Subscription subscription;

				@Override
				public void onSubscribe(Subscription s) {
					this.subscription = s;
					s.request(1);
				}

				@Override
				public void onNext(UnitResponse t) {
					System.out.println("Received message " + t.getResponseMessage());
					latch.countDown();
					
					if (latch.getCount() < 20) {
						System.out.println("Killing server now...");
						try {
							// this will halt the thread causing server to disappear 
							serverSocket.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
					
					subscription.request(1);
				}

				@Override
				public void onError(Throwable t) {
					System.out.println("Error detected! " + t);
				}

				@Override
				public void onComplete() {
					System.out.println("Stream completed!");
				}
			} );
	    	
	    	latch.await(10, TimeUnit.SECONDS);
	    	System.out.println("Finished at " + new Date().toString());
  }
}

proto file

syntax = "proto3";

package com.harris.atom.poc;

import "google/protobuf/empty.proto";

option java_package = "com.poc.protobuf";
option java_outer_classname = "UnitServiceProto";
option java_multiple_files = true;

service UnitService {

// Single Request / Streaming Response
rpc RequestStream (UnitRequest) returns (stream UnitResponse) {}
}

message UnitRequest {
string requestCommandMessage = 1;
}

message UnitResponse {
string responseMessage = 1;
int32 messageNumber = 2;
}

apbthere avatar Oct 18 '18 17:10 apbthere

@mostroverkhov does your fix to rsocket help with this?

robertroeser avatar Nov 04 '18 04:11 robertroeser

@robertroeser It helps, but https://github.com/reactor/reactor-netty/issues/495 has to be resolved also

mostroverkhov avatar Nov 05 '18 17:11 mostroverkhov