django_dramatiq

Test Hangs Waiting for Join

Open idahogray opened this issue 5 years ago • 14 comments

I have some tests in my Django project that hang at both the self.broker.join('default') and self.worker.join() lines. I know they hang at the self.worker.join() line as well because I wrapped self.broker.join('default', timeout=5000) in a try/except block, with the timeout added. These tests rely on my Django views to send() messages to the dramatiq tasks in tasks.py.

I added print statements at the beginning and end of all of my actor tasks and I see them print when the timeout is reached.

I have some other tests that send() to the tasks directly and those tests are working perfectly.

I am using the StubBroker in my settings.

If I run the rabbitmq broker with a worker and manage.py runserver, I can see all of the tasks are executing successfully.

Do you have any suggestions on how to troubleshoot this?

idahogray avatar Mar 24 '19 04:03 idahogray

Any chance you could make a minimal reproducible example? Hard to say w/o any code to look at.

Bogdanp avatar Mar 26 '19 17:03 Bogdanp

Thanks for the response and I understand this is difficult without code. I tried to create a simple project to show the problem but I can't recreate it, yet. Is there additional logging I can enable somehow in the worker and/or broker to try to see what is happening?

idahogray avatar Mar 28 '19 16:03 idahogray

The easiest way would be to turn on debug logging on the root logger. Before all your tests, you can add:

import logging
logging.basicConfig(level=logging.DEBUG)

With that you should start seeing debug logging from the broker and worker.
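
If root-level DEBUG output turns out to be too noisy, a narrower variant is sketched below. It assumes dramatiq's loggers are named under the `dramatiq` namespace (for example `dramatiq.worker`); the helper name `enable_dramatiq_debug_logging` and the format string are illustrative and not part of either library.

```python
import logging


def enable_dramatiq_debug_logging():
    """Keep the root logger at WARNING, but let dramatiq log at DEBUG."""
    # basicConfig does nothing if the root logger already has handlers,
    # so the root level is set explicitly afterwards.
    logging.basicConfig(
        format="%(levelname)s %(asctime)s %(name)s %(thread)d %(message)s"
    )
    logging.getLogger().setLevel(logging.WARNING)

    # Assumption: dramatiq's loggers are children of "dramatiq", so they
    # inherit this level unless they set their own.
    dramatiq_logger = logging.getLogger("dramatiq")
    dramatiq_logger.setLevel(logging.DEBUG)
    return dramatiq_logger
```

Calling the helper once before the tests run (for instance at the top of the test module) should be enough.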

Bogdanp avatar Mar 28 '19 17:03 Bogdanp

I am not sure if this gives you any clues but here is the log from one of the failing tests. It looks to me like the tasks don't start running until after the timeout has occurred.

Creating test database for alias 'default'...
System check identified no issues (0 silenced).
DEBUG 2019-03-28 13:14:25,621 worker 4832 10700 Adding consumer for queue 'default.DQ'.
DEBUG 2019-03-28 13:14:25,622 worker 4832 7560 Running consumer thread...
DEBUG 2019-03-28 13:14:25,622 worker 4832 10700 Adding consumer for queue 'default'.
DEBUG 2019-03-28 13:14:25,622 worker 4832 8168 Running consumer thread...
DEBUG 2019-03-28 13:14:25,623 worker 4832 10700 Adding consumer for delay queue 'default.DQ'.
DEBUG 2019-03-28 13:14:25,623 worker 4832 10700 A consumer for queue 'default.DQ' is already running.
DEBUG 2019-03-28 13:14:25,624 worker 4832 120 Running worker thread...
INFO 2019-03-28 13:14:25,656 middleware 4832 10700 Request start time: 1553800465.65656
INFO 2019-03-28 13:14:25,656 middleware 4832 10700 Request start time: 1553800465.65656
INFO 2019-03-28 13:14:25,698 views 4832 10700 Update: forms_valid: 
DEBUG 2019-03-28 13:14:25,728 middleware 4832 10700 Creating Task from message '0b769d0c-ac5b-4711-9b32-9ca91294d27d'.
DEBUG 2019-03-28 13:14:25,729 worker 4832 8168 Pushing message '0b769d0c-ac5b-4711-9b32-9ca91294d27d' onto work queue.
DEBUG 2019-03-28 13:14:25,730 worker 4832 120 Received message send_gprojects_email(<Approval: 100099-Switchgear Upgrade (Approved by Eric)>, 'http://testserver/approvals/1/', 'http://testserver/approvals/update/1/') with id '0b769d0c-ac5b-4711-9b32-9ca91294d27d'.
DEBUG 2019-03-28 13:14:25,731 middleware 4832 120 Updating Task from message '0b769d0c-ac5b-4711-9b32-9ca91294d27d'.
INFO 2019-03-28 13:14:25,738 tasks 4832 10700 Creating URL for {'signee': 'CN=Eric', 'approval-id': 1, 'email-sent-timestamp': '2019-03-28 19:14:25.738568+00:00', 'email-address': ''}
INFO 2019-03-28 13:14:25,739 tasks 4832 10700 Signature URL: eyJzaWduZWUiOiAiQ049RXJpYyBNaXRjaGVsbCxPVT1LY2ssREM9cG93ZXJlbmcsREM9Y29tIiwgImFwcHJvdmFsLWlkIjogMSwgImVtYWlsLXNlbnQtdGltZXN0YW1wIjogIjIwMTktMDMtMjggMTk6MTQ6MjUuNzM4NTY4KzAwOjAwIiwgImVtYWlsLWFkZHJlc3MiOiAiZXJpYy5taXRjaGVsbEBwb3dlcmVuZy5jb20ifQ==
INFO 2019-03-28 13:14:25,740 tasks 4832 10700 Creating URL for {'signee': 'CN=Nolan', 'approval-id': 1, 'email-sent-timestamp': '2019-03-28 19:14:25.740568+00:00', 'email-address': ''}
INFO 2019-03-28 13:14:25,740 tasks 4832 10700 Signature URL: eyJzaWduZWUiOiAiQ049Tm9sYW4gU255ZGVyLE9VPUJpbCxEQz1wb3dlcmVuZyxEQz1jb20iLCAiYXBwcm92YWwtaWQiOiAxLCAiZW1haWwtc2VudC10aW1lc3RhbXAiOiAiMjAxOS0wMy0yOCAxOToxNDoyNS43NDA1NjgrMDA6MDAiLCAiZW1haWwtYWRkcmVzcyI6ICJub2xhbi5zbnlkZXJAcG93ZXJlbmcuY29tIn0=
DEBUG 2019-03-28 13:14:25,741 middleware 4832 10700 Creating Task from message 'f740d3c5-683f-4371-abc1-6003ccda76e5'.
DEBUG 2019-03-28 13:14:25,744 middleware 4832 10700 Creating Task from message '2d87a228-2645-4630-8b89-877685436c7f'.
DEBUG 2019-03-28 13:14:25,763 worker 4832 8168 Pushing message 'f740d3c5-683f-4371-abc1-6003ccda76e5' onto work queue.
DEBUG 2019-03-28 13:14:25,769 middleware 4832 10700 Creating Task from message '8eaadd2c-5293-4932-a61f-7f251963bef5'.
DEBUG 2019-03-28 13:14:25,770 worker 4832 8168 Pushing message '2d87a228-2645-4630-8b89-877685436c7f' onto work queue.
DEBUG 2019-03-28 13:14:25,771 worker 4832 8168 Pushing message '8eaadd2c-5293-4932-a61f-7f251963bef5' onto work queue.
DEBUG 2019-03-28 13:14:25,781 middleware 4832 10700 Creating Task from message 'bdfa778e-f30b-4a88-b065-01c79dd6f766'.
DEBUG 2019-03-28 13:14:25,781 worker 4832 8168 Pushing message 'bdfa778e-f30b-4a88-b065-01c79dd6f766' onto work queue.
INFO 2019-03-28 13:14:25,784 middleware 4832 10700 Request end time: 1553800465.78457
INFO 2019-03-28 13:14:25,784 middleware 4832 10700 Request end time: 1553800465.78457
INFO 2019-03-28 13:14:25,784 middleware 4832 10700 Request duration: 128.01075
INFO 2019-03-28 13:14:25,784 middleware 4832 10700 Request duration: 128.01075
<dramatiq.worker.Worker object at 0x000000734A005160>
EINFO 2019-03-28 13:14:35,791 worker 4832 10700 Shutting down...
DEBUG 2019-03-28 13:14:35,793 worker 4832 10700 Stopping workers...
DEBUG 2019-03-28 13:14:35,793 worker 4832 10700 Stopping worker thread...
DEBUG 2019-03-28 13:14:35,798 actor 4832 120 Received args=(<Approval: 100099-Switchgear Upgrade (Approved by Eric)>, 'http://testserver/approvals/1/', 'http://testserver/approvals/update/1/') kwargs={}.
Sending gprojects Emails
INFO 2019-03-28 13:14:35,799 tasks 4832 120 Attempting to send email to gprojects:
gprojects Emails Sent
DEBUG 2019-03-28 13:14:35,829 actor 4832 120 Completed after 29.80ms.
DEBUG 2019-03-28 13:14:35,829 middleware 4832 120 Updating Task from message '0b769d0c-ac5b-4711-9b32-9ca91294d27d'.
DEBUG 2019-03-28 13:14:35,881 worker 4832 120 Acknowledging message '0b769d0c-ac5b-4711-9b32-9ca91294d27d'.
DEBUG 2019-03-28 13:14:35,882 worker 4832 120 Worker thread stopped.
DEBUG 2019-03-28 13:14:35,882 worker 4832 10700 Workers stopped.
DEBUG 2019-03-28 13:14:35,884 worker 4832 10700 Stopping consumers...
DEBUG 2019-03-28 13:14:35,884 worker 4832 10700 Stopping consumer thread...
DEBUG 2019-03-28 13:14:35,885 worker 4832 10700 Stopping consumer thread...
DEBUG 2019-03-28 13:14:35,903 worker 4832 7560 Consumer thread stopped.
DEBUG 2019-03-28 13:14:35,947 worker 4832 8168 Consumer thread stopped.
DEBUG 2019-03-28 13:14:35,947 worker 4832 10700 Consumers stopped.
DEBUG 2019-03-28 13:14:35,948 worker 4832 10700 Requeueing in-memory messages...
DEBUG 2019-03-28 13:14:35,948 worker 4832 10700 Done requeueing in-progress messages.
DEBUG 2019-03-28 13:14:35,948 worker 4832 10700 Closing consumers...
DEBUG 2019-03-28 13:14:35,949 worker 4832 10700 Consumers closed.
INFO 2019-03-28 13:14:35,949 worker 4832 10700 Worker has been shut down.
INFO 2019-03-28 13:14:35,949 worker 4832 10700 Shutting down...
DEBUG 2019-03-28 13:14:35,950 worker 4832 10700 Stopping workers...
DEBUG 2019-03-28 13:14:35,950 worker 4832 10700 Stopping worker thread...
DEBUG 2019-03-28 13:14:35,950 worker 4832 10700 Workers stopped.
DEBUG 2019-03-28 13:14:35,951 worker 4832 10700 Stopping consumers...
DEBUG 2019-03-28 13:14:35,951 worker 4832 10700 Stopping consumer thread...
DEBUG 2019-03-28 13:14:35,951 worker 4832 10700 Stopping consumer thread...
DEBUG 2019-03-28 13:14:35,952 worker 4832 10700 Consumers stopped.
DEBUG 2019-03-28 13:14:35,952 worker 4832 10700 Requeueing in-memory messages...
DEBUG 2019-03-28 13:14:35,952 worker 4832 10700 Done requeueing in-progress messages.
DEBUG 2019-03-28 13:14:35,953 worker 4832 10700 Closing consumers...
DEBUG 2019-03-28 13:14:35,953 worker 4832 10700 Consumers closed.
INFO 2019-03-28 13:14:35,953 worker 4832 10700 Worker has been shut down.

======================================================================
ERROR: test_tm_email (results.tests.test_email.EmailTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\kgray\.conda\envs\sas-apps-dramtiq\lib\unittest\mock.py", line 1195, in patched
    return func(*args, **keywargs)
  File "C:\Users\kgray\Documents\sas-applications\sas_applications\results\tests\test_email.py", line 91, in test_tm_email
    self.broker.join(queue, timeout=10000)
  File "C:\Users\kgray\.conda\envs\sas-apps-dramtiq\lib\site-packages\dramatiq\brokers\stub.py", line 141, in join
    join_queue(self.queues[name], timeout=timeout)
  File "C:\Users\kgray\.conda\envs\sas-apps-dramtiq\lib\site-packages\dramatiq\common.py", line 83, in join_queue
    raise QueueJoinTimeout("timed out after %.02f seconds" % timeout)
dramatiq.errors.QueueJoinTimeout: timed out after 10.00 seconds

----------------------------------------------------------------------
Ran 1 test in 10.337s

FAILED (errors=1)
Destroying test database for alias 'default'...

idahogray avatar Mar 28 '19 19:03 idahogray

Are you doing any DB operations inside your tasks? Do you have the AdminMiddleware turned on? If so, maybe try turning it off and see if that helps.

Bogdanp avatar Mar 29 '19 10:03 Bogdanp

Thank you, AdminMiddleware was turned on. I turned it off and the timeouts are no longer occurring. Am I doing something wrong to cause this? Should AdminMiddleware not be used during tests?

idahogray avatar Mar 29 '19 13:03 idahogray

Hard to say w/o knowing the specifics of what DB you are using. Were these tests using TransactionTestCase?

Bogdanp avatar Mar 29 '19 14:03 Bogdanp

Thanks again. Here is my database setup.

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
        'NAME': 'approval',
        'USER': 'username',
        'PASSWORD': 'password',
        'HOST': '127.0.0.1',
    }
}

Here are all of my DRAMATIQ settings.

DRAMATIQ_ENCODER = "dramatiq.encoder.PickleEncoder"
DRAMATIQ_TASKS_DATABASE = 'default'
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.stub.StubBroker",
    "OPTIONS": {},
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Pipelines",
        "dramatiq.middleware.Retries",
        # "django_dramatiq.middleware.AdminMiddleware",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
    ]
}

The test case in question does inherit from DramatiqTestCase which inherits from TransactionTestCase.

idahogray avatar Mar 29 '19 14:03 idahogray

Thanks for the info. I'll try to reproduce this as soon as I can.

Bogdanp avatar Mar 29 '19 15:03 Bogdanp

I had a similar problem. I use DRF and want to test an API view that sets up a dramatiq task. I use StubBroker and the pytest fixtures described in the django_dramatiq docs. When I run my test, it hangs on the line broker.join('default'). To figure it out, I wrote another test that calls my_task.send() directly. At first it hung too. After reading the GitHub issues and docs, I did the following:

  • disabled django_dramatiq.middleware.AdminMiddleware for tests
  • marked my test with @pytest.mark.django_db(transaction=True) instead of @pytest.mark.django_db so that it uses real transactions. The worker runs in a separate thread (outside the test's transaction), and without this option it can't see the data in the database. (suggested in #52)
  • added the fail_fast option to the join call, broker.join('default', fail_fast=True), because any exception raised by an actor is raised in the worker, which runs in a separate thread. This setting tells the broker to re-raise exceptions from the worker so that I can see them in the main thread.
  • set max_retries on my task. My view was sending the task with a wrong argument. The task raised an exception, but dramatiq kept retrying it after some delay. Because of that, my test was waiting for the task to fail max_retries times (20 by default).

It helped and my tests are working as I expected. Maybe it will help others.
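
The point about exceptions staying in the worker thread can be illustrated without dramatiq at all. The sketch below uses only the standard library: an exception raised in a separate thread never reaches the main thread on its own, so it has to be captured and re-raised there. `run_in_worker` and `broken_task` are hypothetical names, and this is not dramatiq's implementation, only the general pattern.

```python
import queue
import threading


def run_in_worker(task):
    """Run task in a separate thread and re-raise its exception, if any."""
    errors = queue.Queue()

    def target():
        try:
            task()
        except Exception as exc:  # captured here, surfaced in the caller
            errors.put(exc)

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()

    # Without this step the caller would never see the failure.
    if not errors.empty():
        raise errors.get()


def broken_task():
    # Stands in for an actor that was sent a wrong argument.
    raise ValueError("wrong argument")


if __name__ == "__main__":
    try:
        run_in_worker(broken_task)
    except ValueError as exc:
        print("re-raised in main thread:", exc)
```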

BTW - Thanks a lot for a dramatiq. I just started to use it and it's amazing.

wwarne avatar Oct 02 '19 07:10 wwarne

I simply had custom retry logic and a task that took a long time to compute, so it seemed to hang, while it was actually just working through these 20 long iterations :)

lozhn avatar Nov 05 '21 18:11 lozhn

Had this same issue and it took us an afternoon to troubleshoot, thanks @wwarne in https://github.com/Bogdanp/django_dramatiq/issues/32#issuecomment-537369297 - would you accept a PR to update the docs with some of the content here?

shapiromatron avatar Feb 07 '23 23:02 shapiromatron

Very glad it helped!

I'm not sure I understood the part about accepting a PR correctly, because I personally can't accept PRs here. But I suppose @Bogdanp will be glad if you do it. Based on discussions here on GitHub, he seems to be a very open and communicative person.

@shapiromatron could you please clarify a little?

wwarne avatar Feb 11 '23 12:02 wwarne

I faced the same issue recently and interestingly, as @Bogdanp mentioned:

Are you doing any DB operations inside your tasks? Do you have the AdminMiddleware turned on? If so, maybe try turning it off and see if that helps.

Removing the AdminMiddleware solved the issue, so I simply removed it from my testing configuration.
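
One way to do that is sketched below, under assumptions: keep the full middleware list in the main settings and filter AdminMiddleware out in a test-only settings module. The helper `without_admin_middleware` and the separate test settings module are illustrative; the middleware paths are the ones quoted earlier in this thread.

```python
import copy

ADMIN_MIDDLEWARE = "django_dramatiq.middleware.AdminMiddleware"

# Broker settings as they might appear in the main settings module.
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.stub.StubBroker",
    "OPTIONS": {},
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Pipelines",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.AdminMiddleware",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
    ],
}


def without_admin_middleware(broker_settings):
    """Return a copy of the broker settings with AdminMiddleware removed."""
    settings = copy.deepcopy(broker_settings)
    settings["MIDDLEWARE"] = [
        path for path in settings["MIDDLEWARE"] if path != ADMIN_MIDDLEWARE
    ]
    return settings


# In a hypothetical test settings module, this would replace DRAMATIQ_BROKER.
TEST_DRAMATIQ_BROKER = without_admin_middleware(DRAMATIQ_BROKER)
```

Copying the settings first leaves the original list untouched, so the admin integration keeps working outside of tests.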

yujinio avatar Sep 28 '23 15:09 yujinio