
Uncaught TimeLimitExceeded exception leads to a worker hanging due to decrease of alive threads

Open · glebsam opened this issue on Nov 08 '21 · 2 comments


Checklist

  • [x] Does your title concisely summarize the problem?
  • [x] Did you include a minimal, reproducible example?
  • [x] What OS are you using?
  • [x] What version of Dramatiq are you using?
  • [x] What did you do?
  • [x] What did you expect would happen?
  • [x] What happened?

What OS are you using?

Dramatiq is running via the django-dramatiq package in a Docker container built from registry.access.redhat.com/ubi8/ubi:latest:

# uname -a
Linux 47b330e66b1a 4.14.248-189.473.amzn2.x86_64 #1 SMP Mon Sep 27 05:52:26 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

# cat /etc/redhat-release
Red Hat Enterprise Linux release 8.4 (Ootpa)

What Python version are you using?

>>> print(sys.version)
3.9.2 (default, Mar  5 2021, 01:49:45)
[GCC 8.4.1 20200928 (Red Hat 8.4.1-1)]

What version of Dramatiq are you using?

>>> import dramatiq; print(dramatiq.__version__)
1.11.0

What did you do?

Ran an actor that exceeded its time limit while blocked in a syscall and then additionally raised an exception of its own.

What did you expect would happen?

The actor finishes once the syscall returns, possibly raises its own exception (I know about the GIL issue with the TimeLimit middleware), and in any case raises the TimeLimitExceeded exception; the worker keeps working.

What happened?

The actor finishes after the syscall returns and raises its exception. Dramatiq handles that exception, but in the middle of the handling another exception arrives (TimeLimitExceeded); the worker thread dies and the worker hangs.

Minimal, reproducible example

Looks like there's a race, and the issue is hard to reproduce, but this example should eventually blow up. I believe it depends on the moment at which the exception injected via ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, exception) is actually processed in the target thread (see the sketch after the example).

# Imports assumed from the surrounding project (django-dramatiq setup).
import logging
import time

from django.utils import timezone
from dramatiq import actor

logger = logging.getLogger(__name__)


@actor(time_limit=1000 * 5)
def hanging_test_sleep_with_raise():
    timeout_sec = 10
    logger.info(f"Start hanging task with {timeout_sec=}")
    start = timezone.now()
    try:
        time.sleep(timeout_sec)
        raise Exception("Bang!")
    finally:
        logger.info(f"Hanging task finished. Actual execution time {timezone.now() - start}")

Summary

When the worker handles a BaseException

https://github.com/Bogdanp/dramatiq/blob/fa732f46f7e6efda4b7ce39056f5d591dd531206/dramatiq/worker.py#L499-L514

it should also handle BaseException around the call to self.broker.emit_after

https://github.com/Bogdanp/dramatiq/blob/fa732f46f7e6efda4b7ce39056f5d591dd531206/dramatiq/broker.py#L95-L100

Otherwise, the handling thread will die due to an unhandled exception.
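
A minimal sketch of what that could look like (names follow worker.py from the link above; this is an illustration, not a tested patch):

try:
    self.broker.emit_after("process_message", message, exception=e)
except BaseException:
    # A late TimeLimitExceeded / Interrupt fired while middleware was running;
    # log it instead of letting it kill the worker thread.
    self.logger.warning("Interrupt raised during after_process_message.", exc_info=True)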

OR 🤔

The Interrupt exception should derive from Exception instead of BaseException (which is less correct IMO, but the respected maintainer might have a different point of view).

https://github.com/Bogdanp/dramatiq/blob/fa732f46f7e6efda4b7ce39056f5d591dd531206/dramatiq/middleware/threading.py#L44-L51
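
Sketched out, that alternative would only change the base class (again, just an illustration; the current code presumably derives Interrupt from BaseException on purpose, so that ordinary "except Exception" handlers inside actor code cannot swallow it):

class Interrupt(Exception):  # instead of BaseException
    """Base class for exceptions raised asynchronously in worker threads."""

class TimeLimitExceeded(Interrupt):
    """Raised when an actor runs past its time limit."""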

A double-check of the described inheritance/handling issue

from dramatiq.middleware import raise_thread_exception
from dramatiq.middleware.time_limit import TimeLimitExceeded
import threading

try:
    raise_thread_exception(thread_id=threading.get_ident(), exception=TimeLimitExceeded)
except Exception:
    print("Caught Exception! 🌝")
except BaseException:
    print("Caught BaseException! 🌚")

The stack trace I got from the product in my staging environment:

Non 200 response from crypto api: 504: <html>
<head><title>504 Gateway Time-out</title></head>
<body>
<center><h1>504 Gateway Time-out</h1></center>
</body>
</html>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/site-packages/sentry_sdk/integrations/threading.py", line 67, in run
    return old_run_func(self, *a, **kw)
  File "/usr/local/lib/python3.8/site-packages/dramatiq/worker.py", line 455, in run
    self.process_message(message)
  File "/usr/local/lib/python3.8/site-packages/dramatiq/worker.py", line 509, in process_message
    self.broker.emit_after("process_message", message, exception=e)
  File "/usr/local/lib/python3.8/site-packages/dramatiq/broker.py", line 98, in emit_after
    getattr(middleware, "after_" + signal)(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/cryptology_dramatiq/middleware/task_logger.py", line 53, in after_process_message
    logger.info(f"Finish task {log_msg_main}, result {result_str}, exception {exception_str}")
  File "/usr/lib64/python3.8/logging/__init__.py", line 1434, in info
    self._log(INFO, msg, args, **kwargs)
  File "/usr/lib64/python3.8/logging/__init__.py", line 1577, in _log
    self.handle(record)
  File "/usr/lib64/python3.8/logging/__init__.py", line 1587, in handle
    self.callHandlers(record)
  File "/usr/local/lib/python3.8/site-packages/sentry_sdk/integrations/logging.py", line 85, in sentry_patched_callhandlers
    return old_callhandlers(self, record)
  File "/usr/lib64/python3.8/logging/__init__.py", line 1649, in callHandlers
    hdlr.handle(record)
  File "/usr/lib64/python3.8/logging/__init__.py", line 950, in handle
    self.emit(record)
  File "/usr/lib64/python3.8/logging/__init__.py", line 1084, in emit
    stream.write(msg + self.terminator)
  File "/usr/local/lib/python3.8/site-packages/dramatiq/compat.py", line 55, in write
    self.pipe.send_bytes(s.encode(self.encoding, errors="replace"))
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 204, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 415, in _send_bytes
    self._send(header + buf)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 372, in _send
    n = write(self._handle, buf)
dramatiq.middleware.time_limit.TimeLimitExceeded

glebsam · Nov 08 '21 17:11

Thanks for the detailed report! I think you're right about the race in this case, but it's unfortunately rather hard to fix. I don't think the ideas you presented would be sufficient because once execution escapes the Worker's "try" block, the async exception could fire at any time and it's not feasible to protect everything from that point onward. Probably the best solution here is for users to use gevent when using TimeLimits, because there the timeouts can be more precise and don't rely on CPython runtime voodoo.

Bogdanp · Nov 09 '21 08:11

🤔 I see, you're right, exception escaping is hard to prevent here. Probably some kind of watchdog for dead threads would be a more suitable solution (rough sketch below).
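
Purely illustrative sketch of that idea (not dramatiq code; make_worker and workers are hypothetical placeholders for however the pool creates its worker threads):

import threading
import time

def watchdog(make_worker, workers, interval=5.0):
    # Periodically replace worker threads that died from an escaped exception,
    # so a single unhandled interrupt cannot silently shrink the pool.
    while True:
        for i, worker in enumerate(workers):
            if not worker.is_alive():
                workers[i] = make_worker()
                workers[i].start()
        time.sleep(interval)

# The watchdog itself would run as a daemon thread, e.g.:
# threading.Thread(target=watchdog, args=(make_worker, workers), daemon=True).start()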

glebsam · Nov 09 '21 09:11