StompProtocolAndroid
Subscribing to the same topic twice results in an error
This is reproducible by subscribing to a topic and then subscribing to the same topic again about 5 seconds later, so that the first subscription has time to finish connecting.
Environment:
- Spring web-sockets backend
- Using OkHttpConnectionProvider
The root cause of this appears to be in the AbstractConnectionProvider:
@NonNull
@Override
public Completable send(String stompMessage) {
    return Completable.fromCallable(() -> {
        if (getSocket() == null) {
            throw new IllegalStateException("Not connected");
        } else {
            Log.d(TAG, "Send STOMP message: " + stompMessage);
            rawSend(stompMessage);
            return null;
        }
    });
}
If you debug inside this Completable, you can see it send the SUBSCRIBE STOMP message again when the second subscription to the topic occurs, as shown below:
SUBSCRIBE
id:fd285f5a-b293-4d9b-b7c1-978defa723ac
destination:<destination>
ack:auto
The message I am getting back when the second topic subscription occurs is:
ERROR
message: Duplicated subscription identifier
content-type:text/plain
version:1.0,1.1,1.2
content-length:85
A subscription identified by 'T_3d26b9e0-2723-410f-ae0f-ede924ee2d1f' already exists.
This is received in the onMessage callback of the OkHttpConnectionProvider's WebSocketListener, so it is an ordinary message and not an error response that closes the connection.
@Override
public void onMessage(WebSocket webSocket, String text) {
    emitMessage(text);
}
A bit more context here.
public Flowable<StompMessage> topic(@NonNull String destPath, List<StompHeader> headerList) {
    if (destPath == null)
        return Flowable.error(new IllegalArgumentException("Topic path cannot be null"));
    else if (!streamMap.containsKey(destPath))
        streamMap.put(destPath,
                subscribePath(destPath, headerList).andThen(
                        getMessageStream()
                                .filter(msg -> pathMatcher.matches(destPath, msg))
                                .toFlowable(BackpressureStrategy.BUFFER)
                                .share()).doFinally(() -> unsubscribePath(destPath).subscribe())
        );
    return streamMap.get(destPath);
}
The "doFinally()" here is in the wrong spot and will result in "unsubscribePath(destPath)" being called for every Disposable that is disposed. This is shown in the test here:
val send = Completable.fromCallable {
    print("sending...")
}
val testing = Completable.defer { send }
val publishSubject = PublishSubject.create<String>()
val test = testing.andThen(
    publishSubject
        .toFlowable(BackpressureStrategy.BUFFER)
        .share()
        .doFinally {
            print("finally...")
        }
)
val other = test.subscribe()
val other2 = test.subscribe()
other.dispose()
other2.dispose()
This will print finally... after each .dispose() is called.
Moving the "doFinally()" method before the "share()" will result in finally... being printed only after both disposables are disposed:
val send = Completable.fromCallable {
    print("sending...")
}
val testing = Completable.defer { send }
val publishSubject = PublishSubject.create<String>()
val test = testing.andThen(
    publishSubject
        .toFlowable(BackpressureStrategy.BUFFER)
        .doFinally {
            print("finally...")
        }
        .share()
)
val other = test.subscribe()
val other2 = test.subscribe()
other.dispose()
other2.dispose()
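The difference in doFinally() placement can also be modelled without RxJava. share() is publish().refCount(), so an operator placed before it sees a single upstream subscription that is cancelled only when the last subscriber leaves, while an operator placed after it sees every subscriber. The sketch below is a plain-Java model of that reference counting, not RxJava itself; SharedSource and its counters are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-Java model of a ref-counted shared source; this is not an RxJava class.
class SharedSource {
    private final AtomicInteger subscribers = new AtomicInteger();
    // Plays the role of doFinally() placed before share().
    final AtomicInteger upstreamCleanups = new AtomicInteger();
    // Plays the role of doFinally() placed after share().
    final AtomicInteger downstreamCleanups = new AtomicInteger();

    // Returns a handle whose run() stands in for Disposable.dispose().
    Runnable subscribe() {
        subscribers.incrementAndGet();
        AtomicBoolean disposed = new AtomicBoolean();
        return () -> {
            // Disposing the same handle twice has no further effect.
            if (!disposed.compareAndSet(false, true)) {
                return;
            }
            // Downstream cleanup runs for every subscriber that leaves.
            downstreamCleanups.incrementAndGet();
            // Upstream cleanup runs only when the last subscriber leaves.
            if (subscribers.decrementAndGet() == 0) {
                upstreamCleanups.incrementAndGet();
            }
        };
    }

    public static void main(String[] args) {
        SharedSource source = new SharedSource();
        Runnable other = source.subscribe();
        Runnable other2 = source.subscribe();
        other.run();
        System.out.println("after first dispose:  downstream="
                + source.downstreamCleanups.get() + " upstream=" + source.upstreamCleanups.get());
        other2.run();
        System.out.println("after second dispose: downstream="
                + source.downstreamCleanups.get() + " upstream=" + source.upstreamCleanups.get());
    }
}
```

In this model the upstream counter corresponds to the single unsubscribePath(destPath) call that should happen once nobody is listening to the topic any more.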
The two RxJava tests above also show what is wrong with the topic subscription flow. Because the Completable is chained to the Publisher with "andThen()", the Completable is re-run every time something subscribes to the resulting stream, even though the Publisher itself is shared.
In my case it will print sending... twice which is the same behaviour as subscribePath(destPath, headerList).andThen(...) in the topic subscription code above.
I could be wrong, but I think the solution is to wrap the body of the following method in Completable.defer, so that the guard check runs on every subscription instead of only once when the Completable is created.
private fun subscribePath(destinationPath: String, headerList: List<StompHeader>?): Completable = Completable.defer {
    val topicId = UUID.randomUUID().toString()
    if (topics == null) topics = ConcurrentHashMap<String, String>()
    // Only continue if we don't already have a subscription to the topic
    if (topics.containsKey(destinationPath)) {
        Log.d(TAG, "Attempted to subscribe to already-subscribed path!")
        // A bare `return` is not allowed inside this lambda, so return from the defer block
        return@defer Completable.complete()
    }
    topics.put(destinationPath, topicId)
    val headers = ArrayList<StompHeader>()
    headers.add(StompHeader(StompHeader.ID, topicId))
    headers.add(StompHeader(StompHeader.DESTINATION, destinationPath))
    headers.add(StompHeader(StompHeader.ACK, DEFAULT_ACK))
    if (headerList != null) headers.addAll(headerList)
    // The last expression of the lambda is the Completable that defer subscribes to
    send(
        StompMessage(
            StompCommand.SUBSCRIBE,
            headers, null
        )
    )
}
The above should cause the Completable to instantly complete due to the guard check and never call "send()" twice.
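The guard idea can be sketched in isolation. The following is a minimal plain-Java illustration, not the library's code: TopicRegistry and sentFrames are hypothetical stand-ins for the client's topic map and for send(). It uses ConcurrentHashMap.putIfAbsent so that checking for an existing subscription and recording a new one happen in a single atomic step, which a separate containsKey() followed by put() does not guarantee under concurrent callers.

```java
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the client's topic bookkeeping; not a class from the library.
class TopicRegistry {
    private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>();
    // Records the frames that would be sent; stands in for send().
    final List<String> sentFrames = new CopyOnWriteArrayList<>();

    // Returns true if a SUBSCRIBE frame was recorded, false if the path was already subscribed.
    boolean subscribePath(String destinationPath) {
        String topicId = UUID.randomUUID().toString();
        // putIfAbsent returns null only for the first caller to claim this path.
        if (topics.putIfAbsent(destinationPath, topicId) != null) {
            return false;
        }
        sentFrames.add("SUBSCRIBE\nid:" + topicId
                + "\ndestination:" + destinationPath + "\nack:auto\n\n");
        return true;
    }

    public static void main(String[] args) {
        TopicRegistry registry = new TopicRegistry();
        System.out.println(registry.subscribePath("/topic/demo")); // first call records a frame
        System.out.println(registry.subscribePath("/topic/demo")); // second call is skipped
        System.out.println(registry.sentFrames.size());
    }
}
```

Whether the real client needs the atomic variant depends on whether topic() can be called from several threads at once, which the issue does not establish.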
@NaikSoftware @forresthopkinsa Is there any additional info you need? It's pretty clear that the intention is for the above to work (subscribing to the same topic multiple times).