RxNetty icon indicating copy to clipboard operation
RxNetty copied to clipboard

ClosedChannelException while TcpClient reads from TcpServer

Open JonasHallFnx opened this issue 7 years ago • 3 comments

I am getting java.nio.channels.ClosedChannelException in my TcpClient when the server closes the connection. I would expect the getInput() Observable to complete when the client notices a closed connection, rather than to return an error.

Consider the following test:

	@Test
	public void testTcpServerClient() {
		TcpServer<ByteBuf, ByteBuf> server = TcpServer.newServer(0)
				.start(c->c.writeStringAndFlushOnEach(just("one", "two")));

		TcpClient.newClient("127.0.0.1", server.getServerPort())
				.createConnectionRequest()
				.flatMap(Connection::getInput)
				.map(b->b.toString(Charset.defaultCharset()))
				.map(s->"Client recieved: "+s)
				.doOnNext(System.out::println)
				.toBlocking()
				.last();
	}

The test will output:

Client recieved: one
Client recieved: two

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

Are my expectations wrong? How can I modify the test so that the client can read the input to the end? Examples in the repository uses take(x) which will unsubscribe after receiving the expected number of results, but how can I write an example with an arbitrary number of results?

JonasHallFnx avatar Sep 22 '17 12:09 JonasHallFnx

Hi Jonas,

it looks like io.reactivex.netty.channel.AbstractConnectionToChannelBridge expects the client to terminate the connection. I've not dug super deep into it but the code where the exception is prepared is commented with:

/If the subscriber is still active, then it expects data but the channel is closed./

Here is an example that shows how to escape (and test) the exception. I've also added some fixes for bugs that are in your original code.

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.protocol.tcp.client.TcpClient;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import org.testng.annotations.Test;
import rx.Observable;
import rx.observables.StringObservable;
import rx.observers.TestSubscriber;

import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.stream.Stream;

public class TcpTest {
  @Test
  public void testSinkClosedChannelException() throws Exception {
    int count = 100;
    TcpServer<ByteBuf, ByteBuf> server = TcpServer.newServer(0)
      .start(
        c ->
          c.writeString(
            Observable.range(0, count)
              .map(i -> i + ",")
          )
      );

    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
    TcpClient.newClient("127.0.0.1", server.getServerPort())
      .createConnectionRequest()
      .flatMap(Connection::getInput)
      .map(
        b -> {
          // must free the ByteBuf
          String s = b.toString(Charset.defaultCharset());
          b.release();
          return s;
        }
      )
      // can't guarantee that the block boundaries will line up so we force split
      .compose(o -> StringObservable.split(o, ","))
      .map(Integer::parseInt)
      .onErrorResumeNext(
        e -> {
          if (e instanceof ClosedChannelException) {
            // sink ClosedChannelException to nothing
            return Observable.empty();
          }
          // 'rethrow' all other errors
          return Observable.error(e);
        }
      )
      .subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent();

    testSubscriber.assertValueCount(count);
    testSubscriber.assertValues(
      Stream.iterate(0, i -> i + 1).limit(count).toArray(Integer[]::new)
    );
    testSubscriber.assertNoErrors();
}

jamesgorman2 avatar Sep 23 '17 10:09 jamesgorman2

Thank you for a quick reply!

Catching the ClosedChannelException and returning empty will work, but I was more interested in the thoughts behind considering a connection closed by the server as an error. I tried to dig down in the commits for some clues, but there is just one large commit by @NiteshKant ( https://github.com/ReactiveX/RxNetty/commit/9da1977e723b227f75f77ae14a810e1a86f1f7c5#diff-cb86b4ce0a39c9fea0a3174ae12680feR99 ) where this was changed from onCompleted to onError.

I will try to understand why this does not happen when using HttpServer + HttpClient.

JonasHallFnx avatar Sep 28 '17 10:09 JonasHallFnx

Yeah, beyond that comment I have no idea. Nothing struck me in the commit either. I had wondered if it is to signal client close vs server close (which would otherwise be lost) because Netflix had a socket client that needed to know this, but that's pure speculation.

jamesgorman2 avatar Sep 28 '17 22:09 jamesgorman2