
RabbitmqBroker open fd leak with gunicorn and gevent

gdvalle opened this issue 3 years ago · 6 comments

Hey, Bogdan!

This is not exactly a dramatiq issue in and of itself, but I think it's a common enough setup to warrant an issue and discussion here.

When running gunicorn on Linux with gevent workers, I noticed that every request that enqueues a message creates a new connection, eventually leading to file descriptor exhaustion. This culminates in an OSError: [Errno 98] Address already in use as pika attempts to bind on port 0, or just a plain OSError: [Errno 24] Too many open files.

Each gunicorn request appears to get a new, empty threading.local() storage, so each time Broker.connection tries to reuse the thread-local connection state it finds nothing and creates a new connection instead. Each new connection is also stored in a non-thread-local set(), self.connections, which causes the process to hang onto the now-unusable connections; the GC cannot tear them down to free the ports, and self.connections just keeps growing until file descriptors run out.

This behavior differs from the gthread worker, which reuses threads across requests and so preserves the thread-local storage. Dramatiq itself, which subclasses Thread to spawn its workers, doesn't suffer either.
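The mechanism can be sketched with stdlib threads standing in for greenlets (under gunicorn's monkey-patched gevent worker, threading.local is greenlet-local in the same way). This demo is an illustration only, not dramatiq code:

```python
import threading

state = threading.local()

def handle_request(results):
    # A brand-new thread sees an empty `state`, so the reuse check
    # fails and a "new connection" is created for the request.
    results.append(not hasattr(state, "connection"))
    state.connection = object()  # stand-in for a pika connection

results = []
for _ in range(3):
    # Simulate a worker model that serves each request in fresh
    # thread-local storage, as the gevent worker effectively does.
    t = threading.Thread(target=handle_request, args=(results,))
    t.start()
    t.join()

print(results)  # [True, True, True]: every request made a new connection
```

With the gthread worker, by contrast, the same threads (and their thread-local state) are reused, so the check above would succeed after the first request.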

Here's an example to reproduce the issue. Save as main.py:

import dramatiq
import os
import urllib.request


def count_open_fds():
    return len(os.listdir("/proc/self/fd"))


@dramatiq.actor
def ping():
    ping.logger.info("%d", count_open_fds())


def app(environ, start_response):
    ping.send()
    print(count_open_fds())
    data = b""
    start_response(
        "200 OK",
        [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(data))),
        ],
    )
    return iter([data])


if __name__ == "__main__":
    while True:
        urllib.request.urlopen("http://localhost:8000")

Install the dependencies:

pip install 'gunicorn[gevent]'
pip install -e '../dramatiq[rabbitmq]'

Start RabbitMQ somewhere.

Run the app:

gunicorn --worker-class=gevent --worker-connections=2 --workers=1 main:app

Run the worker:

dramatiq-gevent main

Start making requests:

python main.py

For a fix, I swapped out the thread-local connection storage for a dict that stores connections by Greenlet.minimal_ident. I think this should retain thread-safety, and it allows gunicorn to actually reuse those connections.

import threading

import pika

from dramatiq.brokers.rabbitmq import RabbitmqBroker
from gevent import getcurrent
from prometheus_client import Counter, Gauge

DRAMATIQ_CONNECTIONS_CREATED = Counter(
    "dramatiq_rabbitmq_gevent_connections_created_total",
    "Count of connections created.",
)

DRAMATIQ_CONNECTION_COUNT = Gauge(
    "dramatiq_rabbitmq_gevent_current_connection_count",
    "Count of connections currently stored.",
)

DRAMATIQ_CHANNELS_CREATED = Counter(
    "dramatiq_rabbitmq_gevent_channels_created_total",
    "Count of channels created.",
)

DRAMATIQ_CHANNEL_COUNT = Gauge(
    "dramatiq_rabbitmq_gevent_current_channel_count",
    "Count of channels currently stored.",
)


class RabbitmqGeventBroker(RabbitmqBroker):
    """A RabbitmqBroker that handles connection thread-safety using
    a connection per greenlet ID.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.connections_by_thread = {}
        self.channels_by_thread = {}

    def _get_thread_id(self):
        current_thread = getcurrent()
        greenlet_ident = getattr(current_thread, "minimal_ident", None)
        if greenlet_ident is None:
            # This is mainly for support while running w/o gevent, like
            # in a repl.
            return threading.current_thread().ident
        return greenlet_ident

    @property
    def connection(self):
        thread_id = self._get_thread_id()
        connection = self.connections_by_thread.get(thread_id)
        if connection is None:
            connection = self.connections_by_thread[
                thread_id
            ] = pika.BlockingConnection(parameters=self.parameters)
            self.connections.add(connection)
            DRAMATIQ_CONNECTIONS_CREATED.inc()
            DRAMATIQ_CONNECTION_COUNT.set(len(self.connections))
        return connection

    @connection.deleter
    def connection(self):
        del self.channel
        thread_id = self._get_thread_id()
        try:
            connection = self.connections_by_thread[thread_id]
            del self.connections_by_thread[thread_id]
            self.connections.remove(connection)
            DRAMATIQ_CONNECTION_COUNT.set(len(self.connections))
        except KeyError:
            return

        if connection.is_open:
            try:
                connection.close()
            except Exception:
                self.logger.exception("Encountered exception while closing Connection.")

    @property
    def channel(self):
        thread_id = self._get_thread_id()
        channel = self.channels_by_thread.get(thread_id, None)
        if channel is None:
            channel = self.channels_by_thread[thread_id] = self.connection.channel()
            if self.confirm_delivery:
                channel.confirm_delivery()
            self.channels.add(channel)
            DRAMATIQ_CHANNELS_CREATED.inc()
            DRAMATIQ_CHANNEL_COUNT.set(len(self.channels))
        return channel

    @channel.deleter
    def channel(self):
        thread_id = self._get_thread_id()
        try:
            channel = self.channels_by_thread[thread_id]
            del self.channels_by_thread[thread_id]
            self.channels.remove(channel)
        except KeyError:
            return
        finally:
            DRAMATIQ_CHANNEL_COUNT.set(len(self.channels))

        if channel.is_open:
            try:
                channel.close()
            except Exception:
                self.logger.exception("Encountered exception while closing Channel.")

This probably isn't perfect. It doesn't include the gevent hub ident in the key, so unsafe access could still happen there, and I assume an ideal solution would be agnostic to gevent and threaded runtimes. But hopefully this is enough to shed some light on the topic.

Also, somewhat related: I noticed pika just merged a gevent adapter a few days ago (https://github.com/pika/pika/blob/master/pika/adapters/gevent_connection.py), but I haven't dug in to see what it'd take to make that work with the BlockingConnection-style API, or what designing a dramatiq broker around it would involve.

gdvalle avatar Feb 07 '21 21:02 gdvalle

Hey Greg,

I haven't had time this week to think about this very deeply so I might have more ideas over the weekend, but have you tried adding a middleware to your app to explicitly close the connections after each request? For example:

from dramatiq import get_broker

def dramatiq_middleware(env, start_response):
    try:
        return app(env, start_response)
    finally:
        del get_broker().connection

Since you mention each request is served by a new greenlet, this shouldn't negatively impact performance. It'd be good to come up with a better solution for gevent clients overall, though. The lack of pooling, in particular, is not great.
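On the pooling point, a per-process pool would be one way to cap connection churn. This is a hypothetical stdlib-only sketch (not dramatiq or pika API; `DummyConn` stands in for pika.BlockingConnection):

```python
import queue

class ConnectionPool:
    """Minimal sketch of a bounded connection pool."""

    def __init__(self, factory, maxsize=8):
        self._factory = factory
        self._pool = queue.Queue(maxsize)

    def acquire(self):
        try:
            return self._pool.get_nowait()   # reuse an idle connection
        except queue.Empty:
            return self._factory()           # none idle: create a fresh one

    def release(self, conn):
        try:
            self._pool.put_nowait(conn)      # keep it around for reuse
        except queue.Full:
            conn.close()                     # pool is full: close the surplus

# Exercise the pool with a stand-in connection type:
class DummyConn:
    closed = False
    def close(self):
        self.closed = True

pool = ConnectionPool(DummyConn, maxsize=1)
a = pool.acquire()
pool.release(a)
b = pool.acquire()
print(a is b)  # True: the idle connection was reused, not recreated
```

Greenlets would acquire on use and release when the request finishes, so the number of live connections is bounded by concurrency rather than growing with request count.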

Bogdanp avatar Feb 09 '21 10:02 Bogdanp

I tried adding this:

class Middleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        try:
            return self.app(environ, start_response)
        finally:
            del dramatiq.get_broker().connection


app_with_middleware = Middleware(app)

It seems to do what you'd expect, removing the connection from self.connections, but it still runs out of file handles. I'll have to dig in further to figure out why that is.

gdvalle avatar Feb 09 '21 21:02 gdvalle

On a deeper look, I think I gave you bad advice: the connection deleter in the RabbitmqBroker is wrong. For one, it never calls close on the deleted connections; second, the channel for a thread can hold a strong reference to its connection, so at the very least both would need to be deleted. Even that wouldn't be enough, because there doesn't seem to be a way for them to get closed automatically (I see no __del__ on either Channel or BlockingConnection in pika).
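A hedged sketch of what a corrected deleter might do (this is an illustration, not the actual patch that landed on master): drop the channel first, since it holds a strong reference to its connection, and close both explicitly, since pika defines no `__del__` to close them on garbage collection.

```python
import threading

def delete_connection(state):
    # Close the channel first: it holds a strong reference to the
    # connection, so deleting the connection alone would not free it.
    channel = getattr(state, "channel", None)
    if channel is not None:
        del state.channel
        if channel.is_open:
            channel.close()

    # Then close the connection explicitly; without this, the file
    # descriptor lingers until (if ever) the GC collects the object.
    connection = getattr(state, "connection", None)
    if connection is not None:
        del state.connection
        if connection.is_open:
            connection.close()

# Exercise it with stand-ins for pika's Channel/BlockingConnection:
class FakeResource:
    is_open = True
    def close(self):
        self.is_open = False

state = threading.local()
state.channel = FakeResource()
state.connection = FakeResource()
chan, conn = state.channel, state.connection
delete_connection(state)
print(chan.is_open, conn.is_open)  # False False
```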

Bogdanp avatar Feb 13 '21 11:02 Bogdanp

@gdvalle let me know if the issue still occurs for you on master. I think the middleware should now work properly.

Bogdanp avatar Feb 13 '21 12:02 Bogdanp

@Bogdanp Middleware now works on master!

gdvalle avatar Feb 13 '21 14:02 gdvalle

@Bogdanp I still have the issue. Do I need to use this middleware, or should just having 1.11 be enough?

wsantos avatar Jul 19 '21 14:07 wsantos

@wsantos yes, the middleware is necessary. The fix only made the middleware work.

Bogdanp avatar Sep 23 '22 07:09 Bogdanp