django_dramatiq

Using dramatiq and apscheduler with RabbitMQ results in missed heartbeats on RabbitMQ

Open th0th opened this issue 6 years ago • 9 comments

I needed a scheduler to run some actors periodically, so I added a Django management command to run it. It goes like this:

import signal
import sys

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    help = 'Run task scheduler'

    def handle(self, *args, **options):
        scheduler = BlockingScheduler()
        
        scheduler.add_job(
            trigger=IntervalTrigger(
                seconds=30,
            ),
            name='task1',
            func='project.apps.app.tasks:task1.send',  # Edit: added '.send'
        )

        def shutdown(*args):
            self.stdout.write("Exiting...")
            sys.exit(0)

        signal.signal(signal.SIGINT, shutdown)
        signal.signal(signal.SIGTERM, shutdown)

        self.stdout.write("Discovered tasks:")

        for s in scheduler.get_jobs():
            self.stdout.write(f"* {s.name} - {s.trigger}")

        self.stdout.write("\nStarting scheduler...")

        scheduler.start()

        return 0

It works, but the connection resets from time to time, with this:

[2019-07-05 21:40:49,396] [ERROR] pika.adapters.utils.io_services_utils: _AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=36, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.42.6.131', 55836)>; Caller's stack:
Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
    self._produce()
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 820, in _produce
    self._tx_buffers[0])
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
    return sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
    self._produce()
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 820, in _produce
    self._tx_buffers[0])
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
    return sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019-07-05 21:40:49,488] [ERROR] pika.adapters.base_connection: connection_lost: StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)
[2019-07-05 21:40:49,555] [ERROR] pika.adapters.blocking_connection: Unexpected connection close detected: StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)

and here is the RabbitMQ log:

2019-07-05 16:24:40.426 [error] <0.31273.0> closing AMQP connection <0.31273.0> (10.42.6.131:36524 -> 10.42.6.93:5672): missed heartbeats from client, timeout: 60s

I tried to do something similar to the one in @Bogdanp's post. Am I missing something?

th0th avatar Jul 05 '19 21:07 th0th

The issue here is that the broker doesn't immediately close connections after messages are enqueued, the reasoning being that you're usually going to enqueue more than one message over a connection. That, combined with the fact that BlockingScheduler blocks the whole thread, means heartbeats are never sent to RMQ, so the connection eventually gets closed.

The proper fix would be to change Dramatiq so that all enqueues are serialized through a single thread that can wake up every once in a while and send heartbeats, but until I can get around to doing that, you can just append

get_broker().connection.close()

to the function you're scheduling. That should get rid of these errors.
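A minimal sketch of that workaround (the wiring is hypothetical, not from this thread): wrap the scheduled job body so the broker connection is closed after every enqueue. In a real setup, `job` would be the actor's `.send` and `close_connection` would be `dramatiq.get_broker().connection.close`:

```python
def run_then_close(job, close_connection):
    """Run the scheduled job body, then close the broker connection.

    Closing the connection after each enqueue keeps the otherwise-idle
    scheduler process from holding a connection that never sends
    heartbeats; the connection is reopened lazily on the next enqueue.
    """
    try:
        return job()
    finally:
        close_connection()
```

The scheduler would then register something like `lambda: run_then_close(task1.send, dramatiq.get_broker().connection.close)` as the job function (again, hypothetical names).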

Bogdanp avatar Jul 06 '19 07:07 Bogdanp

Thanks for the quick response @Bogdanp! 💐

I have multiple actors scheduled and I'd rather wait for the update than add broker connection closing part to all scheduled actors :)

th0th avatar Jul 08 '19 12:07 th0th

@th0th in the mean time, you could also increase your heartbeat timeouts.
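For reference, a sketch of that (the URL and interval are placeholders, not from this thread): pika honors a `heartbeat` query parameter in AMQP URLs, so the broker can be constructed with a longer timeout than RabbitMQ's 60s default:

```python
from dramatiq.brokers.rabbitmq import RabbitmqBroker

# heartbeat=600 raises the client-side heartbeat interval to 10 minutes;
# replace the URL with your actual RabbitMQ endpoint.
broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672/?heartbeat=600")
```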

Bogdanp avatar Jul 08 '19 14:07 Bogdanp

@Bogdanp not to pressure or anything but, do you have a time in mind for the update? I am just asking to schedule my own release :)

th0th avatar Jul 11 '19 12:07 th0th

I don't intend to make the change I mentioned for a long time, sorry.

Bogdanp avatar Jul 12 '19 07:07 Bogdanp

Oh, from the way you said it I thought it was a trivial, planned change, sorry. I'd better implement the workaround you suggested then. Thank you @Bogdanp.

th0th avatar Jul 12 '19 13:07 th0th

@Bogdanp I took your advice and added dramatiq.get_broker().connection.close() to the end of each scheduled task, and now I constantly get warnings like this:

[2019-07-14 08:06:47,943] [WARNING] dramatiq.worker.WorkerThread: Failed to process message task() with unhandled exception.
Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/usr/lib/python3.7/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./project/apps/app/tasks.py", line 173, in task
    dramatiq.get_broker().connection.close()
  File "/usr/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 787, in close
    raise exceptions.ConnectionWrongStateError(msg)
pika.exceptions.ConnectionWrongStateError: BlockingConnection.close(200, 'Normal shutdown') called on closed connection.

Am I doing something wrong?

th0th avatar Jul 14 '19 15:07 th0th

I think I came up with a proper workaround, and I wanted to share it in case someone else stumbles upon the same issue:

import importlib
import signal
import sys

import dramatiq
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django.core.management.base import BaseCommand


def send(job_path):
    module_path, func_name = job_path.split(':')

    task_module = importlib.import_module(module_path)
    task = getattr(task_module, func_name)

    task.send()

    dramatiq.get_broker().connection.close()


class Command(BaseCommand):
    help = 'Run task scheduler'

    def handle(self, *args, **options):
        scheduler = BlockingScheduler()

        scheduler.add_job(
            trigger=IntervalTrigger(
                seconds=30,
            ),
            name='task1',
            func=send,
            kwargs={
                'job_path': 'project.apps.app.tasks.task1'
            },
        )

        def shutdown(*args):
            self.stdout.write("Exiting...")
            sys.exit(0)

        signal.signal(signal.SIGINT, shutdown)
        signal.signal(signal.SIGTERM, shutdown)

        self.stdout.write("Discovered tasks:")

        for s in scheduler.get_jobs():
            self.stdout.write(f"* {s.name} - {s.trigger}")

        self.stdout.write("\nStarting scheduler...")

        scheduler.start()

        return 0
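One caveat with the `send` helper above: as the earlier `ConnectionWrongStateError` traceback shows, pika raises if `close()` is called on an already-closed connection. pika's `BlockingConnection` exposes an `is_open` property, so a defensive variant of the close step (a sketch, not from this thread) could guard on it:

```python
def close_if_open(connection):
    """Close the broker connection only if it is still open.

    Guards against pika's ConnectionWrongStateError, which is raised
    when close() is called on an already-closed connection.
    """
    if connection.is_open:
        connection.close()
```

In the helper, the last line would then become `close_if_open(dramatiq.get_broker().connection)`.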

th0th avatar Jul 15 '19 12:07 th0th

Sorry for necroposting, but I want to express my interest in an actual fix for this (not just the workaround). I assume it is not being worked on at the moment?

xunto avatar Dec 14 '22 15:12 xunto