akka-rabbitmq
akka-rabbitmq copied to clipboard
PublishSubcribe example doesn't work
Hi, I am trying to test the following example but it doesn't work.
object PublishSubscribe extends App {
implicit val system = ActorSystem()
val factory = new ConnectionFactory()
val connection = system.actorOf(ConnectionActor.props(factory), "akka-rabbitmq")
val exchange = "amq.fanout"
def setupPublisher(channel: Channel, self: ActorRef) {
val queue = channel.queueDeclare().getQueue
println(channel)
channel.queueBind(queue, exchange, "")
}
connection ! CreateChannel(ChannelActor.props(setup), Some("publisher"))
def setupSubscriber(channel: Channel, self: ActorRef) {
val queue = channel.queueDeclare().getQueue
println(channel)
channel.queueBind(queue, exchange, "")
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) {
println("received: " + fromBytes(body))
}
}
channel.basicConsume(queue, true, consumer)
}
def setup(channel: Channel, self: ActorRef): Unit = {
setupPublisher(channel, self)
setupSubscriber(channel, self)
}
connection ! CreateChannel(ChannelActor.props(setup), Some("subscriber"))
Future {
def loop(n: Long) {
val publisher = system.actorSelection("/user/rabbitmq/publisher")
def publish(channel: Channel) {
println("publish")
channel.basicPublish(exchange, "", null, toBytes(n))
}
publisher ! ChannelMessage(publish, dropIfNoChannel = false)
Thread.sleep(1000)
loop(n + 1)
}
loop(0)
}
def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
def toBytes(x: Long) = x.toString.getBytes("UTF-8")
}
The publish method is not called by
publisher ! ChannelMessage(publish, dropIfNoChannel = false)
Not working for me neither, ChannelMessages get sent to dead letter
Try replacing
val publisher = system.actorSelection("/user/rabbitmq/publisher")
with
val publisher = system.actorSelection("/user/akka-rabbitmq/publisher")
That fixed it for me
Indeed the connection Actor is created with the name "akka-rabbitmq" at the top but referenced as 'rabbitmq' below.
i've update the example. thx for reporting and providing the fix ;)