kombu icon indicating copy to clipboard operation
kombu copied to clipboard

maybe_declare does not redeclare Queue bindings

Open rectalogic opened this issue 11 years ago • 0 comments

Using kombu master@82dc37b with librabbitmq.

kombu.common.maybe_declare caches Entity hashes in channel.connection.client.declared_entities and does not delcare the Entity if the hash exists. Queue.__hash__ hashes only the Queue name which does not account for any bindings. So multiple Queue instances with the same name but different routing_keys or bindings will not be declared - only the first binding will be declared.

I have a sample app that declares an Exchange and Queue when publishing. I publish 3 messages, each with a different routing_key and to the same Queue but with a different binding. The Queue and Exchange are in the declare list passed to publish. Messages will be lost if published to an exchange that does not yet have a queue bound to it, so we need to declare the binding when publishing.

Only the first message is published - the others are lost, and only the first queue binding is created in RabbitMQ.

This is a sample app that reproduces the issue:

import kombu
import kombu.pools

TEST_EXCHANGE = kombu.Exchange('test_queuebinding', type='direct', durable=True, delivery_mode=kombu.Exchange.PERSISTENT_DELIVERY_MODE)
TEST_QUEUEBINDING1 = kombu.Queue(name='test_queuebinding', exchange=TEST_EXCHANGE, routing_key='binding1', durable=True)
TEST_QUEUEBINDING2 = kombu.Queue(name='test_queuebinding', exchange=TEST_EXCHANGE, routing_key='binding2', durable=True)
TEST_QUEUEBINDING3 = kombu.Queue(name='test_queuebinding', exchange=TEST_EXCHANGE, routing_key='binding3', durable=True)


connection = kombu.Connection("amqp://localhost")
with kombu.pools.producers[connection].acquire(block=True) as producer:
    def publish(payload, routing_key, queue):
        producer.publish(
            payload,
            routing_key=routing_key,
            content_type='text/plain', content_encoding='ascii',
            exchange=TEST_EXCHANGE,
            declare=[TEST_EXCHANGE, queue],
        )
        print("published to binding {0}".format(routing_key))

    publish("MESSAGE-1", "binding1", TEST_QUEUEBINDING1)
    publish("MESSAGE-2", "binding2", TEST_QUEUEBINDING2)
    publish("MESSAGE-3", "binding3", TEST_QUEUEBINDING3)

This patch fixes the issue - all the bindings are created and all 3 messages published:

diff --git a/kombu/entity.py b/kombu/entity.py
index c27316c..17bf9fc 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -490,7 +490,7 @@ class Queue(MaybeChannelBound):
         return bound

     def __hash__(self):
-        return hash('Q|%s' % (self.name, ))
+        return hash('Q|%s|%s|%s' % (self.name, self.routing_key, '|'.join(str(hash(b)) for b in self.bindings)))

     def when_bound(self):
         if self.exchange:

That said, I don't think it is appropriate to rely on the hash being unique. common.maybe_declare stores a list of hashes in channel.connection.client.declared_entities and will not redeclare if an entity hash exists in that list - but there is no guarantee different entities will not hash to the same value.

I think Entities should implement something like an entity_key string property which they can construct with all values that make them unique, so for Queue it would include the name and bindings, then store that key in declared_entities instead of a hash.

rectalogic avatar Aug 28 '14 14:08 rectalogic