
drain_events and publish channel using single connection in different threads

khomyakov42 opened this issue 8 years ago · 4 comments

When I call drain_events in a first thread and publish a message on a channel obtained from the same connection in a second thread, I never get the channel in the second thread. Is this a possible deadlock, or should I use a different connection for publishing and drain_events?

khomyakov42 avatar Aug 09 '16 15:08 khomyakov42

That sounds like a deadlock. It's best to use a different connection to publish, but I don't think there's anything wrong with publishing on the same connection in principle. If it's deadlocking, it's a bug. I'll look into it, thanks for the report.
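
For example, a minimal sketch of the two-connection workaround (the queue name and message body here are placeholders, not from this thread):

import threading
import amqpy

# Connection 1: dedicated to consuming; only this thread calls drain_events().
consume_conn = amqpy.Connection()
ch = consume_conn.channel()
ch.queue_declare('demo.q')  # placeholder queue, reachable via the default exchange
ch.basic_consume('demo.q', callback=lambda msg: msg.ack())

def consume_loop():
    while True:
        try:
            consume_conn.drain_events(timeout=1)
        except amqpy.Timeout:
            pass

threading.Thread(target=consume_loop, daemon=True).start()

# Connection 2: dedicated to publishing; never calls drain_events().
publish_conn = amqpy.Connection()
publish_conn.channel().basic_publish(amqpy.Message('hello'), routing_key='demo.q')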

veegee avatar Aug 09 '16 15:08 veegee

Example

import amqpy
import time
import traceback
import os
import threading
from multiprocessing.pool import ThreadPool

NUMBER_MESSAGE = 10000
NUMBER_EXCHANGE = 10

EXCHANGE_NAMES = ['exchange-%s' % i for i in range(NUMBER_EXCHANGE)]



class Consumer(threading.Thread):
    def __init__(self, conn, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.messages = []
        self.conn = conn

    def on_message(self, msg):
        self.messages.append(msg.body)
        print('[%d]Receive %s from %s' % (len(self.messages), msg.body, msg.delivery_info['exchange']))
        msg.ack()

    def declare(self):
        # Use one channel per exchange to declare, bind, and consume its queue.
        for exchange_name in EXCHANGE_NAMES:
            queue_name = 'queue-to-%s' % exchange_name
            channel = self.conn.channel()
            channel.queue_declare(queue_name)
            channel.queue_bind(queue_name, exchange_name)
            channel.basic_consume(queue_name, callback=self.on_message)

    def run(self):
        while len(self.messages) != NUMBER_MESSAGE:
            try:
                self.conn.drain_events(timeout=2)
            except amqpy.Timeout:
                pass
            except Exception:
                traceback.print_exc()
                os._exit(666)


class Producer(threading.Thread):
    def __init__(self, conn, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.messages = []
        self.conn = conn

    def run(self):
        messages = []
        conn = self.conn
        try:
            with ThreadPool(processes=len(EXCHANGE_NAMES)) as pool:
                free_exchange_names = set(EXCHANGE_NAMES)
                thread_exchange_name = {}

                def sender(i):
                    # Pin each pool thread to a single exchange for all of its sends.
                    ident = threading.get_ident()
                    if ident not in thread_exchange_name:
                        thread_exchange_name[ident] = free_exchange_names.pop()
                    exchange_name = thread_exchange_name[ident]

                    channel = conn.channel()
                    body = str(i)
                    message = amqpy.Message(body, delivery_mode=2)
                    channel.basic_publish(message, exchange_name)
                    messages.append(body)
                    print('[%d]Send %s to %s' % (len(messages), body, exchange_name))
                pool.map(sender, range(NUMBER_MESSAGE))
            self.messages = messages
        except Exception:
            traceback.print_exc()
            os._exit(666)


def declare_exchanges():
    c = amqpy.Connection()
    for exchange_name in EXCHANGE_NAMES:
        c.channel().exchange_declare(exchange_name, 'fanout')
    c.close()

CASE = 3


if CASE == 1:  # deadlock or timeout
    declare_exchanges()

    conn = amqpy.Connection()
    c = Consumer(conn=conn)
    c.declare()
    c.start()

    p = Producer(conn=conn)
    p.start()

    p.join()
    c.join()

    assert len(c.messages) == len(p.messages)
    assert set(c.messages) == set(p.messages)
elif CASE == 2:  # timeout
    declare_exchanges()

    conn = amqpy.Connection()

    p = Producer(conn=conn)
    p.start()

    c = Consumer(conn=conn)
    c.declare()
    c.start()

    p.join()
    c.join()
elif CASE == 3:  # success
    declare_exchanges()

    c = Consumer(conn=amqpy.Connection())
    c.declare()
    c.start()

    p = Producer(conn=amqpy.Connection())
    p.start()

    p.join()
    c.join()

    assert len(c.messages) == len(p.messages)
    assert set(c.messages) == set(p.messages)

khomyakov42 avatar Aug 10 '16 04:08 khomyakov42

conn.drain_events() also makes it impossible to create new consumers on demand later.

How to reproduce:

import gevent.monkey
gevent.monkey.patch_all()

import amqpy, gevent, logging, time

# Init:

conn = amqpy.Connection()
ch = conn.channel()
ch.exchange_declare('test.exchange', 'direct')
ch.queue_declare('test.q')
ch.queue_bind('test.q', exchange='test.exchange', routing_key='test.q')
ch.close()

def consume():
    ch = conn.channel()
    ch.basic_consume(queue='test.q', callback=on_msg)
    while True:
        time.sleep(10)

def on_msg(msg):
    print(msg.body)

# Start logging "amqpy":

logger = logging.getLogger('amqpy')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

# Start "drain_events" loop in a background thread:

def drain_events():
    while True:
        conn.drain_events(timeout=None)

gevent.spawn(drain_events)

# Main runtime:

time.sleep(1)  # Emulate that we need a new consumer on some event in runtime.
gevent.spawn(consume)

# Block main thread:

while True:
    time.sleep(10)

There is no basic.consume in the log:

Write:  channel: 1 method_t(class_id=20, method_id=10) channel.open
Read:   channel: 1 method_t(class_id=20, method_id=11) channel.open-ok
Channel open

But if we comment out gevent.spawn(drain_events), then basic.consume is sent and we get basic.consume-ok as expected:

Write:  channel: 1 method_t(class_id=20, method_id=10) channel.open
Read:   channel: 1 method_t(class_id=20, method_id=11) channel.open-ok
Channel open
Write:  channel: 1 method_t(class_id=60, method_id=20) basic.consume
Read:   channel: 1 method_t(class_id=60, method_id=21) basic.consume-ok

So we need a separate connection for each consumer created on demand, but this is exactly what channels are for: complete isolation inside the same connection.

@veegee, please consider adding a drain_events() to Channel as well.

That could also fix the original issue with publishing and consuming on the same connection.
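
For completeness, the workaround we are left with for now looks roughly like this (a sketch reusing on_msg and test.q from the reproduction above):

def consume_on_demand(queue_name):
    # Workaround: give each on-demand consumer its own connection, so its
    # frames cannot be starved by the main connection's drain_events() loop.
    c = amqpy.Connection()
    ch = c.channel()
    ch.basic_consume(queue=queue_name, callback=on_msg)
    while True:
        c.drain_events(timeout=None)

gevent.spawn(consume_on_demand, 'test.q')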

denis-ryzhkov avatar Aug 16 '16 13:08 denis-ryzhkov

I was in the middle of reworking the underlying concurrency mechanism. Python is not a very friendly language for this kind of concurrency. Supporting gevent properly is tricky as well.

One option is to remove all locking except the very lowest-level frame locks and shift the burden of synchronization to the user, which means the user would be responsible for locking the connection when doing concurrent writes from multiple threads.
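
Under that model, user code would look roughly like this (a sketch, not a committed API; the shared lock is entirely the user's responsibility):

import threading
import amqpy

conn = amqpy.Connection()
conn_lock = threading.Lock()  # guards every write on this shared connection

def locked_publish(channel, body, exchange):
    # Each thread must hold the connection lock for the full duration of the write.
    with conn_lock:
        channel.basic_publish(amqpy.Message(body), exchange)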

Another option is to do away with locks entirely and go for a Go-style "channel" mechanism. I think this will be a good option and I'll be working on it shortly.
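
Internally that would look something like the following (a rough sketch using queue.Queue in the role of a Go channel; none of these names are actual amqpy internals):

import queue
import threading

frame_queue = queue.Queue()  # stands in for a Go channel

def writer_loop(sock):
    # A single writer thread is the only code that touches the socket,
    # so outgoing writes need no locks at all.
    while True:
        frame = frame_queue.get()
        if frame is None:  # shutdown sentinel
            break
        sock.sendall(frame)

def send_frame(frame_bytes):
    # Any thread (or greenlet) may enqueue; the queue serializes ordering.
    frame_queue.put(frame_bytes)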

I would really like to support the Python 3 async/await stuff, especially now that PyPy3 has gotten funding from Mozilla.

veegee avatar Sep 02 '16 13:09 veegee