RxNetty
ClosedChannelException while TcpClient reads from TcpServer
I am getting java.nio.channels.ClosedChannelException in my TcpClient when the server closes the connection. I would expect the getInput() Observable to complete when the client notices a closed connection, rather than emit an error.
Consider the following test:
    @Test
    public void testTcpServerClient() {
        TcpServer<ByteBuf, ByteBuf> server = TcpServer.newServer(0)
            .start(c -> c.writeStringAndFlushOnEach(just("one", "two")));
        TcpClient.newClient("127.0.0.1", server.getServerPort())
            .createConnectionRequest()
            .flatMap(Connection::getInput)
            .map(b -> b.toString(Charset.defaultCharset()))
            .map(s -> "Client received: " + s)
            .doOnNext(System.out::println)
            .toBlocking()
            .last();
    }
The test will output:
    Client received: one
    Client received: two
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
Are my expectations wrong? How can I modify the test so that the client can read the input to the end? The examples in the repository use take(x), which unsubscribes after receiving the expected number of results, but how can I write an example with an arbitrary number of results?
Hi Jonas,
it looks like io.reactivex.netty.channel.AbstractConnectionToChannelBridge expects the client to terminate the connection. I've not dug super deep into it, but the code where the exception is prepared is commented with:
/If the subscriber is still active, then it expects data but the channel is closed./
Here is an example that shows how to escape (and test) the exception. I've also added some fixes for bugs that are in your original code.
    import io.netty.buffer.ByteBuf;
    import io.reactivex.netty.channel.Connection;
    import io.reactivex.netty.protocol.tcp.client.TcpClient;
    import io.reactivex.netty.protocol.tcp.server.TcpServer;
    import org.testng.annotations.Test;
    import rx.Observable;
    import rx.observables.StringObservable;
    import rx.observers.TestSubscriber;

    import java.nio.channels.ClosedChannelException;
    import java.nio.charset.Charset;
    import java.util.stream.Stream;

    public class TcpTest {

        @Test
        public void testSinkClosedChannelException() throws Exception {
            int count = 100;
            // server writes "0,1,2,...,99," and then closes the connection
            TcpServer<ByteBuf, ByteBuf> server = TcpServer.newServer(0)
                .start(
                    c ->
                        c.writeString(
                            Observable.range(0, count)
                                .map(i -> i + ",")
                        )
                );
            TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
            TcpClient.newClient("127.0.0.1", server.getServerPort())
                .createConnectionRequest()
                .flatMap(Connection::getInput)
                .map(
                    b -> {
                        // must free the ByteBuf
                        String s = b.toString(Charset.defaultCharset());
                        b.release();
                        return s;
                    }
                )
                // can't guarantee that the block boundaries will line up so we force split
                .compose(o -> StringObservable.split(o, ","))
                .map(Integer::parseInt)
                .onErrorResumeNext(
                    e -> {
                        if (e instanceof ClosedChannelException) {
                            // sink ClosedChannelException to nothing
                            return Observable.empty();
                        }
                        // 'rethrow' all other errors
                        return Observable.error(e);
                    }
                )
                .subscribe(testSubscriber);
            testSubscriber.awaitTerminalEvent();
            testSubscriber.assertValueCount(count);
            testSubscriber.assertValues(
                Stream.iterate(0, i -> i + 1).limit(count).toArray(Integer[]::new)
            );
            testSubscriber.assertNoErrors();
        }
    }
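To illustrate the comment about block boundaries in the test above: TCP delivers a byte stream, so the chunks the client sees need not match the strings the server wrote, and a number such as "10" can arrive as "1" in one buffer and "0" in the next. The following is only a minimal sketch in plain Java with no RxNetty or RxJava dependency. ChunkSplitter is a hypothetical helper invented for this illustration; it is not part of either library and is not meant to mirror StringObservable.split exactly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkSplitter {

    /**
     * Re-splits a sequence of arbitrarily cut chunks on a literal delimiter,
     * carrying any incomplete token over to the next chunk.
     */
    public static List<String> split(List<String> chunks, String delimiter) {
        List<String> tokens = new ArrayList<>();
        StringBuilder pending = new StringBuilder(); // text seen since the last delimiter
        for (String chunk : chunks) {
            pending.append(chunk);
            int at;
            while ((at = pending.indexOf(delimiter)) >= 0) {
                tokens.add(pending.substring(0, at));
                pending.delete(0, at + delimiter.length());
            }
        }
        if (pending.length() > 0) {
            tokens.add(pending.toString()); // trailing token with no closing delimiter
        }
        return tokens;
    }

    public static void main(String[] args) {
        // "10" and "11" are each cut in half by the chunk boundaries.
        System.out.println(split(Arrays.asList("9,1", "0,1", "1,"), ","));
        // prints [9, 10, 11]
    }
}
```

Parsing each raw chunk directly would instead see "9,1", "0,1" and "1,", which is why the test re-splits before calling Integer::parseInt.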
Thank you for the quick reply!
Catching the ClosedChannelException and returning empty will work, but I was more interested in the reasoning behind treating a connection closed by the server as an error. I tried digging through the commits for clues, but there is just one large commit by @NiteshKant ( https://github.com/ReactiveX/RxNetty/commit/9da1977e723b227f75f77ae14a810e1a86f1f7c5#diff-cb86b4ce0a39c9fea0a3174ae12680feR99 ) where this was changed from onCompleted to onError.
I will try to understand why this does not happen when using HttpServer + HttpClient.
Yeah, beyond that comment I have no idea. Nothing struck me in the commit either. I had wondered if it is to signal client close vs server close (which would otherwise be lost) because Netflix had a socket client that needed to know this, but that's pure speculation.
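For context on the expectation in the original question: at the plain java.nio level, an orderly close by the peer is reported as end-of-stream (read returns -1), not as an exception. The sketch below shows only that JDK behaviour on a loopback socket. PeerCloseDemo and readUntilPeerCloses are hypothetical names made up for this example, and the sketch says nothing about why RxNetty chose onError.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class PeerCloseDemo {

    /** Starts a one-shot loopback server, reads until it closes, and returns the text received. */
    public static String readUntilPeerCloses() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick one
            Thread writer = new Thread(() -> {
                // accept one client, send "onetwo", then close the connection
                try (SocketChannel peer = server.accept()) {
                    ByteBuffer out = ByteBuffer.wrap("onetwo".getBytes(StandardCharsets.UTF_8));
                    while (out.hasRemaining()) {
                        peer.write(out);
                    }
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            });
            writer.start();

            StringBuilder received = new StringBuilder();
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                ByteBuffer in = ByteBuffer.allocate(64);
                // read() returns -1 once the peer has closed and all data is consumed
                while (client.read(in) != -1) {
                    in.flip();
                    received.append(StandardCharsets.UTF_8.decode(in));
                    in.clear();
                }
            }
            writer.join();
            return received.toString();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readUntilPeerCloses());
    }
}
```

In the JDK itself, ClosedChannelException is thrown when I/O is attempted on a channel that is already closed; going by the quoted comment, RxNetty appears to reuse that exception type for the "subscriber still active, channel closed" case.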