
Wrong response message when using "requestResponse" method (send message to self).

Open smyrgeorge opened this issue 1 year ago • 6 comments

Hello!

I was just playing around and I think I encountered a bug.

The problem appears when we use the requestResponse method to send a message to ourselves (acting as a node).

The call correctly waits for a response, but it does not return the right response message.

For details, take a look at the following example (Kotlin).

println(res) does not print "Pong!"

fun main(args: Array<String>) {
    val alias = System.getenv("ACTOR_NODE_ID") ?: "node-1"
    val seedPort = System.getenv("ACTOR_NODE_SEED_PORT")?.toInt() ?: 61100

    // Build cluster.
    val cluster: Cluster = ClusterImpl().transport { it.port(seedPort) }
        .config { it.memberAlias(alias) }
        .transportFactory { TcpTransportFactory() }
        .handler {
            object : ClusterMessageHandler {
                override fun onMessage(message: Message) {
                    println("Received message: $message")

                    if (message.correlationId() != null
                        && message.header("x-is-reply") != null
                    ) {

                        val response = Message
                            .builder()
                            .correlationId(message.correlationId())
                            .header("x-is-reply", "t")
                            .data("Pong!")
                            .build()

                        runBlocking { it.send(message.sender(), response).awaitFirstOrNull() }
                    }
                }

                override fun onGossip(gossip: Message) {
                    println("Received message: $gossip")
                }

                override fun onMembershipEvent(event: MembershipEvent) {
                    println("Received membership-event: $event")
                }
            }
        }.startAwait()

    runBlocking {
        while (true) {
            val self: Member = cluster.member()
            val message: Message = Message.builder().correlationId("test_id").data("Ping!").build()
            val res: String = cluster.requestResponse(self, message).awaitSingle().data()

            println(res) // Prints "Ping!" and not "Pong!"

            // Wait one second before sending the next request.
            delay(1_000)
        }
    }
}

The above code can be found here.

Here is the output for the code above:

0    [main] INFO  io.scalecube.cluster.Cluster  - [null][doStart] Starting, config: ClusterConfig[metadata=null, metadataTimeout=3000, metadataCodec=io.scalecube.cluster.metadata.JdkMetadataCodec@7dc222ae, memberId='null', memberAlias='node-1', externalHosts=null, transportConfig=TransportConfig[port=61100, clientSecured=false, connectTimeout=3000, messageCodec=io.scalecube.cluster.transport.api.JdkMessageCodec@aecb35a, maxFrameLength=2097152, transportFactory=io.scalecube.transport.netty.tcp.TcpTransportFactory@5fcd892a, addressMapper=java.util.function.Function$$Lambda$34/0x0000000800c69e20@8b87145], failureDetectorConfig=FailureDetectorConfig[pingInterval=1000, pingTimeout=500, pingReqMembers=3], gossipConfig=GossipConfig[gossipFanout=3, gossipInterval=200, gossipRepeatMult=3, gossipSegmentationThreshold=1000], membershipConfig=MembershipConfig[seedMembers=[], syncInterval=30000, syncTimeout=3000, suspicionMult=5, namespace='default', removedMembersHistorySize=42]]
324  [sc-cluster-io-nio-1] INFO  io.scalecube.cluster.transport.api.Transport  - [start][/[0:0:0:0:0:0:0:0]:61100] Bound cluster transport
403  [sc-cluster-io-nio-1] INFO  io.scalecube.cluster.Cluster  - [default:node-1:7778863775a34011][doStart] Started
default:node-1:7778863775a34011
[default:node-1:7778863775a34011]
Received message: Message[headers={sender=192.168.2.10:61100, cid=test_id}, data=Ping!]
Ping!
Received message: Message[headers={sender=192.168.2.10:61100, cid=test_id}, data=Ping!]
Ping!
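
The output above would be consistent with the request being matched against itself. The following is a minimal sketch of that idea, written without the library: `Msg` and `firstMatching` are hypothetical names, and the matching rule (complete with the first inbound message whose correlation id equals the request's) is an assumption about how requestResponse behaves, not something confirmed in this thread.

```kotlin
// Hypothetical stand-in for the library's Message class (not the scalecube type).
data class Msg(val correlationId: String?, val data: String, val isReply: Boolean = false)

// ASSUMPTION: the request completes with the first inbound message
// whose correlation id equals the correlation id of the request.
fun firstMatching(inbound: List<Msg>, request: Msg): Msg? =
    inbound.firstOrNull { it.correlationId != null && it.correlationId == request.correlationId }

fun main() {
    val ping = Msg(correlationId = "test_id", data = "Ping!")
    val pong = Msg(correlationId = "test_id", data = "Pong!", isReply = true)

    // Sending to self: the node's own inbound stream sees the request first, then any reply.
    val selfInbound = listOf(ping, pong)
    println(firstMatching(selfInbound, ping)?.data) // Ping!

    // Sending to another node: only the reply arrives on the sender's inbound stream.
    val remoteInbound = listOf(pong)
    println(firstMatching(remoteInbound, ping)?.data) // Pong!

    // Additionally requiring a reply marker skips the echoed request.
    val strict = selfInbound.firstOrNull { it.correlationId == ping.correlationId && it.isReply }
    println(strict?.data) // Pong!
}
```

If that assumption holds, a request sent to the local node would always satisfy its own correlation-id filter, regardless of what the handler sends back.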

Am I missing something?

Thanks a lot!

smyrgeorge avatar Dec 17 '23 17:12 smyrgeorge

@smyrgeorge Hi, thanks for reporting. We will look into this by Friday.

artem-v avatar Dec 20 '23 10:12 artem-v

I forgot to mention that I'm using version 2.7.0.rc.

smyrgeorge avatar Dec 20 '23 11:12 smyrgeorge

Honestly, I don't remember what 2.7.0.rc is; it looks like it was a development build of something. I tried to reproduce your program in Java, on the latest version, on 2.6.17, and my confusion is the following: where do you initially send the message to the cluster? I see you declared a message handler:

override fun onMessage(message: Message) 
...

That's fine, but this handler is only triggered when some message is received. The question is: where do you send a message to the cluster with a non-null correlationId and the header "x-is-reply"?
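
To make the question concrete, here is a small sketch of the handler's guard as a plain predicate. The function names are hypothetical and headers are modelled as a simple map rather than the library's Message; the second predicate is only an assumption about the intended behaviour (reply to requests, ignore messages already marked as replies).

```kotlin
// Hypothetical predicate mirroring the condition in the posted handler.
fun repliesAsWritten(correlationId: String?, headers: Map<String, String>): Boolean =
    correlationId != null && headers["x-is-reply"] != null

// ASSUMED intent: answer requests and skip messages that are already replies.
fun repliesToRequestsOnly(correlationId: String?, headers: Map<String, String>): Boolean =
    correlationId != null && headers["x-is-reply"] == null

fun main() {
    // Headers as they appear in the logged Ping! message.
    val pingHeaders = mapOf("sender" to "192.168.2.10:61100", "cid" to "test_id")
    println(repliesAsWritten("test_id", pingHeaders))      // false
    println(repliesToRequestsOnly("test_id", pingHeaders)) // true

    // A reply carries the marker header, so it is not answered again.
    val pongHeaders = pingHeaders + ("x-is-reply" to "t")
    println(repliesToRequestsOnly("test_id", pongHeaders)) // false
}
```

With the condition as posted, the logged Ping! message carries no "x-is-reply" header, so the reply branch would not run for it.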

artem-v avatar Dec 22 '23 16:12 artem-v

I just tried version 2.6.17 and the problem remains.

I'm sending the message to the node that sent the Ping! request.

smyrgeorge avatar Dec 22 '23 19:12 smyrgeorge

Can you rewrite the example in Java? It's hard to understand what's going on. For example, this line:

            val res: String = cluster.requestResponse(self, message).awaitSingle().data()

How is it going to work? The Cluster interface doesn't have a requestResponse method.

artem-v avatar Dec 23 '23 11:12 artem-v

Actually, that's true. The interface does not contain the requestResponse method (in the master branch). But if you check out the tag 2.6.17 you will find it; have a look here.

I think it's a discontinued feature, so I guess you can also close this issue.

Thanks a lot.

smyrgeorge avatar Dec 24 '23 19:12 smyrgeorge