arq
arq copied to clipboard
Synchronous Jobs are not getting cancelled after timeout has been reached
I'm not sure this can be fixed - maybe a warning in the docs is the least there can be done. As far as I understand there is no way to cancel a concurrent.futures.Future
reliably. Not sure if arq just wraps the future returned by loop.run_in_executor with an asyncio.wait_for call? The underlying function, however, will not be stopped.
To reproduce
def _some_other():
print("some other func")
time.sleep(2)
print("some other func DONE")
def _timeout_check():
print("timeout check")
time.sleep(1)
_some_other()
print("timeout check DONE")
async def timeout_check(ctx):
tc_sync = functools.partial(_timeout_check)
loop = asyncio.get_running_loop()
loop.set_debug(True)
return await loop.run_in_executor(ctx["pool"], tc_sync)
async def startup(ctx):
ctx["pool"] = futures.ProcessPoolExecutor()
class WorkerSettings:
job_function = func(timeout_check, name="timeout_check", timeout=3)
functions = [job_function]
redis_settings = RedisSettings(
host=settings.GPS_REDIS_HOST,
port=settings.GPS_REDIS_PORT,
database=settings.GPS_REDIS_DB,
)
on_startup = startup
Will give you this output
2023-05-05 07:23:36.291 | DEBUG | report_worker.main:timeout_check:50 - async wrapper called
timeout check
some other func
07:23:39: 3.00s ! 8a354ca8e6524328bf0f1b16857e1f3b:timeout_check failed, TimeoutError:
Traceback (most recent call last):
File "/app/report_worker/main.py", line 54, in timeout_check
return await loop.run_in_executor(ctx["pool"], tc_sync)
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/basicuser/.local/lib/python3.10/site-packages/arq/worker.py", line 574, in run_job
result = await asyncio.wait_for(task, timeout_s)
File "/usr/local/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
some other func DONE
timeout check DONE