django_dramatiq

InterfaceError: connection already closed

Open drpancake opened this issue 4 years ago • 17 comments
I'm pushing 500+ tasks per minute through Dramatiq as part of a Django app, and occasionally (once every 1-2 days) I suddenly get hundreds of instances of this error from the workers. The only way to fix it is to restart the worker process.

Sentry reports that it's triggered in both before_process_message() and after_process_message() when they both call Task.tasks.create_or_update_from_message().

I have CONN_MAX_AGE set to 0 and database connections are pooled via PgBouncer.

Please let me know if I'm missing any other information.

drpancake avatar Jun 07 '21 03:06 drpancake

I found this old issue in Celery that is perhaps helpful: https://github.com/celery/django-celery/issues/121

Also this blog post: https://tryolabs.com/blog/2014/02/12/long-running-process-and-django-orm/

And one more lead: https://stackoverflow.com/a/37891165/3211027

drpancake avatar Jun 07 '21 03:06 drpancake

Is the DbConnectionsMiddleware correctly configured in your app? Make sure it has a higher priority than the admin middleware (i.e. comes before it in the middleware list).

Bogdanp avatar Jun 13 '21 05:06 Bogdanp

Is the DbConnectionsMiddleware correctly configured in your app? Make sure it has a higher priority than the admin middleware (i.e. comes before it in the middleware list).

@Bogdanp yes it's in the correct order, my MIDDLEWARE is set up as follows:

"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Retries",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
"ss.web.utils.dramatiq.SentryMiddleware",

I'm also now seeing this error around 50 times per hour: 'NoneType' object has no attribute 'decode'. It's raised from the decode() method in dramatiq.encoder.JSONEncoder.

I currently have CONN_MAX_AGE=0 set in Django. Would you recommend changing this setting?

drpancake avatar Jun 14 '21 05:06 drpancake

No, that setting should be fine (if a little inefficient). Re. the decoding error, are you using the Redis broker? If so, you should try upgrading to dramatiq v1.11.0 which has some related fixes and improvements.

Bogdanp avatar Jun 14 '21 05:06 Bogdanp
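
For readers wondering why CONN_MAX_AGE=0 is safe but "a little inefficient", here is a minimal sketch of the expiry rule as Django documents it: None means a connection never expires, and any other value is a lifetime in seconds. The function names are hypothetical and this is a simplified model, not Django's actual code.

```python
import time
from typing import Optional


def close_deadline(opened_at: float, conn_max_age: Optional[int]) -> Optional[float]:
    """Return the time after which a connection counts as obsolete.

    Simplified model of CONN_MAX_AGE: None means "never expire",
    0 means "obsolete as soon as it has been opened".
    """
    if conn_max_age is None:
        return None
    return opened_at + conn_max_age


def is_obsolete(opened_at: float, conn_max_age: Optional[int], now: float) -> bool:
    """True if a cleanup pass running at `now` would close the connection."""
    deadline = close_deadline(opened_at, conn_max_age)
    return deadline is not None and now >= deadline


opened = time.monotonic()
one_second_later = opened + 1

print(is_obsolete(opened, 0, one_second_later))     # True: closed at the next cleanup
print(is_obsolete(opened, 500, one_second_later))   # False: reused for up to 500 s
print(is_obsolete(opened, None, one_second_later))  # False: kept open indefinitely
```

Under this model, a value of 0 means every cleanup pass between tasks closes the connection, so each task pays for a fresh connection. That cost is usually small when PgBouncer sits in front of the database.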

@Bogdanp Yep using Redis as the broker. I'll try that, thanks.

Any clues about the connection already closed error? I'm happy to dig into this and try to fix it myself, but I'm not exactly sure where to begin! I looked at the source and I can see that this library does handle old/broken DB connections, so I wonder if this is some obscure race condition that's only happening because I'm running a lot of tasks.

drpancake avatar Jun 14 '21 05:06 drpancake

I'm really not sure what it could be. I would probably start by making sure the clean up code in the connections middleware runs at the right time and there's nothing else that might be opening up connections after it and leaving them open.

Bogdanp avatar Jun 14 '21 06:06 Bogdanp

I don't actually use django_dramatiq but I just ran into the same problem with plain Django + Dramatiq. Are you wrapping any DB calls in sync_to_async?

Here's a standalone Django management command to recreate it:

import asyncio
from time import sleep

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand

# AModel is a placeholder: import any model from your own app here.


class Command(BaseCommand):
    def handle(self, *args, **options):
        async def test():
            print('Getting')
            return await sync_to_async(AModel.objects.get)(id=1234)

        # Each loop of the while block is akin to a Dramatiq task under a long
        # running worker
        while True:
            task = asyncio.ensure_future(test())
            asyncio.get_event_loop().run_until_complete(asyncio.wait([task]))

            try:
                print(task.result())
            except Exception as e:
                print(e)

            sleep(5)

While the above is running kill the connection in Postgres with the following:

SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid != pg_backend_pid() AND query != '';

Then it should start spitting out "connection already closed" errors.

It seems like extra DB cleanup is needed for any async threads or it could be a Django bug?

kylemacfarlane avatar Aug 29 '21 11:08 kylemacfarlane

A fix seems to be to send db.close_old_connections() through sync_to_async() so that it finds and cleans up any connections in there.

import asyncio
from django import db
from time import sleep

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    def handle(self, *args, **options):
        async def test():
            return await sync_to_async(AModel.objects.get)(id=1234)            

        # Each loop of the while block is akin to a Dramatiq task under a long
        # running worker
        while True:
            try:
                print('SYNC', AModel.objects.get(id=1234))
            except Exception as e:
                print(e)

            task = asyncio.ensure_future(test())
            asyncio.get_event_loop().run_until_complete(asyncio.wait([task]))

            try:
                print('ASYNC', task.result())
            except Exception as e:
                print(e)

            # Remove dead sync connections
            db.close_old_connections()
            # Remove dead async connections
            asyncio.get_event_loop().run_until_complete(
                sync_to_async(db.close_old_connections)()
            )

            sleep(5)

kylemacfarlane avatar Aug 29 '21 12:08 kylemacfarlane

Thanks for this.

I don't actually use django_dramatiq but I just ran into the same problem with plain Django + Dramatiq. Are you wrapping any DB calls in sync_to_async?

No, I'm not using any Django async functionality in my project. I tried putting db.close_old_connections() in various places but it doesn't seem to help.

My current (horrible) solution is to restart my worker process every 15 minutes!

drpancake avatar Aug 30 '21 03:08 drpancake

It could also be a regular thread somewhere (or running dramatiq with --threads x and non-thread-safe code). From what I understand, sync_to_async is just a wrapper around a ThreadPoolExecutor.

kylemacfarlane avatar Aug 30 '21 16:08 kylemacfarlane
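
The threading point above can be illustrated without Django. This is a minimal sketch, assuming connections live in thread-local storage (as Django's do) and that the cleanup is dispatched to the same worker thread that opened the connection. `get_connection`, `close_old_connections` and `open_connections` are hypothetical stand-ins, not Django APIs.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a per-thread connection registry.
_local = threading.local()
_lock = threading.Lock()
open_connections = set()  # names of threads that currently hold a connection


def get_connection() -> str:
    """Open (or reuse) the connection belonging to the calling thread."""
    if getattr(_local, "conn", None) is None:
        _local.conn = threading.current_thread().name
        with _lock:
            open_connections.add(_local.conn)
    return _local.conn


def close_old_connections() -> None:
    """Close only the connection owned by the calling thread."""
    conn = getattr(_local, "conn", None)
    if conn is not None:
        with _lock:
            open_connections.discard(conn)
        _local.conn = None


# One worker thread, so every submitted call runs on the same thread.
executor = ThreadPoolExecutor(max_workers=1)

get_connection()                          # connection owned by the main thread
executor.submit(get_connection).result()  # connection owned by the pool thread

close_old_connections()                   # cleans up the main thread only
after_main_cleanup = len(open_connections)

executor.submit(close_old_connections).result()  # cleanup sent to the pool thread
after_pool_cleanup = len(open_connections)

executor.shutdown()
print(after_main_cleanup, after_pool_cleanup)  # 1 0
```

The cleanup called from the main thread never sees the pool thread's connection. That matches the observation earlier in the thread that db.close_old_connections() had to be sent through sync_to_async() as well.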

I'm having the same issue. The funny part is it's only on our staging system. Production seems fine. This was a lift and shift extraction of a microservice. I've been working on getting it into production. No async code.

We're using django_dramatiq, dramatiq_sqs, sentry-dramatiq and django_periodiq.

Settings

DATABASES = {"default": dj_database_url.parse(configuration.db.url, conn_max_age=500)}
.
.
.
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq_sqs.SQSBroker",
    "OPTIONS": {
        "namespace": "pricing-service_tasks",
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
        "periodiq.PeriodiqMiddleware",
    ],
}

Only task running

@dramatiq.actor(periodic=cron("*/1 * * * *"))
def example_task():
    """For testing Dramatiq setup in staging and prod. Will delete/revert once I'm done."""
    sleep(5)
    user, _ = User.objects.get_or_create(email="[email protected]", is_active=False)
    user.first_name = random.choice(range(1, 999999))
    user.save()

martyphee avatar Sep 06 '21 12:09 martyphee

@martyphee Is it easily reproducible or somewhat random? Do you have a high throughput of tasks?

I'm also using Sentry and I wonder if that could be related.

For extra context, I'm averaging 20 tasks/second and this issue usually appears after 24-48 hours. But it appears to be entirely random.

drpancake avatar Sep 07 '21 01:09 drpancake

@drpancake It seems to happen consistently on our staging servers. The picture below shows the task processors; each is now holding 64 connections. That is the most they will hold, and at that point the connection pool, for lack of a better description, freaks out and throws tons of errors saying the connection is already closed. I think it might be hitting the max DB connections. [image]

This is the only job running and it only runs once per minute as a test. There is nothing else hitting it.

It took about 24 hours to hit the 64/conn per pod. I restarted the pods yesterday morning.

Sentry error image

martyphee avatar Sep 07 '21 07:09 martyphee

Interesting! Let me know if you make any breakthroughs. Perhaps you could try running the task more frequently to see if you hit the error sooner.

As much as I detest it, I may have to move over to Celery at some point as it's at least battle-tested for a high throughput of tasks like my use case.

drpancake avatar Sep 08 '21 01:09 drpancake

As much as I detest it, I may have to move over to Celery at some point as it's at least battle-tested for a high throughput of tasks like my use case.

FWIW, I'm having this problem with Celery and there's an issue about this here: https://github.com/celery/django-celery/issues/121

Bummer to see dramatiq having this problem too. I was just thinking I could solve so many of precisely these kinds of issues by switching over.

mlissner avatar May 17 '22 00:05 mlissner

@mlissner That's disheartening to hear! I'm still using Dramatiq but I have a cron job that restarts the worker process every 15 minutes. Luckily for my use case tasks getting killed randomly is acceptable.

drpancake avatar May 17 '22 01:05 drpancake
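
For anyone who wants the same stopgap without cron, below is a minimal sketch of a supervisor that restarts a worker on a fixed interval. It assumes the worker exits cleanly on SIGTERM and that tasks killed mid-run are acceptable, as described above. `run_with_periodic_restart` and its parameters are hypothetical names, and the demo command is a placeholder child process, not a real Dramatiq worker.

```python
import subprocess
import sys
from typing import List, Optional


def run_with_periodic_restart(
    cmd: List[str],
    interval: float,
    max_cycles: Optional[int] = None,
    grace: float = 10.0,
) -> int:
    """Run `cmd`, restarting it every `interval` seconds.

    Returns the number of times the command was launched. With
    max_cycles=None the loop runs until the supervisor itself is stopped.
    """
    launches = 0
    while max_cycles is None or launches < max_cycles:
        proc = subprocess.Popen(cmd)
        launches += 1
        try:
            # Returns early if the worker exits on its own; it is then
            # relaunched straight away on the next loop iteration.
            proc.wait(timeout=interval)
        except subprocess.TimeoutExpired:
            proc.terminate()  # ask the worker to shut down
            try:
                proc.wait(timeout=grace)
            except subprocess.TimeoutExpired:
                proc.kill()  # force it if it ignores the request
                proc.wait()
    return launches


# Placeholder child process standing in for the worker command.
demo_cmd = [sys.executable, "-c", "import time; time.sleep(60)"]
launch_count = run_with_periodic_restart(demo_cmd, interval=0.2, max_cycles=2)
print(launch_count)
```

In a real deployment the command would be whatever starts your worker, and the interval would be 15 * 60 to match the schedule mentioned above. This only masks the symptom; it does not explain why the connections go stale.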