dramatiq
Uncaught TimeLimitExceeded exception leads to a worker hanging due to decrease of alive threads
Checklist
- [x] Does your title concisely summarize the problem?
- [x] Did you include a minimal, reproducible example?
- [x] What OS are you using?
- [x] What version of Dramatiq are you using?
- [x] What did you do?
- [x] What did you expect would happen?
- [x] What happened?
What OS are you using?
Dramatiq is running via the django-dramatiq package in a Docker container built from registry.access.redhat.com/ubi8/ubi:latest:
# uname -a
Linux 47b330e66b1a 4.14.248-189.473.amzn2.x86_64 #1 SMP Mon Sep 27 05:52:26 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
# cat /etc/redhat-release
Red Hat Enterprise Linux release 8.4 (Ootpa)
What Python version are you using?
>>> print(sys.version)
3.9.2 (default, Mar 5 2021, 01:49:45)
[GCC 8.4.1 20200928 (Red Hat 8.4.1-1)]
What version of Dramatiq are you using?
>>> import dramatiq; print(dramatiq.__version__)
1.11.0
What did you do?
I ran an actor that exceeds its time limit while blocked in a syscall and then additionally raises an exception of its own.
What did you expect would happen?
The actor finishes after the syscall completes, optionally raises its own exception (I know about the GIL issue with the TimeLimit middleware), definitely raises the TimeLimitExceeded exception, and the worker keeps working.
What happened?
The actor finishes after the syscall completes and raises its exception. Dramatiq handles that exception, but in the middle of the handling another exception arrives, TimeLimitExceeded; the worker thread dies and the worker hangs.
Minimal, reproducible example
Looks like there's a race, and the issue is hard to reproduce, but this example should eventually blow up. I believe it depends on the moment at which the exception injected via ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, exception) gets processed in the thread (see the sketch of that mechanism after the example below).
import logging
import time

from django.utils import timezone  # the project runs django-dramatiq
from dramatiq import actor

logger = logging.getLogger(__name__)

@actor(time_limit=1000 * 5)
def hanging_test_sleep_with_raise():
    timeout_sec = 10
    logger.info(f"Start hanging task with {timeout_sec=}")
    start = timezone.now()
    try:
        # Sleep past the 5-second time limit while blocked in a syscall,
        # then raise an exception of our own so Dramatiq starts handling it.
        time.sleep(timeout_sec)
        raise Exception("Bang!")
    finally:
        logger.info(f"Hanging task finished. Actual execution time {timezone.now() - start}")
Summary
When the worker handles a BaseException
https://github.com/Bogdanp/dramatiq/blob/fa732f46f7e6efda4b7ce39056f5d591dd531206/dramatiq/worker.py#L499-L514
it should also handle BaseException inside self.broker.emit_after
https://github.com/Bogdanp/dramatiq/blob/fa732f46f7e6efda4b7ce39056f5d591dd531206/dramatiq/broker.py#L95-L100
Otherwise, the handling thread will die due to an unhandled exception. A sketch of the change follows.
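Roughly, the first option would look like this; a sketch only, modeled on the linked broker.py lines rather than copied verbatim and not a tested patch:

# Sketch: make Broker.emit_after survive BaseException-derived interrupts
# (such as TimeLimitExceeded) raised inside after_* middleware hooks.
def emit_after(self, signal, *args, **kwargs):
    signal_name = "after_" + signal
    for middleware in reversed(self.middleware):
        try:
            getattr(middleware, signal_name)(self, *args, **kwargs)
        except BaseException:  # currently only Exception is caught here
            self.logger.critical("Unexpected failure in %s.", signal_name, exc_info=True)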
OR 🤔
Alternatively, the Interrupt exception could derive from Exception instead of BaseException (which is less correct IMO, but the respected maintainer might have a different point of view).
https://github.com/Bogdanp/dramatiq/blob/fa732f46f7e6efda4b7ce39056f5d591dd531206/dramatiq/middleware/threading.py#L44-L51
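The second option is just an inheritance change in dramatiq/middleware/threading.py, something along these lines (sketch only):

# Sketch of the alternative: let Interrupt derive from Exception so generic
# `except Exception` handlers (like the one in emit_after) also catch
# TimeLimitExceeded. Downside: interrupts no longer escape broad handlers in
# actor code, which is usually the point of deriving from BaseException.
class Interrupt(Exception):  # upstream currently derives from BaseException
    pass

class TimeLimitExceeded(Interrupt):
    pass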
A double-check of the described inheritance/handling issue:
from dramatiq.middleware import raise_thread_exception
from dramatiq.middleware.time_limit import TimeLimitExceeded
import threading

try:
    # Ask CPython to raise TimeLimitExceeded asynchronously in this very thread.
    raise_thread_exception(thread_id=threading.get_ident(), exception=TimeLimitExceeded)
except Exception:
    print("Caught Exception! 🌝")
except BaseException:
    print("Caught BaseException! 🌚")
The stacktrace I got from the product in my staging environment:
Non 200 response from crypto api: 504: <html>
<head><title>504 Gateway Time-out</title></head>
<body>
<center><h1>504 Gateway Time-out</h1></center>
</body>
</html>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.8/site-packages/sentry_sdk/integrations/threading.py", line 67, in run
return old_run_func(self, *a, **kw)
File "/usr/local/lib/python3.8/site-packages/dramatiq/worker.py", line 455, in run
self.process_message(message)
File "/usr/local/lib/python3.8/site-packages/dramatiq/worker.py", line 509, in process_message
self.broker.emit_after("process_message", message, exception=e)
File "/usr/local/lib/python3.8/site-packages/dramatiq/broker.py", line 98, in emit_after
getattr(middleware, "after_" + signal)(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/cryptology_dramatiq/middleware/task_logger.py", line 53, in after_process_message
logger.info(f"Finish task {log_msg_main}, result {result_str}, exception {exception_str}")
File "/usr/lib64/python3.8/logging/__init__.py", line 1434, in info
self._log(INFO, msg, args, **kwargs)
File "/usr/lib64/python3.8/logging/__init__.py", line 1577, in _log
self.handle(record)
File "/usr/lib64/python3.8/logging/__init__.py", line 1587, in handle
self.callHandlers(record)
File "/usr/local/lib/python3.8/site-packages/sentry_sdk/integrations/logging.py", line 85, in sentry_patched_callhandlers
return old_callhandlers(self, record)
File "/usr/lib64/python3.8/logging/__init__.py", line 1649, in callHandlers
hdlr.handle(record)
File "/usr/lib64/python3.8/logging/__init__.py", line 950, in handle
self.emit(record)
File "/usr/lib64/python3.8/logging/__init__.py", line 1084, in emit
stream.write(msg + self.terminator)
File "/usr/local/lib/python3.8/site-packages/dramatiq/compat.py", line 55, in write
self.pipe.send_bytes(s.encode(self.encoding, errors="replace"))
File "/usr/lib64/python3.8/multiprocessing/connection.py", line 204, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/lib64/python3.8/multiprocessing/connection.py", line 415, in _send_bytes
self._send(header + buf)
File "/usr/lib64/python3.8/multiprocessing/connection.py", line 372, in _send
n = write(self._handle, buf)
dramatiq.middleware.time_limit.TimeLimitExceeded
Thanks for the detailed report! I think you're right about the race in this case, but it's unfortunately rather hard to fix. I don't think the ideas you presented would be sufficient because once execution escapes the Worker's "try" block, the async exception could fire at any time and it's not feasible to protect everything from that point onward. Probably the best solution here is for users to use gevent when using TimeLimit, because there the timeouts can be more precise and don't rely on CPython runtime voodoo.
🤔 I see, you're right, exception escaping is hard to prevent here. Probably some kind of watchdog for dead threads would be a more suitable solution.
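Something like this is what I have in mind for the watchdog. A rough sketch only; worker_threads, WATCHDOG_INTERVAL and the exit-on-death policy are hypothetical and would have to live inside the worker process:

import logging
import os
import threading
import time

logger = logging.getLogger(__name__)
WATCHDOG_INTERVAL = 30  # seconds; hypothetical setting

def watch_worker_threads(worker_threads):
    # Periodically check the pool of worker threads; if any of them has died
    # (e.g. killed by an escaped TimeLimitExceeded), exit the whole process so
    # the supervisor (Docker, systemd, k8s) can restart it, instead of letting
    # the worker hang with fewer consumers than configured.
    while True:
        time.sleep(WATCHDOG_INTERVAL)
        dead = [t.name for t in worker_threads if not t.is_alive()]
        if dead:
            logger.critical("Worker threads died: %s; exiting for restart.", dead)
            os._exit(1)

# Started as a daemon thread next to the worker threads, e.g.:
# threading.Thread(target=watch_worker_threads, args=(threads,), daemon=True).start()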