dramatiq
RabbitmqBroker open fd leak with gunicorn and gevent
Hey, Bogdan!
This is not exactly a dramatiq issue in and of itself, but I think it's a common enough setup to warrant an issue and discussion here.
When running gunicorn on Linux with gevent workers, I noticed that every request that enqueues a message creates a new connection. This eventually exhausts the available file descriptors and culminates in `OSError: [Errno 98] Address already in use` as pika attempts to bind on port 0, or just a plain `OSError: [Errno 24] Too many open files`.
Each gunicorn request appears to get a new, empty `threading.local()` storage, so each time `Broker.connection` attempts to reuse the thread-local connection state, it ends up creating a new connection. Each new connection is also stored in a non-thread-local `set()`, `self.connections`, which makes the process hang onto the now-unusable connection, and the GC can't tear those connections down to free up the ports. `self.connections` just keeps growing until the file descriptors run out.
This behavior is different from the `gthread` worker, which seems to reuse threads across requests and so preserves the thread-local storage. Dramatiq itself, which spawns its workers as `Thread` subclasses, doesn't suffer from this either.
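The difference between the two worker models can be reproduced without gunicorn, gevent, or RabbitMQ. This is a simplified model, not dramatiq's actual broker code: `FakeConnection` and `ThreadLocalBroker` are hypothetical stand-ins, and a fresh OS thread per request plays the role of a fresh greenlet per request.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeConnection:
    """Hypothetical stand-in for pika.BlockingConnection (it holds nothing)."""


class ThreadLocalBroker:
    """Simplified model: one cached connection per thread, with every
    connection also recorded in a shared (non-thread-local) set."""

    def __init__(self):
        self.state = threading.local()
        self.connections = set()

    @property
    def connection(self):
        connection = getattr(self.state, "connection", None)
        if connection is None:
            connection = self.state.connection = FakeConnection()
            self.connections.add(connection)
        return connection


def handle_request(broker):
    return broker.connection


# Scenario 1: a fresh thread per request (analogous to a fresh greenlet per
# request under gevent). Every request starts with empty local state.
fresh = ThreadLocalBroker()
for _ in range(20):
    t = threading.Thread(target=handle_request, args=(fresh,))
    t.start()
    t.join()

# Scenario 2: a fixed pool of reused threads (roughly the gthread model).
pooled = ThreadLocalBroker()
with ThreadPoolExecutor(max_workers=2) as pool:
    for _ in range(20):
        pool.submit(handle_request, pooled)

print(len(fresh.connections))        # 20: one leftover entry per request
print(len(pooled.connections) <= 2)  # True: at most one per pool thread
```

The threads in the first scenario are long gone, yet the shared set still references all 20 connections, which is the same growth pattern as `self.connections` under gevent.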
Here's an example to reproduce the issue. Save it as `main.py`:
```python
import dramatiq
import os
import urllib.request


def count_open_fds():
    # Linux-only: each entry in /proc/self/fd is one open file descriptor.
    return len(os.listdir("/proc/self/fd"))


@dramatiq.actor
def ping():
    ping.logger.info("%d", count_open_fds())


def app(environ, start_response):
    ping.send()
    print(count_open_fds())
    data = b""
    start_response(
        "200 OK",
        [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(data))),
        ],
    )
    return iter([data])


if __name__ == "__main__":
    while True:
        urllib.request.urlopen("http://localhost:8000")
```
Install the dependencies:

```shell
pip install 'gunicorn[gevent]'
pip install -e '../dramatiq[rabbitmq]'
```

Start RabbitMQ somewhere.

Run the app:

```shell
gunicorn --worker-class=gevent --worker-connections=2 --workers=1 main:app
```

Run the worker:

```shell
dramatiq-gevent main
```

Start making requests:

```shell
python main.py
```
For a fix, I swapped out the thread-local connection storage for a dict that stores connections keyed by `Greenlet.minimal_ident`. I think this should retain thread-safety, and it allows gunicorn to actually reuse those connections.
```python
import threading

import pika
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from gevent import getcurrent
from prometheus_client import Counter, Gauge

DRAMATIQ_CONNECTIONS_CREATED = Counter(
    "dramatiq_rabbitmq_gevent_connections_created_total",
    "Count of connections created.",
)
DRAMATIQ_CONNECTION_COUNT = Gauge(
    "dramatiq_rabbitmq_gevent_current_connection_count",
    "Count of connections currently stored.",
)
DRAMATIQ_CHANNELS_CREATED = Counter(
    "dramatiq_rabbitmq_gevent_channels_created_total",
    "Count of channels created.",
)
DRAMATIQ_CHANNEL_COUNT = Gauge(
    "dramatiq_rabbitmq_gevent_current_channel_count",
    "Count of channels currently stored.",
)


class RabbitmqGeventBroker(RabbitmqBroker):
    """A RabbitmqBroker that handles connection thread-safety using
    a connection per greenlet ID.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.connections_by_thread = {}
        self.channels_by_thread = {}

    def _get_thread_id(self):
        current_thread = getcurrent()
        greenlet_ident = getattr(current_thread, "minimal_ident", None)
        if greenlet_ident is None:
            # This is mainly for support while running w/o gevent, like
            # in a repl.
            return threading.current_thread().ident
        return greenlet_ident

    @property
    def connection(self):
        thread_id = self._get_thread_id()
        connection = self.connections_by_thread.get(thread_id)
        if connection is None:
            connection = self.connections_by_thread[
                thread_id
            ] = pika.BlockingConnection(parameters=self.parameters)
            self.connections.add(connection)
            DRAMATIQ_CONNECTIONS_CREATED.inc()
            DRAMATIQ_CONNECTION_COUNT.set(len(self.connections))
        return connection

    @connection.deleter
    def connection(self):
        del self.channel
        thread_id = self._get_thread_id()
        try:
            connection = self.connections_by_thread[thread_id]
            del self.connections_by_thread[thread_id]
            self.connections.remove(connection)
            DRAMATIQ_CONNECTION_COUNT.set(len(self.connections))
        except KeyError:
            return

        if connection.is_open:
            try:
                connection.close()
            except Exception:
                self.logger.exception("Encountered exception while closing Connection.")

    @property
    def channel(self):
        thread_id = self._get_thread_id()
        channel = self.channels_by_thread.get(thread_id, None)
        if channel is None:
            channel = self.channels_by_thread[thread_id] = self.connection.channel()
            if self.confirm_delivery:
                channel.confirm_delivery()

            self.channels.add(channel)
            DRAMATIQ_CHANNELS_CREATED.inc()
            DRAMATIQ_CHANNEL_COUNT.set(len(self.channels))
        return channel

    @channel.deleter
    def channel(self):
        thread_id = self._get_thread_id()
        try:
            channel = self.channels_by_thread[thread_id]
            del self.channels_by_thread[thread_id]
            self.channels.remove(channel)
        except KeyError:
            return
        finally:
            DRAMATIQ_CHANNEL_COUNT.set(len(self.channels))

        if channel.is_open:
            try:
                channel.close()
            except Exception:
                self.logger.exception("Encountered exception while closing Channel.")
```
This probably isn't perfect. The key doesn't include the gevent hub ident, so unsafe access could happen across hubs (`minimal_ident` is only unique within a single hub), and I assume an ideal solution would be agnostic to gevent versus threaded runtimes. But hopefully this is enough to shed some light on the topic.
Also, somewhat related, I noticed pika just merged a gevent adapter a few days ago: https://github.com/pika/pika/blob/master/pika/adapters/gevent_connection.py, but I haven't dug in to see what it'd take to make that work with either the `BlockingConnection`-style API or a dramatiq broker designed around it.
Hey Greg,
I haven't had time this week to think about this very deeply, so I might have more ideas over the weekend, but have you tried adding a middleware to your app to explicitly close the connections after each request? For example:

```python
from dramatiq import get_broker


def dramatiq_middleware(env, start_response):
    try:
        return app(env, start_response)
    finally:
        # Drop the current thread's/greenlet's broker connection after every request.
        del get_broker().connection
```

Since you mention each request is served by a new greenlet, its connection is never reused anyway, so this shouldn't negatively impact performance. It'd be good to come up with a better solution for gevent clients overall, though. The lack of pooling, in particular, is not great.
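Pooling, as mentioned above, could look something like the following minimal fixed-size pool sketch. It is not something dramatiq or pika provides: `ConnectionPool` and `open_connection` are hypothetical names, the factory would wrap something like `pika.BlockingConnection(parameters)` in practice, and under gevent it assumes monkey-patching so that the `threading`/`queue` primitives yield to other greenlets instead of blocking the process.

```python
import queue
import threading
from contextlib import contextmanager


class ConnectionPool:
    """Hypothetical fixed-size pool; not part of dramatiq or pika."""

    def __init__(self, factory, size):
        self._factory = factory  # zero-argument callable that opens a connection
        self._idle = queue.LifoQueue()  # connections waiting to be reused
        self._slots = threading.BoundedSemaphore(size)  # caps concurrent checkouts

    @contextmanager
    def connection(self):
        self._slots.acquire()  # waits while `size` connections are in use
        try:
            try:
                conn = self._idle.get_nowait()
            except queue.Empty:
                conn = self._factory()
            try:
                yield conn
            finally:
                # Only return connections that still look usable; pika
                # connections expose `is_open`, other objects default to True.
                if getattr(conn, "is_open", True):
                    self._idle.put(conn)
        finally:
            self._slots.release()


opened = []


def open_connection():
    conn = object()  # stand-in for pika.BlockingConnection(parameters)
    opened.append(conn)
    return conn


pool = ConnectionPool(open_connection, size=2)
for _ in range(10):
    with pool.connection() as conn:
        pass  # publish a message on `conn` here

print(len(opened))  # 1
```

Because a connection goes back to the idle queue before its slot is released, the number of open connections never exceeds `size`, no matter how many short-lived greenlets come and go.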
I tried adding this:
```python
import dramatiq


class Middleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        try:
            return self.app(environ, start_response)
        finally:
            del dramatiq.get_broker().connection


app_with_middleware = Middleware(app)
```
It seems to do what you'd expect, removing the connection from `self.connections`, but it still runs out of file handles. I'll have to dig in further to figure out why that is.
On a deeper look, I think I gave you bad advice: the connection deleter in `RabbitmqBroker` is wrong. First, it never calls `close` on the deleted connections; second, the channel for a thread can hold a strong reference to its connection, so at the very least both would need to be `del`eted. Even that wouldn't be enough, because there doesn't seem to be a way for them to get closed automatically (I see no `__del__` methods on either `Channel` or `BlockingConnection` in pika).
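A deleter that addresses both points has to drop the channel first and then close the connection explicitly. The following is a minimal sketch with hypothetical stand-ins (`FakeConnection`, `FakeChannel`, `Broker`) for the pika and dramatiq classes; it is not the change that actually landed on master.

```python
import threading


class FakeConnection:
    """Hypothetical stand-in for pika.BlockingConnection."""

    def __init__(self):
        self.is_open = True

    def channel(self):
        return FakeChannel(self)

    def close(self):
        self.is_open = False


class FakeChannel:
    """Hypothetical stand-in for a pika channel; it keeps a strong reference
    to the connection it was opened on, as described above."""

    def __init__(self, connection):
        self.connection = connection
        self.is_open = True

    def close(self):
        self.is_open = False


class Broker:
    def __init__(self):
        self.state = threading.local()
        self.connections = set()
        self.channels = set()

    @property
    def connection(self):
        connection = getattr(self.state, "connection", None)
        if connection is None:
            connection = self.state.connection = FakeConnection()
            self.connections.add(connection)
        return connection

    @connection.deleter
    def connection(self):
        # Drop the channel first: it references the connection and would
        # otherwise keep it alive.
        del self.channel
        connection = getattr(self.state, "connection", None)
        if connection is None:
            return
        del self.state.connection
        self.connections.discard(connection)
        # Nothing closes it for us (no __del__), so close it explicitly.
        if connection.is_open:
            connection.close()

    @property
    def channel(self):
        channel = getattr(self.state, "channel", None)
        if channel is None:
            channel = self.state.channel = self.connection.channel()
            self.channels.add(channel)
        return channel

    @channel.deleter
    def channel(self):
        channel = getattr(self.state, "channel", None)
        if channel is None:
            return
        del self.state.channel
        self.channels.discard(channel)
        if channel.is_open:
            channel.close()


broker = Broker()
channel = broker.channel
connection = broker.connection
del broker.connection
print(channel.is_open, connection.is_open)  # False False
print(len(broker.connections), len(broker.channels))  # 0 0
```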
@gdvalle let me know if the issue still occurs for you on master. I think the middleware should now work properly.
@Bogdanp Middleware now works on master!
@Bogdanp I still have the issue. Do I need to use this middleware, or should just having 1.11 be enough?
@wsantos yes, the middleware is necessary. The fix only made the middleware work.