rsocket-java icon indicating copy to clipboard operation
rsocket-java copied to clipboard

throw exception on errorConsumer, client will not finish

Open levil3 opened this issue 1 year ago • 0 comments

I tried throwing an exception in the errorConsumer on the server, so that the client's request seems to not be released, and will not do doOnComplete() and map()

Expected Behavior

I expect it to print 10 times "complete 1"

Actual Behavior

But it didn't print even once

Steps to Reproduce

my code is :

  • server
public static void main(String[] args) throws InterruptedException {
        RSocket rsocket = new RSocket() {
            @Override
            public Flux<Payload> requestStream(Payload p) {
                return Flux.create(emitter -> Mono.just("test").subscribe(payload -> {
                    throw new RuntimeException();
                }, throwable -> {
                    throw new RuntimeException();
                }, emitter::complete));
            }
        };

        LoopResources resources = LoopResources.create("test", 1, 2, true);
        TcpServer server = TcpServer.create().host("localhost").port(8090).runOn(resources);
        RSocketServer.create(SocketAcceptor.with(rsocket))
                .bind(TcpServerTransport.create(server))
                .subscribe();

        // wait
        TimeUnit.MINUTES.sleep(10);
}
  • client
public static void main(String[] args) throws InterruptedException {
        RSocket rSocket = RSocketConnector.create()
                .payloadDecoder(PayloadDecoder.ZERO_COPY)
                .connect(TcpClientTransport.create(8090))
                .doOnSuccess(rSocket1 -> {
                    System.out.println("connect server success!");
                })
                .block();

        for (int i = 0; i < 10; ++i) {
            Payload payload = ByteBufPayload.create("0");
            rSocket.requestStream(payload).map(Payload::getDataUtf8).doOnComplete(() -> {
                System.out.println("complete 1");
            }).map(e -> {
                System.out.println("doMap");
                return e;
            }).blockLast();
        }

        Thread.sleep(TimeUnit.SECONDS.toNanos(1000));

        // dispose
        rSocket.dispose();
}

Possible Solution

Your Environment

  • RSocket version(s) used: v1.1.4
  • jdk: 1.8

levil3 avatar Feb 04 '24 10:02 levil3