amqp
amqp copied to clipboard
Consumers with a consumer tag of "" cannot be canceled.
queue := "my-queue"
consumer := "" // bad
channel.Consume(queue, consumer, .....)
When the consumer is empty, later invocations to cancel using the empty string don't cause the consumer to be cancelled. It doesn't matter whether the noWait
argument for Cancel
is true
or false
.
I suspect the problem is found in the dictionary lookup ch, found := me.chans[tag]
inside of the consumers.go
file in the close
function.
The workaround for the time being is to have a consumer name that's populated.
When the consumer is empty, it gets autogenerated name
https://github.com/streadway/amqp/blob/master/channel.go#L1025
Correct, the consumer is random. But how does the application become aware of the consumer tag? It needs the tag to later cancel the subscription.
@joliver you should provide a tag if you care about cancelling consumer later
RabbitMQ will return consumer tag if it had to generate one (client provided consumer tag as an empty string). So you can still cancel those consumers (how exactly the value is returned varies slightly from client to client) ;)
@joliver in Java and .NET clients the tag is returned from Channel#basicConsume
(or equivalent). In some other clients where consumers are functions, it is attached to an object/data structured by the client once basic.consume-ok
arrives.
The trouble is that in this library, channel.Consume
does not return the consumer tag. The consumer tag can be determined from the incoming messages, but this means that a consumer with a generated tag cannot be canceled before handling at least one message.
More specifically, channel.Consume
returns a channel (that is, a Go channel) to receive the incoming deliveries, and an error.
https://godoc.org/github.com/streadway/amqp#Channel.Consume
Any progress here? Could be channel.NotifyConsumeOk()
handler to get this auto-generated tag from basic.consume-ok
@streadway so this is a very unfortunate API limitation. We could return a pair of (channel, consumer tag) but this is a breaking change and this client does not use branches or releases.
We could introduce another function for consuming but BasicConsumerWithGeneratedTag
but just about everyone is going to try the standard BasicConsume
first and get confused. We can also make this client generate a tag but RabbitMQ and the protocol already support this.
I think it's time to consider using branches and tags or some breaking API issues would never be addressed.
Bunny has two methods for basic.consume
because one accepts a consumer instance. We could introduce something similar here, e.g. Channel#ConsumeWith
, that would associate a consumer tag, a Go channel and perhaps a few functions e.g. for cancellation. Thoughts?
If you want to cancel a consumer, pass a consumer tag to Consume. Whether you save the client generated or server generated tag, it doesn't matter.
If anything a tag generator could be added to this package that basically does func GenerateTag() string { return strconv.Itoa(tag++)
}
Actually, I think key complaint here can be about the way you access consumer tags. Introducing a generator can be done together with one or more compatible ways to address that.
I haven't noticed one curious expectation in this issue.
basic.cancel
only supports specific tags, not empty strings, in the protocol. Only in basic.consume
an empty string has special meaning.
Even though there is no precedence for this approach in other RMQ clients, we could use the delivery channel returned by ch.Consume()
to cancel consumers with no tag:
func (ch *Channel) CancelConsumerWithoutTag(deliveries chan Delivery, noWait bool) error {
consumerTag = ch.consumers.findTag(deliveries) // not implemented
err = Cancel(consumerTag, noWait)
return err
}
Thoughts?
I'd personally introduce a Consumer
abstraction that's more than a Go channel (a struct with a consumer tag and a delivery channel may be sufficient) and a new function that accepts and returns a Consumer
reference (à la Bunny which has two ways to define a consumer: as as object and as a function).
@gerhard's function can be introduced as a workaround for those who cannot use this new function but I think such helpers would immediately stand out as API kludges to the user.
This library doesn't use server generated tags.
Even though it's clunky, I think the API is pretty clear - if you pass an empty string for the consumer tag, you don't plan on cancelling the consumer. Otherwise you will use that same string for either demultiplexing messages from multiple consumers, or cancel consumers. We could expose the UniqueConsumerTag() string
function to allocate those outside of the library.
In hindsight, returning a (Consumer, error)
would have been the right thing to do. However, adding a new function to consume would just add confusion and the API doesn't currently prevent consumers from being cancelled, only for those that pass an empty string for consumers.
@streadway sorry but it's possible to pass an empty string and get back a tag to later cancel a consumer in other clients (most of them, I'd say). So it's not an unreasonable scenario for someone coming from another client.
Bunny has had two (in fact, more like 3) ways of adding a consumer for years. I don't think there was much confusion because the difference is very clear (you pass in a function vs. an object) and explicitly laid out in the docs.
@streadway, @gerhard and I agreed to expose a function that generates unique consumer tags and update the docs. We could make a breaking API change that would return a consumer tag or a struct that combines a consumer tag with a deliveries channel but that sounds like more of an annoyance to the users of this library than it's worth.
@michaelklishin, I'm able to lookup the consumerTag via http, for example, "http://localhost:15672/api/queues/vhost/queueName". Still not able to cancel a consumer by calling ChannelN.basicCancel(consumerTag). It throws IOException("Unknown consumerTag").
/** Public API - {@inheritDoc} */
@Override
public void basicCancel(final String consumerTag)
throws IOException
{
final Consumer originalConsumer = _consumers.get(consumerTag);
if (originalConsumer == null)
throw new IOException("Unknown consumerTag");
.......
}
@harry-me how does that question have anything to do with this issue or client?
We already explained it on rabbitmq-users but let's take another round: consumers can only be cancelled on the same channel they were added on. Consumer tag => consumer map is a part of channel state.