0.2.x: Okhttp stomp protocol implementation

Open Hukumister opened this issue 5 years ago • 14 comments

I added an implementation of the STOMP protocol that works over OkHttp, because the Gozirra-based implementation does not support running this protocol on top of WebSocket.

Hukumister avatar Jan 10 '20 06:01 Hukumister

CLA assistant check
All committers have signed the CLA.

CLAassistant avatar Mar 07 '20 08:03 CLAassistant

Hi everyone! @zhxnlai, is there any estimate of when this will be merged? It would help me a lot and solve all the problems in my life :)

miguelhrocha avatar Jul 27 '20 16:07 miguelhrocha

@miguelhrocha Hi, I think you can just use these changes and create your own module in your project before the maintainer accepts it. It's going to be faster.

Hukumister avatar Jul 29 '20 05:07 Hukumister

Hey @HaronCode thanks for the advice! I did exactly what you proposed, and now I am able to connect successfully to my Spring Boot backend :)

I can successfully send messages, but I am not able to receive anything broadcast from the backend. Were you able to test subscribing to a topic? I seriously can't see what's wrong with my code, given that I can subscribe to the StateTransition events without any problems.

This is my service for example:

interface BroadcastTopic {

    @Receive
    fun receiveBroadcast(): Flowable<String>

    @Receive
    fun observeProtocolEvent(): Flowable<ProtocolEvent>

}

I have the following configuration:

        val stompProtocol = OkHttpStompClient(
            configuration = OkHttpStompMainChannel.Configuration(host = "ws://10.0.2.2:8080/ws/websocket"),
            okHttpClient = okHttpClient(),
            requestFactory = {
                OkHttpStompClient.ClientOpenRequest(
                    login = "guest",
                    passcode = "guest",
                    okHttpRequest = okHttpRequest()
                )
            }
        )

        configuration = Scarlet.Configuration(
            lifecycle = AndroidLifecycle.ofLifecycleServiceStarted(application, this),
            messageAdapterFactories = listOf(JacksonMessageAdapter.Factory()),
            streamAdapterFactories = listOf(RxJava2StreamAdapterFactory()),
            backoffStrategy = ExponentialWithJitterBackoffStrategy(1000, 5000),
            debug = true
        )

and this is the instantiation of the Topic

broadcastTopic = scarlet!!
            .child(OkHttpStompDestination("/topic/broadcast"), configuration!!)
            .create()

I am managing everything with a CompositeDisposable

Any help would be greatly appreciated :)

miguelhrocha avatar Jul 30 '20 14:07 miguelhrocha

@miguelhrocha Did you forget about "ApplicationDestinationPrefixes" in the Spring Boot backend? Also try the destination without the "/topic" prefix. I don't see any mistakes in your client code, so I guess you should check the backend settings, such as the broker destination prefix and the application destination prefix.

Hukumister avatar Jul 31 '20 04:07 Hukumister

Hey @HaronCode I really appreciate all your help in this, you've been an incredible help. I think I found the issue but I think it's related to Scarlet's reflection capabilities.

So your STOMP implementation works flawlessly; I was always able to receive the message from the MainChannel. However, the problem was with the subscription of the service method that receives the string: the callback was never invoked for some reason:

@Receive
fun receiveBroadcast(): Flowable<String>

But I tried this and it works perfectly, no issues at all:

val broadcastEvent = broadcastTopic!!.observeProtocolEvent()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .filter { it is ProtocolEvent.OnMessageReceived }
            .subscribe {
                val payload = it as ProtocolEvent.OnMessageReceived
                Log.i("BROADCASTEVENT", it.toString())
                toast((payload.message as Message.Text).value)
            }

Do you have any insight into why this happened? Feel free to answer that this is out of the scope of your PR, which it completely is :) and I will create an issue.

miguelhrocha avatar Jul 31 '20 12:07 miguelhrocha

I also have another question, @HaronCode. Once again, thanks for all your help. Have you been able to reconnect the client when the server closes the connection? I have been trying to push a State.STARTED manually, but that doesn't do anything.

miguelhrocha avatar Jul 31 '20 15:07 miguelhrocha

Hey @HaronCode, me again :P I think this is the last time I'll bother you. To kickstart a reconnection after an unexpected closure from the server, I had to modify your WebSocketListener like this:

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
    if (code == 1001) {
        listener.onFailed(this@OkHttpStompMainChannel, true, null)
        [email protected] = null
    } else {
        listener.onClosing(this@OkHttpStompMainChannel)
    }
}

You could add it if you see it as valuable, but I am pretty sure there must be a fancier way to achieve this :) Sadly, modifying the LifecycleState didn't work for me: once the onClosing callback has started, the sideEffect in the StateTransition is null instead of a RetryAttempt.

miguelhrocha avatar Jul 31 '20 17:07 miguelhrocha
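
For reference, the close-code check in the override above can be isolated as a small decision function. This is only a minimal sketch and not the PR's code: `CloseAction`, `GOING_AWAY`, and `actionForCloseCode` are hypothetical names introduced for illustration. The only fact taken from outside the thread is that 1001 is the "going away" close code in the WebSocket specification (RFC 6455).

```kotlin
// Hypothetical helper, not part of Scarlet or the PR: decides how a channel
// could react to a WebSocket close code.
enum class CloseAction { RETRY, CLOSE }

// 1001 means "going away" in RFC 6455, e.g. the server is shutting down.
const val GOING_AWAY = 1001

fun actionForCloseCode(code: Int): CloseAction =
    if (code == GOING_AWAY) CloseAction.RETRY else CloseAction.CLOSE

fun main() {
    println(actionForCloseCode(1001)) // prints RETRY
    println(actionForCloseCode(1000)) // prints CLOSE (1000 is a normal closure)
}
```

Keeping the decision in a pure function like this would make it straightforward to cover with a unit test, independently of the listener that reports the failure.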

any update on this? I need a stomp client library

veeva-mark avatar Aug 10 '20 22:08 veeva-mark

@miguelhrocha Thank you for all your comments! I really appreciate them. I improved my implementation: I added a handler for the unexpected server disconnect case and an integration test for it. If you use my changes, could you give me some feedback? Maybe you have more ideas for improving this implementation.

Hukumister avatar Aug 11 '20 16:08 Hukumister

Hi everyone! @zhxnlai Any news about merging these changes?

francescosalamone avatar Oct 23 '20 10:10 francescosalamone

Hi @HaronCode, thank you for your STOMP implementation. I'm trying to use it in my application, and I can connect to my local server. After the connection is established, it seems that the device keeps sending the connection message, so it fails. What I want to achieve is that, after connecting, the device subscribes to a channel and waits for a message from the server. Here is my code:

    val clientOpenResult = OkHttpStompClient.ClientOpenRequest(
        login = LOGIN,
        passcode = PASSWORD,
        okHttpRequest = Request.Builder().apply {
            url(URL)
        }.build()
    )

    val stompProtocol = OkHttpStompClient(
        configuration = OkHttpStompMainChannel.Configuration(
            host = "URL",
            heartbeatReceiveInterval = 100,
            heartbeatSendInterval = 100
        ),
        okHttpClient = client,
        requestFactory = {
            clientOpenResult
        }
    )

    val channelFactory = stompProtocol.createChannelFactory().create(connectionListener, null)
    channelFactory?.open(clientOpenResult)

    val stompDestination = OkHttpStompDestination(destination = "jms.topic.chat")
    val destinationChannelFactory = stompDestination.createChannelFactory().create(connectionListener, channelFactory)
    destinationChannelFactory?.createMessageQueue(messageListener)
    destinationChannelFactory?.open(clientOpenResult)

Thanks in advance.

francescosalamone avatar Oct 26 '20 12:10 francescosalamone
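
As background for the heartbeat settings in the snippet above: STOMP 1.2 negotiates heart-beats per direction from the values both sides announce in their `heart-beat` headers. The sketch below shows that rule only. `negotiatedInterval` is a hypothetical name, the 10 000 ms server value is made up for the example, and it is an assumption that `heartbeatSendInterval` and `heartbeatReceiveInterval` are what this client announces in its CONNECT frame.

```kotlin
import kotlin.math.max

// Effective heart-beat interval for one direction, following STOMP 1.2:
// if either side announces 0, there are no heart-beats in that direction;
// otherwise the larger of the two announced values (in milliseconds) is used.
fun negotiatedInterval(senderCanSendMs: Long, receiverWantsMs: Long): Long =
    if (senderCanSendMs == 0L || receiverWantsMs == 0L) 0L
    else max(senderCanSendMs, receiverWantsMs)

fun main() {
    // Client offers to send every 100 ms; a hypothetical server wants at most one every 10 000 ms.
    println(negotiatedInterval(100, 10_000)) // prints 10000
    // The same client against a server that announces 0 (no heart-beats wanted).
    println(negotiatedInterval(100, 0))      // prints 0
}
```

Under this rule a very small client-side value such as 100 ms only takes effect if the server announces an equally small or smaller non-zero value.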

Hi @miguelhrocha, did you figure out the problem with the @Receive method returning Flowable<String> never being invoked? I'm also only able to receive messages through ProtocolEvent.

francescosalamone avatar Nov 03 '20 15:11 francescosalamone

Hi @HaronCode, sorry for the spam. I'm finally able to send and receive messages over the websocket. I'm now looking for a way to manually unsubscribe from a topic. Do you think it would be possible to add an annotation like @Send or @Receive, so that we can use syntax like myTopic.sendCommand(StompCommand.UNSUBSCRIBE)?

francescosalamone avatar Nov 06 '20 17:11 francescosalamone
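
For context on the unsubscribe question: in STOMP 1.2, an UNSUBSCRIBE frame carries the same `id` header that was sent in the matching SUBSCRIBE frame. The sketch below only illustrates that wire format. `stompFrame`, `subscribeFrame`, and `unsubscribeFrame` are hypothetical helpers, not part of Scarlet or this PR, and `sub-0` is an arbitrary subscription id.

```kotlin
// Builds a STOMP frame: command line, header lines, blank line, body, NUL terminator.
// Header escaping required by STOMP 1.2 is omitted to keep the sketch short.
fun stompFrame(command: String, headers: Map<String, String>, body: String = ""): String =
    buildString {
        append(command).append('\n')
        for ((name, value) in headers) {
            append(name).append(':').append(value).append('\n')
        }
        append('\n')
        append(body)
        append('\u0000')
    }

// SUBSCRIBE needs both an id and a destination.
fun subscribeFrame(id: String, destination: String): String =
    stompFrame("SUBSCRIBE", linkedMapOf("id" to id, "destination" to destination))

// UNSUBSCRIBE only needs the id of the subscription to cancel.
fun unsubscribeFrame(id: String): String =
    stompFrame("UNSUBSCRIBE", mapOf("id" to id))

fun main() {
    // The NUL terminator is shown as ^@ so the frames are readable when printed.
    print(subscribeFrame("sub-0", "/topic/broadcast").replace("\u0000", "^@\n"))
    print(unsubscribeFrame("sub-0").replace("\u0000", "^@\n"))
}
```

Any annotation-based API for unsubscribing would therefore need a way to look up the id used when the destination channel subscribed.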