Consumers with a consumer tag of "" cannot be canceled.

Open joliver opened this issue 9 years ago • 20 comments

queue := "my-queue"
consumer := "" // bad
channel.Consume(queue, consumer, .....)

When the consumer is empty, later invocations to cancel using the empty string don't cause the consumer to be cancelled. It doesn't matter whether the noWait argument for Cancel is true or false.

I suspect the problem is found in the dictionary lookup ch, found := me.chans[tag] inside of the consumers.go file in the close function.

The workaround for the time being is to have a consumer name that's populated.

joliver avatar Jun 05 '15 16:06 joliver

When the consumer tag is empty, it gets an autogenerated name

https://github.com/streadway/amqp/blob/master/channel.go#L1025

kron4eg avatar Jul 22 '15 12:07 kron4eg

Correct, the consumer is random. But how does the application become aware of the consumer tag? It needs the tag to later cancel the subscription.

joliver avatar Jul 22 '15 14:07 joliver

@joliver you should provide a tag if you care about cancelling consumer later

kron4eg avatar Jul 22 '15 14:07 kron4eg

RabbitMQ will return consumer tag if it had to generate one (client provided consumer tag as an empty string). So you can still cancel those consumers (how exactly the value is returned varies slightly from client to client) ;)

michaelklishin avatar Jul 22 '15 15:07 michaelklishin

@joliver in Java and .NET clients the tag is returned from Channel#basicConsume (or equivalent). In some other clients where consumers are functions, it is attached to an object/data structure by the client once basic.consume-ok arrives.

michaelklishin avatar Jul 22 '15 15:07 michaelklishin

The trouble is that in this library, channel.Consume does not return the consumer tag. The consumer tag can be determined from the incoming messages, but this means that a consumer with a generated tag cannot be canceled before handling at least one message.

leifurhauks avatar Nov 17 '15 17:11 leifurhauks

More specifically, channel.Consume returns a channel (that is, a Go channel) to receive the incoming deliveries, and an error. https://godoc.org/github.com/streadway/amqp#Channel.Consume
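A minimal sketch of reading the tag from the first delivery, as described above. The `Delivery` type here is a stand-in that models only the `ConsumerTag` field, and `firstTag` is a hypothetical helper, not part of the library. It also shows the limitation: nothing is known until at least one message arrives.

```go
package main

import "fmt"

// Delivery is a stand-in for the library's Delivery type; only the
// ConsumerTag field matters for this sketch.
type Delivery struct {
	ConsumerTag string
	Body        []byte
}

// firstTag (hypothetical helper) blocks until one delivery arrives and
// returns its consumer tag together with the delivery itself, so the
// message is not lost. ok is false if the channel was closed first.
func firstTag(deliveries <-chan Delivery) (tag string, first Delivery, ok bool) {
	first, ok = <-deliveries
	if !ok {
		return "", Delivery{}, false
	}
	return first.ConsumerTag, first, true
}

func main() {
	// Simulate the channel that Consume would return.
	deliveries := make(chan Delivery, 1)
	deliveries <- Delivery{ConsumerTag: "ctag-generated-1", Body: []byte("hello")}

	tag, _, ok := firstTag(deliveries)
	fmt.Println(tag, ok) // prints: ctag-generated-1 true
}
```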

leifurhauks avatar Nov 17 '15 17:11 leifurhauks

Any progress here? There could be a channel.NotifyConsumeOk() handler to get this auto-generated tag from basic.consume-ok.

valichek avatar Jan 03 '16 14:01 valichek

@streadway so this is a very unfortunate API limitation. We could return a pair of (channel, consumer tag) but this is a breaking change and this client does not use branches or releases.

We could introduce another function for consuming, e.g. BasicConsumerWithGeneratedTag, but just about everyone is going to try the standard BasicConsume first and get confused. We can also make this client generate a tag, but RabbitMQ and the protocol already support this.

I think it's time to consider using branches and tags or some breaking API issues would never be addressed.

michaelklishin avatar Jan 05 '17 10:01 michaelklishin

Bunny has two methods for basic.consume because one accepts a consumer instance. We could introduce something similar here, e.g. Channel#ConsumeWith, that would associate a consumer tag, a Go channel and perhaps a few functions e.g. for cancellation. Thoughts?

michaelklishin avatar Jan 05 '17 10:01 michaelklishin

If you want to cancel a consumer, pass a consumer tag to Consume. Whether you save the client generated or server generated tag, it doesn't matter.

If anything a tag generator could be added to this package that basically does func GenerateTag() string { return strconv.Itoa(tag++) }
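As written, that one-liner would not compile, because `tag++` is a statement in Go rather than an expression. A minimal sketch of the same idea follows, assuming a process-wide atomic counter and a `ctag-` prefix (both are illustrative choices, and `GenerateTag` is the name suggested above, not an existing library function):

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// tagSeq is a process-wide counter; atomic access keeps GenerateTag safe
// to call from multiple goroutines.
var tagSeq uint64

// GenerateTag returns a tag that is unique within this process.
// Hypothetical helper: the name and the "ctag-" prefix are assumptions.
func GenerateTag() string {
	return "ctag-" + strconv.FormatUint(atomic.AddUint64(&tagSeq, 1), 10)
}

func main() {
	fmt.Println(GenerateTag() != GenerateTag()) // prints: true
}
```

The caller would keep the returned string, pass it to Consume, and later pass the same string to Cancel.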

streadway avatar Jan 05 '17 10:01 streadway

Actually, I think the key complaint here is about the way you access consumer tags. Introducing a generator can be done together with one or more compatible ways to address that.

michaelklishin avatar Jan 05 '17 11:01 michaelklishin

I hadn't noticed one curious expectation in this issue.

In the protocol, basic.cancel only supports specific tags, not empty strings. Only in basic.consume does an empty string have special meaning.

michaelklishin avatar Jan 05 '17 11:01 michaelklishin

Even though there is no precedent for this approach in other RMQ clients, we could use the delivery channel returned by ch.Consume() to cancel consumers with no tag:

func (ch *Channel) CancelConsumerWithoutTag(deliveries <-chan Delivery, noWait bool) error {
  // Look up the tag registered for this delivery channel.
  consumerTag := ch.consumers.findTag(deliveries) // not implemented
  return ch.Cancel(consumerTag, noWait)
}

Thoughts?
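For illustration, the `findTag` lookup marked as not implemented could look roughly like this. The `consumers` registry below is a simplified stand-in that assumes a tag-to-channel map, not the library's actual internals. It relies on Go channels being comparable by identity.

```go
package main

import (
	"fmt"
	"sync"
)

// Delivery stands in for the library's Delivery type.
type Delivery struct{ ConsumerTag string }

// consumers is a simplified stand-in for an internal registry that maps
// consumer tags to delivery channels (an assumption for this sketch).
type consumers struct {
	mu    sync.Mutex
	chans map[string]chan Delivery
}

// findTag returns the tag registered for the given delivery channel.
// Channels compare by identity, so a linear scan over the map is enough.
func (c *consumers) findTag(deliveries <-chan Delivery) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for tag, ch := range c.chans {
		if (<-chan Delivery)(ch) == deliveries {
			return tag, true
		}
	}
	return "", false
}

func main() {
	ch := make(chan Delivery)
	reg := &consumers{chans: map[string]chan Delivery{"ctag-1": ch}}
	tag, found := reg.findTag(ch)
	fmt.Println(tag, found) // prints: ctag-1 true
}
```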

gerhard avatar Jan 05 '17 12:01 gerhard

I'd personally introduce a Consumer abstraction that's more than a Go channel (a struct with a consumer tag and a delivery channel may be sufficient) and a new function that accepts and returns a Consumer reference (à la Bunny, which has two ways to define a consumer: as an object and as a function).

@gerhard's function can be introduced as a workaround for those who cannot use this new function but I think such helpers would immediately stand out as API kludges to the user.

michaelklishin avatar Jan 05 '17 13:01 michaelklishin

This library doesn't use server generated tags.

Even though it's clunky, I think the API is pretty clear: if you pass an empty string for the consumer tag, you don't plan on cancelling the consumer. Otherwise you will use that same string either for demultiplexing messages from multiple consumers or for cancelling consumers. We could expose the UniqueConsumerTag() string function to allocate those outside of the library.

In hindsight, returning a (Consumer, error) would have been the right thing to do. However, adding a new function to consume would just add confusion, and the API doesn't currently prevent consumers from being cancelled, except for those that pass an empty string for the consumer tag.

streadway avatar Jan 05 '17 15:01 streadway

@streadway sorry but it's possible to pass an empty string and get back a tag to later cancel a consumer in other clients (most of them, I'd say). So it's not an unreasonable scenario for someone coming from another client.

Bunny has had two (in fact, more like 3) ways of adding a consumer for years. I don't think there was much confusion because the difference is very clear (you pass in a function vs. an object) and explicitly laid out in the docs.

michaelklishin avatar Jan 05 '17 15:01 michaelklishin

@streadway, @gerhard and I agreed to expose a function that generates unique consumer tags and update the docs. We could make a breaking API change that would return a consumer tag or a struct that combines a consumer tag with a deliveries channel but that sounds like more of an annoyance to the users of this library than it's worth.

michaelklishin avatar Jan 06 '17 14:01 michaelklishin

@michaelklishin, I'm able to look up the consumerTag via HTTP, for example, "http://localhost:15672/api/queues/vhost/queueName". I'm still not able to cancel a consumer by calling ChannelN.basicCancel(consumerTag). It throws IOException("Unknown consumerTag").

/** Public API - {@inheritDoc} */
@Override
public void basicCancel(final String consumerTag)
    throws IOException
{
    final Consumer originalConsumer = _consumers.get(consumerTag);
    if (originalConsumer == null)
        throw new IOException("Unknown consumerTag");
  .......

}

harry-me avatar Mar 12 '18 22:03 harry-me

@harry-me how does that question have anything to do with this issue or client?

We already explained it on rabbitmq-users but let's take another round: consumers can only be cancelled on the same channel they were added on. Consumer tag => consumer map is a part of channel state.
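A toy model of that point, written in Go and using made-up types (`amqpChannel`, `consume`, `cancel` are illustrative, not any client's API): each channel owns its own tag registry, so a tag learned elsewhere is unknown to a different channel.

```go
package main

import (
	"errors"
	"fmt"
)

// amqpChannel is a toy model of an AMQP channel: it owns its own
// consumer-tag registry, which is not shared with other channels.
type amqpChannel struct {
	consumers map[string]struct{}
}

func newChannel() *amqpChannel {
	return &amqpChannel{consumers: make(map[string]struct{})}
}

// consume registers a consumer tag on this channel only.
func (c *amqpChannel) consume(tag string) { c.consumers[tag] = struct{}{} }

var errUnknownTag = errors.New("unknown consumer tag")

// cancel succeeds only if the tag was registered on this same channel.
func (c *amqpChannel) cancel(tag string) error {
	if _, ok := c.consumers[tag]; !ok {
		return errUnknownTag
	}
	delete(c.consumers, tag)
	return nil
}

func main() {
	a, b := newChannel(), newChannel()
	a.consume("ctag-1")
	fmt.Println(b.cancel("ctag-1")) // prints: unknown consumer tag
	fmt.Println(a.cancel("ctag-1")) // prints: <nil>
}
```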

michaelklishin avatar Mar 14 '18 02:03 michaelklishin