amqpy
drain_events and publish channel using single connection in different threads
When I call drain_events in one thread and try to obtain a channel and publish a message in a second thread, I never get the channel in the second thread. Is this a deadlock, or should I use separate connections for publishing and drain_events?
That sounds like a deadlock. It's best to use a different connection to publish, but I don't think there's anything wrong with publishing in the same connection in principle. If it's deadlocking, it's a bug. I'll look into it, thanks for the report.
Example
import amqpy
import time
import traceback
import os
import threading
from multiprocessing.pool import ThreadPool

NUMBER_MESSAGE = 10000
NUMBER_EXCHANGE = 10
EXCHANGE_NAMES = ['exchange-%s' % i for i in range(NUMBER_EXCHANGE)]


class Consumer(threading.Thread):
    def __init__(self, conn, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.messages = []
        self.conn = conn

    def on_message(self, msg):
        self.messages.append(msg.body)
        print('[%d]Receive %s from %s' % (len(self.messages), msg.body, msg.delivery_info['exchange']))
        msg.ack()

    def declare(self):
        for exchange_name in EXCHANGE_NAMES:
            queue_name = 'queue-to-%s' % exchange_name
            self.conn.channel().queue_declare(queue_name)
            self.conn.channel().queue_bind(queue_name, exchange_name)
            self.conn.channel().basic_consume(queue_name, callback=self.on_message)

    def run(self):
        while len(self.messages) != NUMBER_MESSAGE:
            try:
                self.conn.drain_events(timeout=2)
            except amqpy.Timeout:
                pass
            except Exception as e:
                traceback.print_exc()
                os._exit(666)


class Producer(threading.Thread):
    def __init__(self, conn, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.messages = []
        self.conn = conn

    def run(self):
        messages = []
        conn = self.conn
        try:
            with ThreadPool(processes=len(EXCHANGE_NAMES)) as pool:
                free_exchange_names = set(EXCHANGE_NAMES)
                thread_exchange_name = {}

                def sender(i):
                    indent = threading.get_ident()
                    if indent not in thread_exchange_name:
                        thread_exchange_name[indent] = free_exchange_names.pop()
                    exchange_name = thread_exchange_name[indent]
                    channel = conn.channel()
                    body = str(i)
                    message = amqpy.Message(body, delivery_mode=2)
                    channel.basic_publish(message, exchange_name)
                    messages.append(body)
                    print('[%d]Send %s to %s' % (len(messages), body, exchange_name))

                pool.map(sender, range(NUMBER_MESSAGE))
            self.messages = messages
        except Exception as e:
            traceback.print_exc()
            os._exit(666)


def declare_exchanges():
    c = amqpy.Connection()
    for exchange_name in EXCHANGE_NAMES:
        c.channel().exchange_declare(exchange_name, 'fanout')


CASE = 3
if CASE == 1:  # deadlock or timeout
    declare_exchanges()
    conn = amqpy.Connection()
    c = Consumer(conn=conn)
    c.declare()
    c.start()
    p = Producer(conn=conn)
    p.start()
    p.join()
    c.join()
    assert len(c.messages) == len(p.messages)
    assert set(c.messages) == set(p.messages)
elif CASE == 2:  # timeout
    declare_exchanges()
    conn = amqpy.Connection()
    p = Producer(conn=conn)
    p.start()
    c = Consumer(conn=conn)
    c.declare()
    c.start()
    p.join()
    c.join()
elif CASE == 3:  # success
    declare_exchanges()
    c = Consumer(conn=amqpy.Connection())
    c.declare()
    c.start()
    p = Producer(conn=amqpy.Connection())
    p.start()
    p.join()
    c.join()
    assert len(c.messages) == len(p.messages)
    assert set(c.messages) == set(p.messages)
conn.drain_events() also makes it impossible to create new consumers later, on demand.
How to reproduce:
import gevent.monkey
gevent.monkey.patch_all()
import amqpy, gevent, logging, time

# Init:
conn = amqpy.Connection()
ch = conn.channel()
ch.exchange_declare('test.exchange', 'direct')
ch.queue_declare('test.q')
ch.queue_bind('test.q', exchange='test.exchange', routing_key='test.q')
ch.close()


def consume():
    ch = conn.channel()
    ch.basic_consume(queue='test.q', callback=on_msg)
    while True:
        time.sleep(10)


def on_msg(msg):
    print(msg.body)


# Start logging "amqpy":
logger = logging.getLogger('amqpy')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

# Start "drain_events" loop in a background thread:
def drain_events():
    while True:
        conn.drain_events(timeout=None)

gevent.spawn(drain_events)

# Main runtime:
time.sleep(1)  # Emulate that we need a new consumer on some event in runtime.
gevent.spawn(consume)

# Block main thread:
while True:
    time.sleep(10)
There is no basic.consume in the log:
Write: channel: 1 method_t(class_id=20, method_id=10) channel.open
Read: channel: 1 method_t(class_id=20, method_id=11) channel.open-ok
Channel open
But if we comment out gevent.spawn(drain_events), then basic.consume is sent and we get basic.consume-ok as expected:
Write: channel: 1 method_t(class_id=20, method_id=10) channel.open
Read: channel: 1 method_t(class_id=20, method_id=11) channel.open-ok
Channel open
Write: channel: 1 method_t(class_id=60, method_id=20) basic.consume
Read: channel: 1 method_t(class_id=60, method_id=21) basic.consume-ok
So currently we need a separate connection for each consumer created on demand,
but this is what channels are for: complete isolation inside the same connection.
@veegee, please try to create one more drain_events() in a Channel.
It could also fix the original issue with publishing and consuming using the same connection.
I was in the middle of reworking the underlying concurrency mechanism. Python is not a very friendly language for this kind of concurrency. Supporting gevent properly is tricky as well.
One option is to remove all locking except the very lowest-level frame locks and shift the burden of synchronization to the user, which means the user would be responsible for locking the connection when doing concurrent writes from multiple threads.
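To make the first option concrete, here is a minimal sketch of what user-side locking could look like. It does not use amqpy at all: FakeConnection, write_frame, publish and worker are hypothetical names invented for illustration, and the assumption is only that a single frame write stays atomic while a whole publish (several frames) has to be protected by the caller.

```python
import threading


class FakeConnection:
    """Hypothetical stand-in for a connection; not the amqpy API."""

    def __init__(self):
        self.wire = []  # frames in the order they reach the "socket"

    def write_frame(self, channel_id, part):
        # Assumed to be atomic on its own (the remaining low-level frame lock).
        self.wire.append((channel_id, part))


def publish(conn, lock, channel_id, body):
    # One publish is several frames; the caller-owned lock keeps frames
    # from different threads from interleaving on the shared connection.
    with lock:
        conn.write_frame(channel_id, 'method')
        conn.write_frame(channel_id, 'header')
        conn.write_frame(channel_id, body)


def worker(conn, lock, channel_id, count):
    for n in range(count):
        publish(conn, lock, channel_id, 'body-%d' % n)


conn = FakeConnection()
conn_lock = threading.Lock()  # owned by the user, not by the library
threads = [
    threading.Thread(target=worker, args=(conn, conn_lock, channel_id, 100))
    for channel_id in range(1, 5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With the lock held for the whole publish, every method/header/body triple in conn.wire belongs to a single channel. The cost is that every caller has to remember to take the lock.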
Another option is to do away with locks and go for a Go-style "channel" mechanism. I think this will be a good option and I'll be working on it shortly.
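As a rough illustration of the Go-style idea (not the planned amqpy design, which is not described here), the sketch below uses the standard library's queue.Queue as the "channel": producer threads only put messages on the queue, and a single writer thread is the only one that touches the connection. The names sent, writer, producer and outbox are made up for this example, and the sent list stands in for the socket.

```python
import queue
import threading

sent = []  # stands in for the socket: messages in the order they were written


def writer(outbox):
    # The only thread that touches the "connection", so no lock is needed.
    while True:
        item = outbox.get()
        if item is None:  # sentinel: shut down
            break
        exchange, body = item
        sent.append((exchange, body))


def producer(outbox, exchange, count):
    # Producers never write to the connection; they hand messages over.
    for n in range(count):
        outbox.put((exchange, 'msg-%d' % n))


outbox = queue.Queue()
writer_thread = threading.Thread(target=writer, args=(outbox,))
writer_thread.start()

producers = [
    threading.Thread(target=producer, args=(outbox, 'exchange-%d' % i, 50))
    for i in range(4)
]
for t in producers:
    t.start()
for t in producers:
    t.join()

outbox.put(None)  # all producers are done; stop the writer
writer_thread.join()
```

Because the queue is FIFO, messages from any one producer are written in the order that producer queued them, while messages from different producers may interleave freely.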
I would really like to support the Python 3 async/await stuff, especially now that pypy3 has gotten funding from Mozilla.