sanic
app.add_task() triggers RuntimeWarning: coroutine was never awaited
This is likely the same problem as #1491.
Describe the bug: When running code that uses app.add_task(), I get a runtime warning that the coroutine passed to the add_task method was never awaited.
I want to make sure that this task is run once for the whole application rather than once per worker. To do that, I am using a multiprocessing Lock and Value to track whether the task is running.
Run the code below and wait for the output "Finished sync". When I then press Ctrl-C, I see:
INFO:__main__:Finished sync
[2021-05-19 10:39:41 -0500] [976249] [INFO] Starting worker [976249]
INFO:sanic.root:Starting worker [976249]
^C[2021-05-19 10:39:57 -0500] [976247] [INFO] Received signal SIGINT. Shutting down.
INFO:sanic.root:Received signal SIGINT. Shutting down.
[2021-05-19 10:39:57 -0500] [976253] [INFO] Stopping worker [976253]
[2021-05-19 10:39:57 -0500] [976251] [INFO] Stopping worker [976251]
[2021-05-19 10:39:57 -0500] [976256] [INFO] Stopping worker [976256]
INFO:sanic.root:Stopping worker [976253]
[2021-05-19 10:39:57 -0500] [976252] [INFO] Stopping worker [976252]
INFO:sanic.root:Stopping worker [976251]
[2021-05-19 10:39:57 -0500] [976255] [INFO] Stopping worker [976255]
[2021-05-19 10:39:57 -0500] [976250] [INFO] Stopping worker [976250]
INFO:sanic.root:Stopping worker [976256]
[2021-05-19 10:39:57 -0500] [976254] [INFO] Stopping worker [976254]
[2021-05-19 10:39:57 -0500] [976249] [INFO] Stopping worker [976249]
INFO:sanic.root:Stopping worker [976252]
INFO:sanic.root:Stopping worker [976250]
INFO:sanic.root:Stopping worker [976249]
INFO:sanic.root:Stopping worker [976255]
INFO:sanic.root:Stopping worker [976254]
[2021-05-19 10:39:57 -0500] [976247] [INFO] Server Stopped
INFO:sanic.root:Server Stopped
sys:1: RuntimeWarning: coroutine 'DnsSynchronizer.start_sync' was never awaited
Code snippet
#!/usr/bin/env python3
import warnings
with warnings.catch_warnings():
    import re
    import sys
    import argparse
    import os
    import logging
    import logging.config
    import json as pjson
    from pathlib import Path
    from pprint import pprint, pformat
    from sanic import Sanic
    from sanic.response import text
    from sanic.response import json
    from sanic.response import html
    import ipaddress
    import pdb
    from typing import List, Set, Tuple
    import time
    from multiprocessing import Value, Lock

SCRIPT_DIR=Path(__file__).parent.absolute()
app = Sanic(__name__)
#app.static('/static', str(SCRIPT_DIR.joinpath('static_content').resolve()))

def get_logger():
    return logging.getLogger(__name__)
class DnsSynchronizer(object):
    SYNC_NEEDED = 0
    SYNC_RUNNING = 1
    SYNC_FINISHED = 2

    def __init__(self, sync_state, lock):
        self.sync_state = sync_state
        self.lock = lock
        # the sync will start in one process right away, so assume it's running
        # this variable is here to avoid a lock once the sync finishes
        self.sync_running = True
        self.execute_sync = False

    async def start_sync(self):
        """
        If this process is to run the sync, start it
        """
        self.lock.acquire()
        try:
            if self.sync_state.value == DnsSynchronizer.SYNC_NEEDED:
                get_logger().info("Sync is needed, setting state")
                self.sync_state.value = DnsSynchronizer.SYNC_RUNNING
            else:
                get_logger().info("Not running sync")
                return
        finally:
            self.lock.release()
        get_logger().info("Starting sync")
        # don't use asyncio.sleep so that this looks like a non-async task
        time.sleep(60)
        self.lock.acquire()
        try:
            get_logger().info("Finished sync")
            self.sync_state.value = DnsSynchronizer.SYNC_FINISHED
            self.sync_running = False
        finally:
            self.lock.release()
@app.route("/", methods=["GET","POST"])
async def update_dns(request):
    get_logger().info("Received request: %s", pformat(request.json))
    # a Sanic handler must return a response object
    return text("OK")
def main_method(args):
    sync_state = Value('b', DnsSynchronizer.SYNC_NEEDED)
    lock = Lock()
    get_logger().info("Starting main")
    if not hasattr(app.ctx, 'dns_synchronizer'):
        app.ctx.dns_synchronizer = DnsSynchronizer(sync_state, lock)
        get_logger().info("adding sync task")
        app.add_task(app.ctx.dns_synchronizer.start_sync())
        get_logger().info("After adding sync task")
    get_logger().info("Starting app")
    # running with debug causes more verbose output and activates the Automatic Reloader
    app.run(host="localhost", port=args.port, debug=args.debug, access_log=False, workers=args.workers)

def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]
    parser = argparse.ArgumentParser()
    parser.add_argument("-l", "--logconfig", dest="logconfig", help="logging configuration (default: logging.json)", default='logging.json')
    parser.add_argument("--debug", dest="debug", help="Enable interactive debugger on error", action='store_true')
    parser.add_argument("--workers", dest="workers", help="Number of worker processes", type=int, default=8)
    parser.add_argument("--port", dest="port", help="Port to run on", type=int, default=4000)
    args = parser.parse_args(argv)
    logging.basicConfig(level=logging.INFO)
    return main_method(args)

if __name__ == "__main__":
    sys.exit(main())
Expected behavior: No warnings.
Environment:
- OS: Ubuntu Linux 20.04
- Version: Sanic 21.3.4
app.add_task(app.ctx.dns_synchronizer.start_sync())
I think you need to remove the ().
That would be inconsistent with the example in the Sanic documentation https://sanicframework.org/en/guide/basics/tasks.html
You are correct, it should work either way and makes no difference. Sanic just seems to forward it to asyncio's create_task, calling it first if needed.
A likely cause of the problem is that you are adding the task before running Sanic, when there is no asyncio loop yet. Try doing it in main_process_start or after_server_start.
https://sanicframework.org/en/guide/basics/listeners.html
According to the documentation on add_task (https://sanic.readthedocs.io/en/stable/sanic/api_reference.html#sanic.app.Sanic.add_task), this is the correct usage pattern.
I'll look into the listeners option. It looks like main_process_start can simplify my logic somewhat.
Using main_process_start doesn't work, as that executes the task before the workers start up. I want the workers to be running while this task is executing.
When I used before_server_start, it behaves like I want and doesn't give the warning. It looks like the documentation for add_task needs some updating to make it clear that it must be called after the loop has started.
Will respond in more detail soon, just wanted to make a couple of quick clarifications:
- passing the task called or uncalled is fine in add_task
- calling add_task before or after there is a loop is fine
Hmm yes, it does work just fine prior to app.run, my mistake. When used at module level, it runs on every worker but not on the main process. You are getting that warning precisely because the main process never runs tasks (except when workers=1, so that it becomes a worker), but the () still causes a coroutine object to be created there. Drop the parentheses and the warning goes away.
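The mechanism described above can be reproduced without Sanic. The following is a minimal sketch using plain asyncio (CPython assumed; `start_sync` here is a stand-in, not the DnsSynchronizer method, and `collect_warnings` is a hypothetical helper). It shows that merely calling a coroutine function creates a coroutine object, and discarding that object unawaited produces the same RuntimeWarning, while holding only the function object produces nothing.

```python
import asyncio
import gc
import warnings


async def start_sync():
    # Stand-in for DnsSynchronizer.start_sync
    return "synced"


def collect_warnings(action):
    """Run action() and return the messages of any RuntimeWarnings it caused."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        action()
        gc.collect()  # make sure discarded objects are finalized
    return [str(w.message) for w in caught if issubclass(w.category, RuntimeWarning)]


def pass_coroutine_object():
    # Like add_task(start_sync()): the call creates a coroutine object right away.
    # If this process never schedules it, it is discarded unawaited.
    coro = start_sync()
    del coro


def pass_callable():
    # Like add_task(start_sync): only a reference to the function is kept,
    # so no coroutine exists until some process actually calls it.
    factory = start_sync
    del factory


with_parens = collect_warnings(pass_coroutine_object)
without_parens = collect_warnings(pass_callable)

# The callable can still be turned into a coroutine and awaited later, e.g. in a worker.
result = asyncio.run(start_sync())
```

Here `with_parens` should contain a "coroutine 'start_sync' was never awaited" message and `without_parens` should be empty.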
Yes, that makes absolute sense. When there are multiple workers, the main process (which is where you are instantiating the coroutine) does not execute listeners.
Ok, given this, I suggest that the documentation and examples be updated to state that add_task should be passed the raw method. That would avoid the issue in all cases.
Addendum: because fork is used for creating workers, it also works the same within non-module-level startup code, as in your sample code, as long as the startup code gets executed at all (it might not be when running via ASGI or via the sanic command).
Addendum 2: if using auto_reload, you get two warnings because the reloader process also gets a coroutine object that is never executed.
Ok, given this, I suggest that the documentation and examples be updated to state that add_task should be passed the raw method. That would avoid the issue in all cases.
Agreed. Even better, use the decorator:
import asyncio

@app.add_task
async def notify_server_started_after_five_seconds():
    await asyncio.sleep(5)
    print('Server successfully started!')
Good to see some progress here. Is it a documentation issue then?
For the record, I achieved the desired functionality with a listener on after_server_start and a non-blocking acquire of a multiprocessing Lock, to ensure only one of the workers executes the task.
@gatopeich Would you care to share a snippet of your solution? Maybe it's worthwhile to add to the docs.
I think we should just add a note to the documentation saying: if you are adding a task using this method before app.run is called, it is recommended to pass the callable itself to add_task (applying functools.partial if arguments are required), not a coroutine object.
For example:
async def slow_work(...):
    ...

app = Sanic(...)
app.add_task(slow_work)  # Note: we are passing the callable and not the coroutine object `slow_work(...)`
app.run(...)
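To illustrate why passing the callable (optionally wrapped with functools.partial) is safer, here is a minimal sketch. `TaskRegistry` is a hypothetical stand-in, not Sanic's implementation; unlike Sanic's add_task, it deliberately rejects coroutine objects. Coroutines are only created when `run_all()` is awaited inside a running loop, i.e. only in the process that actually runs them.

```python
import asyncio
import functools
import inspect


class TaskRegistry:
    """Hypothetical stand-in for app.add_task that stores task *factories*."""

    def __init__(self):
        self._factories = []

    def add_task(self, task):
        if inspect.iscoroutine(task):
            # A coroutine object already exists in this process and would warn
            # if this process never awaits it, so refuse it here.
            raise TypeError("pass a coroutine function or functools.partial, not a coroutine object")
        self._factories.append(task)

    async def run_all(self):
        # Each factory is called here, so coroutines are created only where they run.
        return await asyncio.gather(*(factory() for factory in self._factories))


async def slow_work(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"


registry = TaskRegistry()
# functools.partial binds the arguments without calling slow_work yet.
registry.add_task(functools.partial(slow_work, "sync", 0.01))
registry.add_task(functools.partial(slow_work, "cleanup", 0.0))
results = asyncio.run(registry.run_all())
```

`asyncio.gather` preserves the registration order, so `results` is `["sync done", "cleanup done"]`.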
The long explanation was already given by @Tronic; this is just for more clarity.
If you actually call the callable while passing it to add_task, the coroutine object is created. This happens in the 'main' process, and since the workers are created using fork, they get a copy of this coroutine object (and also of the loop object). If you don't call the callable, only a copy of the callable object gets passed to the worker via fork, and the original callable is never turned into a coroutine object in the main process.
In the workers this task is awaited, but in the main process it is not, hence the warning above. A side effect (discussed already) is that this task gets executed in each worker. If it has to run on only one worker or a few workers, one has to use multiprocessing synchronization mechanisms to achieve this (e.g. BoundedSemaphore).
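The same example above can be simplified using a BoundedSemaphore as follows:
from multiprocessing import BoundedSemaphore
import os
import time

# Created at import time in the main process, so the forked workers inherit the
# same semaphore. Creating it inside start_sync() would give every call its own
# private semaphore, which would synchronize nothing.
concurrency = 1
max_runs = BoundedSemaphore(value=concurrency)

class DnsSynchronizer(object):
    ...  # constants and __init__ as in the original report

    async def start_sync(self):
        """
        If this process is to run the sync, start it
        """
        with max_runs:
            if self.sync_state.value == DnsSynchronizer.SYNC_NEEDED:
                get_logger().info("Sync is needed, setting state: %d", os.getpid())
                self.sync_state.value = DnsSynchronizer.SYNC_RUNNING
            else:
                get_logger().info("Not running sync")
                return
        get_logger().info("Starting sync: %d", os.getpid())
        time.sleep(60)
        get_logger().info("Finished sync: %d", os.getpid())
        self.sync_state.value = DnsSynchronizer.SYNC_FINISHED
        self.sync_running = False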
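To check that a semaphore shared by forked processes really lets only one of them claim the sync, here is a small standalone sketch using only the standard library. It mirrors the SYNC_NEEDED/SYNC_RUNNING/SYNC_FINISHED states from the report; `worker` and `run_workers` are hypothetical names, and it assumes a POSIX platform where the "fork" start method is available. The semaphore is created once in the parent and handed to every child.

```python
import multiprocessing as mp

SYNC_NEEDED, SYNC_RUNNING, SYNC_FINISHED = 0, 1, 2


def worker(sync_state, max_runs, runs):
    # max_runs is the same semaphore in every child, so the check-and-set is atomic.
    with max_runs:
        if sync_state.value != SYNC_NEEDED:
            return  # another worker already claimed the sync
        sync_state.value = SYNC_RUNNING
    # Only the single worker that claimed the sync gets here.
    with runs.get_lock():
        runs.value += 1
    sync_state.value = SYNC_FINISHED


def run_workers(n_workers=4):
    # Assumption: "fork" start method, as used for Sanic's workers in this thread.
    ctx = mp.get_context("fork")
    sync_state = ctx.Value("b", SYNC_NEEDED)
    max_runs = ctx.BoundedSemaphore(1)
    runs = ctx.Value("i", 0)
    procs = [ctx.Process(target=worker, args=(sync_state, max_runs, runs)) for _ in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return runs.value, sync_state.value


if __name__ == "__main__":
    print(run_workers())  # prints (1, 2)
```

With a value of 1 the BoundedSemaphore behaves like a Lock; a larger value would let that many processes hold it at once, but the shared state check is what limits the sync itself to one run.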
@gabhijit Would you care to take a stab at adding to the docs?
https://github.com/sanic-org/sanic-guide/blob/main/src/en/guide/basics/tasks.md
@ahopkins: Sure, will add it.
Should probably deprecate passing a coroutine object to avoid this problem. I wonder how much trouble it would cause app developers if we gave deprecation warnings or eventually removed the feature... Or could just say it in documentation.
I see no reason to deprecate it, and sometimes prefer this usage, especially when passing in some dynamic value. IMO it is a problem that can be fixed with better documentation.
@gatopeich Would you care to share a snippet of your solution? Maybe it's worthwhile to add to the docs.
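Off the top of my head:
import multiprocessing as mp

@app.listener('after_server_start')
async def run_only_once(app, loop, lock=mp.Lock()):
    # The default value is evaluated once, in the main process, before the
    # workers are forked, so every worker shares the same lock.
    if lock.acquire(False):
        print("Only one worker will get this lock and leave it locked forever...")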
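The approach above relies on a non-blocking acquire (`lock.acquire(False)`) of a lock created before the workers fork: it returns True for exactly one caller, and because the winner never releases it, every later attempt fails. A stdlib-only sketch of that behavior, assuming a POSIX platform with the "fork" start method; `try_claim` and `count_winners` are hypothetical names.

```python
import multiprocessing as mp


def try_claim(lock, winners):
    # acquire(False) returns immediately: True only if the lock was free.
    # The winner never releases it, so the lock stays taken after it exits.
    if lock.acquire(False):
        with winners.get_lock():
            winners.value += 1


def count_winners(n_workers=4):
    ctx = mp.get_context("fork")  # assumption: POSIX fork, as with the workers discussed here
    lock = ctx.Lock()  # created once, before the children are forked
    winners = ctx.Value("i", 0)
    procs = [ctx.Process(target=try_claim, args=(lock, winners)) for _ in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return winners.value


if __name__ == "__main__":
    print(count_winners())  # prints 1
```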
Do you think adding built-in support in Sanic for running a task on only one (or some) of the workers would be worthwhile?
By the way, switching from fork to spawn for the worker processes has been discussed in another issue. I think that might change the behavior of this functionality.
I think that might change the behavior of this functionality.
How so?
Do you guys think adding built-in support in Sanic to let it only runs on one or some workers is worth?
Sounds interesting, but it also sounds like we would need to support cross-process sync. I am not so keen on this, since it would be different and lead to a lot of confusion and issues between scaling with workers vs. containers (for example). There is absolutely an opening here, but it sounds like it is more in the realm of a plugin than core functionality.
With that said, I have thought about opening up the API to the multiprocessing call.
Is this issue considered solved?
Injecting the lock into Sanic's shared context looks better:
from multiprocessing import Lock

@app.main_process_start
async def main_process_start_init(*_):
    # init lock
    app.shared_ctx.task_lock = Lock()