
app.add_task() triggers RuntimeWarning: coroutine was never awaited

Status: Open. jschewebbn opened this issue 3 years ago • 21 comments

This is likely the same problem as #1491.

Describe the bug: When running code that uses app.add_task(), I get a runtime warning that the coroutine passed to add_task was never awaited.

I want this task to run once for the whole application rather than once per worker. To do that, I am using a multiprocessing Lock and Value to track whether the task is running.

Run the code below and wait for the output "Finished sync". Then press Ctrl-C, and I see:

INFO:__main__:Finished sync
[2021-05-19 10:39:41 -0500] [976249] [INFO] Starting worker [976249]
INFO:sanic.root:Starting worker [976249]
^C[2021-05-19 10:39:57 -0500] [976247] [INFO] Received signal SIGINT. Shutting down.
INFO:sanic.root:Received signal SIGINT. Shutting down.
[2021-05-19 10:39:57 -0500] [976253] [INFO] Stopping worker [976253]
[2021-05-19 10:39:57 -0500] [976251] [INFO] Stopping worker [976251]
[2021-05-19 10:39:57 -0500] [976256] [INFO] Stopping worker [976256]
INFO:sanic.root:Stopping worker [976253]
[2021-05-19 10:39:57 -0500] [976252] [INFO] Stopping worker [976252]
INFO:sanic.root:Stopping worker [976251]
[2021-05-19 10:39:57 -0500] [976255] [INFO] Stopping worker [976255]
[2021-05-19 10:39:57 -0500] [976250] [INFO] Stopping worker [976250]
INFO:sanic.root:Stopping worker [976256]
[2021-05-19 10:39:57 -0500] [976254] [INFO] Stopping worker [976254]
[2021-05-19 10:39:57 -0500] [976249] [INFO] Stopping worker [976249]
INFO:sanic.root:Stopping worker [976252]
INFO:sanic.root:Stopping worker [976250]
INFO:sanic.root:Stopping worker [976249]
INFO:sanic.root:Stopping worker [976255]
INFO:sanic.root:Stopping worker [976254]
[2021-05-19 10:39:57 -0500] [976247] [INFO] Server Stopped
INFO:sanic.root:Server Stopped
sys:1: RuntimeWarning: coroutine 'DnsSynchronizer.start_sync' was never awaited

Code snippet

#!/usr/bin/env python3

import warnings
with warnings.catch_warnings():
    import re
    import sys
    import argparse
    import os
    import logging
    import logging.config
    import json as pjson
    from pathlib import Path
    from pprint import pprint, pformat
    from sanic import Sanic
    from sanic.response import text
    from sanic.response import json
    from sanic.response import html
    import ipaddress
    import pdb
    from typing import List, Set, Tuple
    import time

    from multiprocessing import Value, Lock
    

SCRIPT_DIR=Path(__file__).parent.absolute()

app = Sanic(__name__)
#app.static('/static', str(SCRIPT_DIR.joinpath('static_content').resolve()))

def get_logger():
    return logging.getLogger(__name__)


class DnsSynchronizer(object):
    SYNC_NEEDED = 0
    SYNC_RUNNING = 1
    SYNC_FINISHED = 2

    def __init__(self, sync_state, lock):
        self.sync_state = sync_state
        self.lock = lock
        # the sync will start in one process right away, so assume it's running
        # this variable is here to avoid a lock once the sync finishes
        self.sync_running = True
        self.execute_sync = False

        
    async def start_sync(self):
        """
        If this process is to run the sync, start it
        """
        self.lock.acquire()
        try:
            if self.sync_state.value == DnsSynchronizer.SYNC_NEEDED:
                get_logger().info("Sync is needed, setting state")
                self.sync_state.value = DnsSynchronizer.SYNC_RUNNING
            else:
                get_logger().info("Not running sync")
                return
        finally:
            self.lock.release()
        
        get_logger().info("Starting sync")
        # don't use asyncio.sleep so that this looks like a non-async task 
        time.sleep(60)

        self.lock.acquire()
        try:
            get_logger().info("Finished sync")
            self.sync_state.value = DnsSynchronizer.SYNC_FINISHED
            self.sync_running = False
        finally:
            self.lock.release()
    
            
@app.route("/", methods=["GET","POST"])
async def update_dns(request):
    get_logger().info("Received request: %s", pformat(request.json))
    # Sanic handlers must return a response object
    return text("OK")


def main_method(args):
    sync_state = Value('b', DnsSynchronizer.SYNC_NEEDED)
    lock = Lock()
    
    get_logger().info("Starting main")
    if not hasattr(app.ctx, 'dns_synchronizer'):
        app.ctx.dns_synchronizer = DnsSynchronizer(sync_state, lock)
    
    get_logger().info("adding sync task")
    app.add_task(app.ctx.dns_synchronizer.start_sync())
    get_logger().info("After adding sync task")

    get_logger().info("Starting app")
    # running with debug causes more verbose output and activates the Automatic Reloader
    app.run(host="localhost", port=args.port, debug=args.debug, access_log=False, workers=args.workers)

    
def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser()
    parser.add_argument("-l", "--logconfig", dest="logconfig", help="logging configuration (default: logging.json)", default='logging.json')
    parser.add_argument("--debug", dest="debug", help="Enable interactive debugger on error", action='store_true')
    parser.add_argument("--workers", dest="workers", help="Number of worker processes", type=int, default=8)
    parser.add_argument("--port", dest="port", help="Port to run on", type=int, default=4000)

    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO)
    return main_method(args)


if __name__ == "__main__":
    sys.exit(main())

Expected behavior: No warnings.

Environment:

  • OS: Ubuntu Linux 20.04
  • Version: Sanic 21.3.4

jschewebbn, May 19 '21

    app.add_task(app.ctx.dns_synchronizer.start_sync())

I think you need to remove the ().

Tronic, May 19 '21

That would be inconsistent with the example in the Sanic documentation https://sanicframework.org/en/guide/basics/tasks.html

jschewebbn, May 19 '21

You are correct; it should work either way. Sanic appears to forward the argument to asyncio's create_task, calling it first if it is a callable.

A likely cause of the problem is that you are adding the task before running Sanic, when there is no asyncio loop yet. Try doing it in main_process_start or after_server_start.

https://sanicframework.org/en/guide/basics/listeners.html

Tronic, May 19 '21

According to the documentation on add_task, https://sanic.readthedocs.io/en/stable/sanic/api_reference.html#sanic.app.Sanic.add_task, this is the correct usage pattern.

I'll look into the listeners option. It looks like main_process_start can simplify my logic some.

jschewebbn, May 19 '21

Using main_process_start doesn't work as that executes the task before the workers start up. I want the workers to be executing while this task is executing.

When I used before_server_start, it behaved as I wanted and did not produce the warning. It looks like the documentation for add_task needs updating to make it clear that this must be executed after the loop has started.

jschewebbn, May 19 '21

Will respond in more detail soon; just wanted to make a couple of quick clarifications:

  • passing either the function itself or an already-called coroutine is fine in add_task
  • calling add_task before or after there is a loop is fine
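To illustrate why both forms can work, here is a simplified, hypothetical sketch in plain asyncio (not Sanic's actual implementation): a registry that accepts either a coroutine function or an already-created coroutine object, holds it until a loop is running, and only then schedules it. The names TaskRegistry, add_task, start, and sync_dns are made up for this example.

```python
import asyncio


class TaskRegistry:
    """Hypothetical stand-in for an app that accepts tasks before its loop runs."""

    def __init__(self):
        self._pending = []

    def add_task(self, task):
        # Accept either a coroutine function (not yet called) or a coroutine object
        self._pending.append(task)

    async def start(self):
        # Runs once the event loop exists: turn every entry into a scheduled Task
        loop = asyncio.get_running_loop()
        tasks = []
        for task in self._pending:
            coro = task() if callable(task) else task
            tasks.append(loop.create_task(coro))
        return await asyncio.gather(*tasks)


async def sync_dns():
    await asyncio.sleep(0)
    return "synced"


registry = TaskRegistry()
registry.add_task(sync_dns)    # callable: the coroutine is created later, inside the loop
registry.add_task(sync_dns())  # coroutine object: created right now, outside any loop
results = asyncio.run(registry.start())
```

Within a single process both entries run to completion. The warning in this issue only appears when the process that created the coroutine object never reaches the scheduling step, which is what happens to Sanic's main process when there are several workers.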

ahopkins, May 19 '21

Hmm, yes, it does work fine before app.run; my mistake. When add_task is used at module level, the task runs on every worker but not on the main process. You are getting the warning precisely because the main process never runs tasks (except when workers=1, in which case it acts as the worker), yet the () still creates a coroutine object there. Drop the parentheses and the warning goes away.
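The effect can be reproduced with the standard library alone (no Sanic): creating a coroutine object and discarding it without awaiting it emits the same "was never awaited" RuntimeWarning, while holding only the coroutine function does not. The helper count_unawaited_warnings is hypothetical, written for this illustration.

```python
import gc
import warnings


async def start_sync():
    return "done"


def count_unawaited_warnings(make_task):
    """Create something with make_task(), discard it without running it,
    and count the 'never awaited' RuntimeWarnings this produces."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        task = make_task()
        del task      # like the main process exiting without ever running the task
        gc.collect()  # CPython usually warns on the del already; this is a safety net
    return sum(
        1
        for w in caught
        if issubclass(w.category, RuntimeWarning) and "never awaited" in str(w.message)
    )


called = count_unawaited_warnings(lambda: start_sync())  # coroutine object is created
uncalled = count_unawaited_warnings(lambda: start_sync)  # only the function is passed
```

Here called is 1 and uncalled is 0, mirroring the difference between add_task(obj.start_sync()) and add_task(obj.start_sync) in a process that never runs the task.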

Tronic, May 19 '21

Yes, that makes absolute sense. When there are multiple workers the main process (which is where you are instantiating the coroutine) does not execute listeners.

ahopkins, May 19 '21

Ok, given this, I suggest that the documentation and examples be updated to say that you should pass the raw method (not a called coroutine) to add_task. That would avoid the issue in all cases.

jschewebbn, May 19 '21

Addendum: because workers are created with fork, the same applies to non-module-level startup code like your sample, as long as that startup code runs at all (it might not when running via ASGI or via the sanic command).

Addendum 2: with auto_reload enabled you get two warnings, because the reloader process also gets a coroutine object that is never awaited.

Tronic, May 19 '21

Quoting @jschewebbn: "Ok, given this I suggest that the documentation and examples be updated to state that when calling add_task to pass the raw method. That would avoid the issue in all cases."

Agreed. Even better, use the decorator:

import asyncio

@app.add_task
async def notify_server_started_after_five_seconds():
    await asyncio.sleep(5)
    print('Server successfully started!')

Tronic, May 19 '21

Good to see some progress here. Is it a documentation issue then?

For the record, I achieved the desired behavior with an after_server_start listener and a multiprocessing Lock acquired without blocking (lock.acquire(False)), so that only one of the workers executes the task.

gatopeich, May 30 '21

@gatopeich Would you care to share a snippet of your solution? Maybe it's worthwhile to add to the docs.

ahopkins, May 31 '21

I think we should just add a note to the documentation saying: if you add a task with this method before app.run is called, pass the callable rather than a coroutine object, applying functools.partial if it needs arguments.

For example:

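import asyncio
from functools import partial
from sanic import Sanic

async def slow_work(delay):
    await asyncio.sleep(delay)

app = Sanic("MyApp")
# Note: we pass the callable, not the coroutine object `slow_work(...)`;
# functools.partial binds the arguments without creating a coroutine
app.add_task(partial(slow_work, 5))
app.run()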
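A minimal plain-asyncio sketch of the functools.partial advice: binding arguments with partial produces a callable, not a coroutine, so the coroutine only comes into existence when something inside a running loop calls it. The arguments of slow_work and the main driver are illustrative assumptions; with Sanic you would hand the partial to app.add_task instead of calling it yourself.

```python
import asyncio
from functools import partial


async def slow_work(name, delay):
    await asyncio.sleep(delay)
    return f"{name} finished"


# Bind the arguments now; no coroutine object exists yet, so nothing can go unawaited
task = partial(slow_work, "dns-sync", 0.01)


async def main():
    # Only here, inside a running loop, is the coroutine created and scheduled
    return await asyncio.get_running_loop().create_task(task())


result = asyncio.run(main())
```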

@Tronic already gave the long explanation, but here it is again for clarity.

If you call the callable when passing it to add_task, a coroutine object is created, and this happens in the main process. Because the workers are created with fork, each one gets a copy of this coroutine object (and of the loop object). If you don't call it, only a copy of the callable reaches the workers via fork, and no coroutine object is ever created in the main process.

In the workers the coroutine is awaited, but in the main process it never is, hence the warning above. A side effect (already discussed) is that the task runs in every worker. To run it on only one or a few workers, you need a multiprocessing synchronization primitive (e.g. BoundedSemaphore).

The example above can be simplified considerably using a BoundedSemaphore as follows:

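    # Assumes __init__ also creates one shared semaphore in the main process,
    # before the workers are forked (from multiprocessing import BoundedSemaphore):
    #     self.max_runs = BoundedSemaphore(value=1)  # concurrency = 1
    # A semaphore created inside this method would be private to each worker.
    async def start_sync(self):
        """
        If this process is to run the sync, start it
        """
        # Non-blocking acquire, so losing workers return immediately instead of
        # blocking their event loop until the sync finishes
        if not self.max_runs.acquire(block=False):
            get_logger().info("Not running sync")
            return
        try:
            if self.sync_state.value != DnsSynchronizer.SYNC_NEEDED:
                get_logger().info("Not running sync")
                return
            get_logger().info("Sync is needed, setting state: %d", os.getpid())
            self.sync_state.value = DnsSynchronizer.SYNC_RUNNING

            get_logger().info("Starting sync: %d", os.getpid())
            time.sleep(60)
            get_logger().info("Finished sync: %d", os.getpid())

            self.sync_state.value = DnsSynchronizer.SYNC_FINISHED
            self.sync_running = False
        finally:
            self.max_runs.release()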
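The same idea can be exercised outside Sanic with only multiprocessing: a BoundedSemaphore created in the parent before forking is shared by all children, and a non-blocking acquire lets at most `concurrency` of them through. try_run and count_winners are hypothetical names for this sketch, and it assumes a POSIX system where the "fork" start method is available.

```python
import multiprocessing as mp


def try_run(sem, winners):
    # Non-blocking acquire: returns False immediately if every slot is taken
    if sem.acquire(block=False):
        with winners.get_lock():
            winners.value += 1
        # The slot is deliberately never released, so late workers are refused too


def count_winners(workers=8, concurrency=1):
    # "fork" mirrors how the workers were created in this discussion (POSIX only).
    # The semaphore exists in the parent before forking, so all children share it.
    ctx = mp.get_context("fork")
    sem = ctx.BoundedSemaphore(concurrency)
    winners = ctx.Value("i", 0)
    procs = [ctx.Process(target=try_run, args=(sem, winners)) for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return winners.value
```

With concurrency=1 this behaves like the Lock-based approach; raising it lets "a few workers" run the task.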

gabhijit, Jun 26 '21

@gabhijit Would you care to take a stab at adding to the docs?

https://github.com/sanic-org/sanic-guide/blob/main/src/en/guide/basics/tasks.md

ahopkins, Jun 27 '21

@ahopkins: Sure, will add it.

gabhijit, Jun 27 '21

Should probably deprecate passing a coroutine object to avoid this problem. I wonder how much trouble it would cause app developers if we gave deprecation warnings or eventually removed the feature... Or could just say it in documentation.

Tronic, Jun 27 '21

Quoting @Tronic: "Should probably deprecate passing a coroutine object to avoid this problem. I wonder how much trouble it would cause app developers if we gave deprecation warnings or eventually removed the feature... Or could just say it in documentation."

I see no reason to deprecate it, and sometimes prefer this usage, especially when passing in some dynamic value. IMO it is a problem that can be fixed with better documentation.

ahopkins, Jun 28 '21

Quoting @ahopkins: "@gatopeich Would you care to share a snippet of your solution? Maybe it's worthwhile to add to the docs."

Off the top of my head:

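import multiprocessing as mp

# The default Lock is created once, when the module is imported in the main
# process, so every forked worker shares it. Sanic passes the app as the first
# positional argument, so the lock cannot be the first parameter.
@app.listener('after_server_start')
async def run_only_once(app, loop, lock=mp.Lock()):
    # Non-blocking acquire: only the first worker to get here succeeds
    if lock.acquire(False):
        print("Only one worker will get this lock and leave it locked forever...")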

gatopeich, Sep 28 '21

In reply to @gabhijit's comment above (pass the callable to add_task, and use a BoundedSemaphore to limit which workers run the sync):

Do you think adding built-in support in Sanic for running a task on only one or some workers would be worth it?

By the way, switching from fork to spawn for worker processes has been discussed in another issue. I think that might change the behavior of this functionality.

ChihweiLHBird, Nov 26 '21

Quoting @ChihweiLHBird: "I think that might change the behavior of this functionality."

How so?

Quoting @ChihweiLHBird: "Do you guys think adding built-in support in Sanic to let it only runs on one or some workers is worth?"

Sounds interesting, but it also sounds like we would need to support cross-process synchronization. I am not so keen on this, since it would lead to a lot of confusion and issues between scaling with workers vs. containers (for example). There is absolutely an opening here, but it sounds more like plugin territory than core functionality.


With that said, I have thought about opening up the API to the multiprocessing call.

ahopkins, Dec 01 '21

Is this issue considered as solved?

ChihweiLHBird, Nov 06 '22

In reply to @gatopeich's after_server_start Lock snippet above: injecting the lock into Sanic's shared context (app.shared_ctx) looks better:

from multiprocessing import Lock

@app.main_process_start
async def main_process_start_init(*_):
    # init lock
    app.shared_ctx.task_lock = Lock()

Gu-f, Feb 06 '24