rsocket-js
requestChannel: error when subscribing again
When using requestChannel(), one can subscribe to the Flowable returned. One can unsubscribe as well. If one subscribes again to this flowable, it does not work.
Expected Behavior
After re-subscribing, one will receive values again with the given subscriber. No warning will occur on console.
Actual Behavior
A warning is logged on console (RSocketClient: re-entrant call to request n before initial channel established.). No new values are received by the subscriber.
Steps to Reproduce
const metadata = ...;
const client = new RSocketClient(...);
const processor = new FlowableProcessor(sub => { });
const inputFlowable = processor.map(i => { return { data: i, metadata: metadata } });
client.connect().then(
  socket => {
    let subscription;
    const subscriber = {
      onNext: value => console.log('Value from Responder', value.data),
      onSubscribe: sub => { subscription = sub; sub.request(0x7fffffff) },
    };
    // responder will multiply the given value by 2
    const channelFlowable = socket.requestChannel(inputFlowable);
    channelFlowable.subscribe(subscriber);
    setTimeout(() => processor.onNext(1), 1000);
    setTimeout(() => processor.onNext(2), 2000);
    setTimeout(() => { console.log("unsubscribe"); subscription.cancel(); }, 3000);
    setTimeout(() => { console.log("subscribe again"); channelFlowable.subscribe(subscriber); }, 4000);
    setTimeout(() => processor.onNext(3), 5000);
    // output on console
    // Value from Responder 2
    // Value from Responder 4
    // unsubscribe
    // subscribe again
    // RSocketClient: re-entrant call to request n before initial channel established.
  }
);
Workaround
Instead of re-subscribing, call requestChannel() again.
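A minimal sketch of that workaround, assuming the 0.0.x Flowable-based API used above. The helper name resubscribableChannel and its makeInput parameter are hypothetical and not part of rsocket-js. It opens a fresh channel on every subscribe() instead of reusing the cancelled one. Whether the same input Flowable can be reused for the new channel is not confirmed here, which is why the helper takes a factory:

```javascript
// Hypothetical helper, not part of rsocket-js.
// Each subscribe() issues a new requestChannel() call, so a previously
// cancelled (and therefore terminated) channel is never reused.
function resubscribableChannel(socket, makeInput) {
  return {
    subscribe(subscriber) {
      // makeInput() is an assumption: it should return the input Flowable
      // to send on the new channel (possibly a freshly built one).
      socket.requestChannel(makeInput()).subscribe(subscriber);
    },
  };
}

// Usage inside client.connect().then(socket => { ... }):
//   const channel = resubscribableChannel(socket, () => inputFlowable);
//   channel.subscribe(subscriber);   // first channel
//   subscription.cancel();           // terminates that channel
//   channel.subscribe(subscriber);   // opens a second channel
```

The responder sees each subscribe() as a new, independent stream, so any per-channel state on its side starts over.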
Possible Solution
Looks like the variables 'initialized' and 'payloadsSubscribed' in the requestChannel()-implementation of RSocketMachine are not reset.
Your Environment
- RSocket version(s) used: 0.0.27
Happy new year!
Hi @dholzenburg ,
If I am interpreting the protocol documentation correctly, I believe that this is working as per protocol spec. The Protocol spec for Request Channel states:
Upon receiving a CANCEL, the stream is terminated on the Responder.
Upon sending a CANCEL, the stream is terminated on the Requester.
Given that, if either the Requester or the Responder of a requestChannel stream sends CANCEL, you must re-establish the stream with a subsequent call to requestChannel. You cannot simply re-subscribe to the Flowable returned from requestChannel, as the stream has already been terminated.
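To make that constraint visible in application code, one option is a small guard around the returned Flowable. This is only a sketch: singleUse is a hypothetical wrapper, not an rsocket-js API, and it assumes nothing more than a subscribe(subscriber) method on the wrapped object. It fails loudly on a second subscribe() rather than logging a warning and delivering nothing:

```javascript
// Hypothetical guard, not part of rsocket-js.
// Permits exactly one subscribe(); later attempts throw, because the
// underlying stream cannot be revived once it has been terminated.
function singleUse(flowable) {
  let used = false;
  return {
    subscribe(subscriber) {
      if (used) {
        throw new Error('Channel already used; call requestChannel() again.');
      }
      used = true;
      flowable.subscribe(subscriber);
    },
  };
}

// Usage:
//   const channelFlowable = singleUse(socket.requestChannel(inputFlowable));
//   channelFlowable.subscribe(subscriber);  // ok
//   channelFlowable.subscribe(subscriber);  // throws
```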
I will also note that while the protocol has not changed, these APIs have changed recently with 1.0.0-alpha.1, and the flowable API is no longer available.