rsocket-java
Resume functionality causes client to wait for an answer forever in 1.0.2
After upgrading to version 1.0.2, enabling the Resume functionality causes the client to hang, waiting for an answer forever. With 1.0.1 everything worked as expected, and if I remove the resume code everything works as expected too. The attached example uses a channel, but the same behaviour occurs with request/response and request/stream.
Server RSocket code:
@Bean
public RSocketServerCustomizer customize() {
    Resume resume = new Resume();
    return rSocketServer -> rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY).resume(resume);
}
Controller:
@MessageMapping("part-type.createOrUpdate")
public Flux<DaeApiPartType> createOrUpdate(Flux<DaeApiPartType> partTypes) {
    return controlPartType.createOrUpdate(partTypes);
}
Client RSocket Side:
public synchronized RSocket init() {
    if (clientRSocket == null || clientRSocket.isDisposed()) {
        clientRSocket = RSocketConnector
                .create()
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
                .payloadDecoder(PayloadDecoder.ZERO_COPY)
                //.reconnect(Retry.backoff(50, Duration.ofMillis(500)))
                .dataMimeType(WellKnownMimeType.APPLICATION_JSON.toString())
                .resume(new Resume()
                        //.sessionDuration(getLocalConfig().getDuration("resumeDurationInMinutes"))
                        //.retry(Retry.fixedDelay(Long.MAX_VALUE, getLocalConfig().getDuration("fixedDelayInSeconds")))
                        //.streamTimeout(getLocalConfig().getDuration("resumeTimeOutInSeconds"))
                )
                .connect(TcpClientTransport.create(getLocalConfig().getString("host"),
                        getLocalConfig().getInt("port")))
                .doOnError(exception -> {
                    getLogger().info(DaeLogKeys.rsocket.failed.error, exception);
                    dispose();
                })
                .doOnCancel(() -> {
                    getLogger().info(DaeLogKeys.rsocket.failed.canceled);
                    dispose();
                })
                /*.doOnTerminate(()->{
                    getLogger().info(DaeLogKeys.rsocket.failed.terminated);
                    dispose();
                })*/
                .block();
    }
    return clientRSocket;
}
Call to the RSocket server:
public Response addPartTypes(DaeApiCollection<DaeApiPartType> partTypes) {
    RSocket rsocket = socket.init();
    Flux<Payload> payloadFlux = Flux.fromIterable(partTypes.getData()).map(partType ->
            DefaultPayload.create(Unpooled.wrappedBuffer(socket.encode(partType).getBytes()),
                    socket.route("part-type.createOrUpdate")));
    return run(() -> httpService.toJsonSuccess(rsocket.requestChannel(payloadFlux)
            .map(Payload::getDataUtf8)
            .map(doc -> socket.decode(doc, DaeApiPartType.class))
            .collect(Collectors.toSet())
            .block()));
}
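Both `.block()` calls above wait without an upper bound, which is why a missing response shows up as a hang rather than an error. While the cause is investigated, bounding the wait at least makes the stall visible. In Reactor this could be done with `block(Duration)` or the `timeout(Duration)` operator. The sketch below shows the same idea with plain JDK types only, so it runs without any RSocket dependency. The class `BoundedWait` and the method `await` are hypothetical names for illustration and are not part of rsocket-java or of the code above.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a pending answer, but only up to a limit.
class BoundedWait {

    // Returns the answer if it arrives within `limit`, otherwise an empty Optional.
    // (A null answer is also reported as empty.)
    static <T> Optional<T> await(CompletableFuture<T> pending, Duration limit) {
        try {
            return Optional.ofNullable(pending.get(limit.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            pending.cancel(true); // give up on the stalled request
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a response that never arrives, as in the report above.
        CompletableFuture<String> stalled = new CompletableFuture<>();
        System.out.println(await(stalled, Duration.ofMillis(200)).isPresent());

        // Stand-in for a response that is already available.
        CompletableFuture<String> answered = CompletableFuture.completedFuture("ok");
        System.out.println(await(answered, Duration.ofMillis(200)).isPresent());
    }
}
```

This does not address the resume problem itself; it only turns an indefinite wait into a detectable timeout that can be logged alongside the connection state.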
Environment: two separate services. Client: plain Java 11 with rsocket 1.0.2 and netty 1.0.2. Server: Spring Boot 2.3.3.
Please advise.
Hi @lena-precog!
Moving this issue to the RSocket-Java repo, since the rsocket/rsocket one is intended for specification-related issues.
Just a +1 - I noticed the same thing while investigating resumability.
Closing because of inactivity.