amqp-client
AddConfirmListener or AddReturnListener not working as expected on 1.3
I have the following setup.
val testProbe = TestProbe()
val config = PublishConfiguration("WRONG","NOPE",false,false)
producer ! AddConfirmListener(testProbe.ref)
producer ! AddReturnListener(testProbe.ref)
producer ! Publish(....)
testProbe.receiveN(1)
I then publish to a queue that does not exist. Based on issue #24, I assumed these listeners would capture the error message. That does not appear to be the case: the test fails with a timeout.
Maybe I've missed something, but I cannot reproduce this problem, and there is a test for return listeners (see https://github.com/sstone/amqp-client/blob/5e603d1d5419e773c26d89058a1f368e8ae91e86/src/test/scala/com/github.sstone/amqp/ChannelOwnerSpec.scala#L61). If you compile the client, do the tests pass? What does PublishConfiguration("WRONG","NOPE",false,false) mean?
Thanks
In my case I am passing the channel to another actor that creates the Publish messages and handles some other boilerplate tasks. If I publish the message directly to the producer channel, my TestProbe receives the callback message. If I publish through my actor, the listening actor doesn't receive the message.
// Trap the messages emitted from creating : exchange,queue and binding
receiveN(3)
// A wrapper class for configuring the PublishActor. This just has wrong information in it.
val config = PublishConfiguration("","no_such_queue",false,false)
// Domain model.
val command = MarkAsDisabled("1")
val testProbe = TestProbe()
producer ! AddReturnListener(testProbe.ref)
val Amqp.Ok(_, None) = receiveOne(1 seconds)
val publishingActor = system.actorOf(Props(classOf[PublishActor],producer,config,publishTimeout))
publishingActor ! command
// This works
//producer ! Publish("", "no_such_queue", "test".getBytes)
testProbe.receiveN(1)
I replicated your tests and if your exchange name is not equal to "" then the test fails. Why would the exchange need to be the empty string?
When the exchange is not "", say "no_such_exchange", I see the following log and stack trace:
[ERROR] [11/12/2014 20:12:52.299] [events-akka.actor.default-dispatcher-3] [akka://events/user/$a/$a] shutdown
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 's' in vhost '/', class-id=60, method-id=40)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    at java.lang.Thread.run(Thread.java:745)
In general, if I want to handle any failure to send a message, ranging from incorrectly configured Publish messages to network/connection failures, which messages do I need to listen for?
Ok, I get it. It seems that there are different behaviours for different error classes: some errors will crash the channel that was used, others will leave the channel open but result in an exception that is returned wrapped in an error message, and some errors are handled in a specific way (like returned messages). For example:
- publishing an unroutable message to an existing exchange will not crash the channel, and the return listener will receive a "returned message"
- publishing to an exchange that does not exist will crash the channel, and the shutdown listener will receive a shutdown message
=> You might try registering a Shutdown listener as well, and see if that matches your requirements. Unfortunately, I won't have much time to look at this before the weekend, but I'll try and give you an update before Monday.
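To make the error classes above concrete, here is a minimal sketch of how a domain actor might model and match on them. It is not amqp-client's API: PublishOutcome, Confirmed, Returned, ChannelShutdown, PublishError and describe are hypothetical stand-ins for whatever messages the confirm, return and shutdown listeners actually deliver, and would need to be mapped from the real messages.

```scala
// Hypothetical stand-ins for listener messages; these are NOT amqp-client types.
sealed trait PublishOutcome

// The broker acknowledged the message (what a confirm listener would report).
final case class Confirmed(deliveryTag: Long) extends PublishOutcome

// The exchange exists but the message could not be routed (return listener).
final case class Returned(replyCode: Int, replyText: String,
                          exchange: String, routingKey: String) extends PublishOutcome

// The broker closed the channel, e.g. 404 NOT_FOUND for a missing exchange
// (shutdown listener).
final case class ChannelShutdown(replyCode: Int, replyText: String) extends PublishOutcome

// The channel stays open, but the request failed with an exception that is
// returned wrapped in an error message.
final case class PublishError(cause: Throwable) extends PublishOutcome

object PublishOutcomes {
  // Turn each outcome into a short description; a real actor would decide
  // here whether to retry, recreate the channel, or fail the request.
  def describe(outcome: PublishOutcome): String = outcome match {
    case Confirmed(tag) =>
      s"confirmed delivery tag $tag"
    case Returned(code, text, exchange, key) =>
      s"returned ($code $text) from exchange '$exchange' with key '$key'"
    case ChannelShutdown(code, text) =>
      s"channel closed ($code $text); a new channel is needed"
    case PublishError(cause) =>
      s"request failed: ${cause.getMessage}"
  }
}
```

Because the trait is sealed, the compiler warns when a match misses one of the cases, which helps when a single actor has to wait for several kinds of listener message.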
Thanks
Sounds good. I'm essentially using a pool of my domain specific producer actors that are using the Cameo pattern so I'll issue the request from inside my actor and then wait for the variety of messages. I just need to know what the messages could be for each specific listener so I can handle them.