Scarlet
Scarlet copied to clipboard
0.2.x: Okhttp stomp protocol implementation
I added an implementation of the stomp protocol, which works over okhttp, because the gaziro based implementation does not support working with this protocol on top of websocket.
Hi everyone! Is there any estimate when would be merged @zhxnlai? This would help me a lot and solve all the problems in my life :)
@miguelhrocha Hi, I think you can just use these changes and create your own module in your project before the maintainer accepts it. It's going to be faster.
Hey @HaronCode thanks for the advice! I did exactly what you proposed, and now I am able to connect successfully to my Spring Boot backend :)
I can successfully send messages, but I am not able to receive anything broadcasted from the backend. Were you able to test the integration of a subscription towards a topic? I seriously can't see what's wrong with my code, looking that I can subscribe without any problems to the StateTransition
events.
This is my service for example:
interface BroadcastTopic {
@Receive
fun receiveBroadcast(): Flowable<String>
@Receive
fun observeProtocolEvent(): Flowable<ProtocolEvent>
}
I have the following configuration:
val stompProtocol = OkHttpStompClient(
configuration = OkHttpStompMainChannel.Configuration(host = "ws://10.0.2.2:8080/ws/websocket"),
okHttpClient = okHttpClient(),
requestFactory = {
OkHttpStompClient.ClientOpenRequest(
login = "guest",
passcode = "guest",
okHttpRequest = okHttpRequest()
)
}
)
configuration = Scarlet.Configuration(
lifecycle = AndroidLifecycle.ofLifecycleServiceStarted(application, this),
messageAdapterFactories = listOf(JacksonMessageAdapter.Factory()),
streamAdapterFactories = listOf(RxJava2StreamAdapterFactory()),
backoffStrategy = ExponentialWithJitterBackoffStrategy(1000, 5000),
debug = true
)
and this is the instantiation of the Topic
broadcastTopic = scarlet!!
.child(OkHttpStompDestination("/topic/broadcast"), configuration!!)
.create()
I am managing everything with a CompositeDisposable
Any help will be super appreciate it :)
@miguelhrocha Don't you forget about "ApplicationDestinationPrefixes" in Spring Boot backend? And try to check the destination without prefix "/topic". I don't see any mistakes in your client code, I guess you should figure out backend settings such as broker destination prefix and application destination prefix.
Hey @HaronCode I really appreciate all your help in this, you've been an incredible help. I think I found the issue but I think it's related to Scarlet's reflection capabilities.
So your STOMP implementation works flawlessly, I was always able to receive the message from the MainChannel. However, the problem was with the subscription of the the service that receives the string, the callback was never invoked for some reason:
@Receive
fun receiveBroadcast(): Flowable<String>
But I tried to do this and it works perfect, no issues at all:
val broadcastEvent = broadcastTopic!!.observeProtocolEvent()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.filter { it is ProtocolEvent.OnMessageReceived }
.subscribe {
val payload = it as ProtocolEvent.OnMessageReceived
Log.i("BROADCASTEVENT", it.toString())
toast((payload.message as Message.Text).value)
}
Do you have any insight on why this happened? Feel free to answer that this is out of the scope of your PR, which completely is :) and I will create an issue
I also have another question @HaronCode once again, thanks for all your help. Have you been able to reconnect the client when the Server stops the connection? I have been trying to push a State.STARTED
manually but that doesn't do anything
Hey @HaronCode me again :P this is the last time I'll bother you I think. In order to kickstart a reconnection in case of an unexpected closure from the Server I had to modify your WebSocketListener
with this:
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
if (code == 1001) {
listener.onFailed(this@OkHttpStompMainChannel, true, null)
[email protected] = null
} else {
listener.onClosing(this@OkHttpStompMainChannel)
}
}
You could add it if you see it as valuable, but I am pretty sure there most be a fancier way to achieve this :) Sadly, modifying the LifecycleState didn't work for me because if the callback of onClosing
starts, then the sideEffect
in the StateTransition
is null instead of a RetryAttempt
any update on this? I need a stomp client library
@miguelhrocha Thank you for all your comments! I really appreciate it. I improved my implementation, I added the handler for the case with unexpected server disconnect and added integration test for it. If you use my changes can you give me some feedback, maybe you have more ideas to improve this implementation?
Hi everyone! @zhxnlai Any news about merge this changes?
Hi @HaronCode, thank you for your stomp implementation. I'm trying to use it in my application, and I can connect to my local server. After the connection is established seems that the device continue to send the connection message, so it fails. What I want to achieve is that after connection the device subscribes to a channel, waiting for a message from the server. Here is my code
`val clientOpenResult = OkHttpStompClient.ClientOpenRequest(
login = LOGIN,
passcode = PASSWORD,
okHttpRequest = Request.Builder().apply {
url(URL)
}.build()
)
val stompProtocol = OkHttpStompClient(
configuration = OkHttpStompMainChannel.Configuration(
host = "URL",
heartbeatReceiveInterval = 100,
heartbeatSendInterval = 100
),
okHttpClient = client,
requestFactory = {
clientOpenResult
}
)
val channelFactory = stompProtocol.createChannelFactory().create(connectionListener, null)
channelFactory?.open(clientOpenResult)
val stompDestination = OkHttpStompDestination(destination = "jms.topic.chat")
val destinationChannelFactory = stompDestination.createChannelFactory().create(connectionListener, channelFactory)
destinationChannelFactory?.createMessageQueue(messageListener)
destinationChannelFactory?.open(clientOpenResult)`
Thanks in advance.
Hey @HaronCode I really appreciate all your help in this, you've been an incredible help. I think I found the issue but I think it's related to Scarlet's reflection capabilities.
So your STOMP implementation works flawlessly, I was always able to receive the message from the MainChannel. However, the problem was with the subscription of the the service that receives the string, the callback was never invoked for some reason:
@Receive fun receiveBroadcast(): Flowable<String>
But I tried to do this and it works perfect, no issues at all:
val broadcastEvent = broadcastTopic!!.observeProtocolEvent() .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .filter { it is ProtocolEvent.OnMessageReceived } .subscribe { val payload = it as ProtocolEvent.OnMessageReceived Log.i("BROADCASTEVENT", it.toString()) toast((payload.message as Message.Text).value) }
Do you have any insight on why this happened? Feel free to answer that this is out of the scope of your PR, which completely is :) and I will create an issue
Hi @miguelhrocha did you figure out with this problem? I'm also able to receive messages only using ProtocolEvent.
Hi @HaronCode, sorry for spam. I'm finally able to send and receive messages from the websocket.
I'm currently looking for a way to manually unsubscribe from a topic, do you think that is possible to add an Annotation like @Send
or @Receive
, so that we can use syntax like myTopic.sendCommand(StompCommand.UNSUBSCRIBE)
?