scalecube-cluster
scalecube-cluster copied to clipboard
Wrong response message when using "requestResponse" method (send message to self).
Hello!
I was just playing around and I think I encountered a bug.
The problem is when we make use of the requestResponse method in order to send messages to our self (acting like a node).
The code correctly waits for the response, although it does not return the right response message.
For details just take a look at the following example (kotlin).
println(res) does not print "Pong!"
fun main(args: Array<String>) {
val alias = System.getenv("ACTOR_NODE_ID") ?: "node-1"
val seedPort = System.getenv("ACTOR_NODE_SEED_PORT")?.toInt() ?: 61100
// Build cluster.
val cluster: Cluster = ClusterImpl().transport { it.port(seedPort) }
.config { it.memberAlias(alias) }
.transportFactory { TcpTransportFactory() }
.handler {
object : ClusterMessageHandler {
override fun onMessage(message: Message) {
println("Received message: $message")
if (message.correlationId() != null
&& message.header("x-is-reply") != null
) {
val response = Message
.builder()
.correlationId(message.correlationId())
.header("x-is-reply", "t")
.data("Pong!")
.build()
runBlocking { it.send(message.sender(), response).awaitFirstOrNull() }
}
}
override fun onGossip(gossip: Message) {
println("Received message: $gossip")
}
override fun onMembershipEvent(event: MembershipEvent) {
println("Received membership-event: $event")
}
}
}.startAwait()
runBlocking {
while (true) {
val self: Member = cluster.member()
val message: Message = Message.builder().correlationId("test_id").data("Ping!").build()
val res: String = cluster.requestResponse(self, message).awaitSingle().data()
println(res) // Prints "Ping!" and not "Pong!"
// Block main thread.
delay(1_000)
}
}
}
The above code can be found here.
Here is the output for the code above:
0 [main] INFO io.scalecube.cluster.Cluster - [null][doStart] Starting, config: ClusterConfig[metadata=null, metadataTimeout=3000, metadataCodec=io.scalecube.cluster.metadata.JdkMetadataCodec@7dc222ae, memberId='null', memberAlias='node-1', externalHosts=null, transportConfig=TransportConfig[port=61100, clientSecured=false, connectTimeout=3000, messageCodec=io.scalecube.cluster.transport.api.JdkMessageCodec@aecb35a, maxFrameLength=2097152, transportFactory=io.scalecube.transport.netty.tcp.TcpTransportFactory@5fcd892a, addressMapper=java.util.function.Function$$Lambda$34/0x0000000800c69e20@8b87145], failureDetectorConfig=FailureDetectorConfig[pingInterval=1000, pingTimeout=500, pingReqMembers=3], gossipConfig=GossipConfig[gossipFanout=3, gossipInterval=200, gossipRepeatMult=3, gossipSegmentationThreshold=1000], membershipConfig=MembershipConfig[seedMembers=[], syncInterval=30000, syncTimeout=3000, suspicionMult=5, namespace='default', removedMembersHistorySize=42]]
324 [sc-cluster-io-nio-1] INFO io.scalecube.cluster.transport.api.Transport - [start][/[0:0:0:0:0:0:0:0]:61100] Bound cluster transport
403 [sc-cluster-io-nio-1] INFO io.scalecube.cluster.Cluster - [default:node-1:7778863775a34011][doStart] Started
default:node-1:7778863775a34011
[default:node-1:7778863775a34011]
Received message: Message[headers={sender=192.168.2.10:61100, cid=test_id}, data=Ping!]
Ping!
Received message: Message[headers={sender=192.168.2.10:61100, cid=test_id}, data=Ping!]
Ping!
Am I missing something?
Thanks a lot!
@smyrgeorge Hi, thanks for reporting. We will look into this till FRI.
Just forgot to mention that I'm using the 2.7.0.rc version.
Honestly I don't remember what's 2.7.0.rc, looks like it was some development of something. I was trying to reproduce your program in java, on latest version, on 2.6.17, and confusion is following - where do you initially send message to the cluster? I see you declared message handler:
override fun onMessage(message: Message)
...
it's ok, but this handler must be activated upon receive of some message, question is - where do you send this message to cluster with non-null correlationId and header "x-is-reply"?
I just tried the version 2.6.17 and the problem remains.
I'm sending the message to the node that sent the Ping! request.
Can you rewrite example in java. It's hard to understand what' going on, for example this line:
val res: String = cluster.requestResponse(self, message).awaitSingle().data()
How it's going to work? Cluster interface doesn't have method requestResponse.
Actually that's true. The interface does not contain the method requestResponse (in master branch).
But if you checkout the tag 2.6.17 you will find it, have a look here.
I think it's a discontinued feature, so I guess you can also close this issue.
Thanks a lot.