rsocket-java icon indicating copy to clipboard operation
rsocket-java copied to clipboard

Resume functionality causes client wait for answer forever in 1.0.2

Open lena-precog opened this issue 5 years ago • 2 comments

After upgrading to 1.0.2 version Resume functionality cause stuck client waiting to answer forever At 1.0.1 all worked as expected.Also if I remove resume code all work as expected. I attached example with channel but the same behaviour with response/request, request/stream.

Server Rsocket code:

   @Bean
    public RSocketServerCustomizer customize() {

            Resume resume = new Resume();
           return rSocketServer -> rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY).resume(resume);
    }

Controller

@MessageMapping("part-type.createOrUpdate")
   public Flux<DaeApiPartType> createOrUpdate(Flux<DaeApiPartType> partTypes) {
      return controlPartType.createOrUpdate(partTypes);
   }

Client RSocket Side:

public synchronized RSocket init() {
        if (clientRSocket == null || clientRSocket.isDisposed()) {
            clientRSocket = RSocketConnector
                            .create()
                            .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
                            .payloadDecoder(PayloadDecoder.ZERO_COPY)
                            //.reconnect(Retry.backoff(50, Duration.ofMillis(500)))
                            .dataMimeType(WellKnownMimeType.APPLICATION_JSON.toString())

                           .resume(new Resume()
                                    //.sessionDuration(getLocalConfig().getDuration("resumeDurationInMinutes"))
                                    //.retry(Retry.fixedDelay(Long.MAX_VALUE, getLocalConfig().getDuration("fixedDelayInSeconds")))
                                    //.streamTimeout(getLocalConfig().getDuration("resumeTimeOutInSeconds"))
                            )

                            .connect(TcpClientTransport.create(getLocalConfig().getString("host"),
                                    getLocalConfig().getInt("port")))
                            .doOnError(exception -> {
                                getLogger().info(DaeLogKeys.rsocket.failed.error, exception);
                                dispose();

                            })
                            .doOnCancel(() -> {
                                getLogger().info(DaeLogKeys.rsocket.failed.canceled);
                                dispose();
                            })
                    /*.doOnTerminate(()->{
                        getLogger().info(DaeLogKeys.rsocket.failed.terminated);
                        dispose();
                    })*/
                            .block();
        }
        return clientRSocket;
    }

Call to Rsocket server :

 public Response addPartTypes(DaeApiCollection<DaeApiPartType> partTypes) {
       RSocket rsocket = socket.init();
       Flux<Payload> payloadFlux = Flux.fromIterable(partTypes.getData()).map(partType ->
               DefaultPayload.create(Unpooled.wrappedBuffer(socket.encode(partType).getBytes()),
                       socket.route("part-type.createOrUpdate")));
       return run(() -> httpService.toJsonSuccess(rsocket.requestChannel(payloadFlux)
                                                           .map(Payload::getDataUtf8)
                                                           .map(doc -> socket.decode(doc, DaeApiPartType.class))
                                                           .collect(Collectors.toSet())
                                                           .block()));
   }

Environment two different services: client->pure java 11 with rsocket 1.0.2 netty 1.0.2 server -> springboot 2.3.3

Please advise.

lena-precog avatar Aug 21 '20 08:08 lena-precog

Hi @lena-precog!

Moving this issue to RSokcet-Java repo, since rsocket/rsocket one is intended for the specification related issues.

OlegDokuka avatar Aug 21 '20 08:08 OlegDokuka

Just a +1 - I noticed the same thing while investigating resumability.

kevmcm avatar Apr 13 '21 22:04 kevmcm

closing because of inactivity

OlegDokuka avatar May 31 '23 19:05 OlegDokuka