
Stop execution of fastapi endpoint after a specified time to unblock resources

Open aimlnerd opened this issue 3 years ago • 9 comments

First Check

  • [X] I added a very descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the FastAPI documentation, with the integrated search.
  • [X] I already searched in Google "How to X in FastAPI" and didn't find any information.
  • [X] I already read and followed all the tutorial in the docs and didn't find an answer.
  • [X] I already checked if it is not related to FastAPI but to Pydantic.
  • [X] I already checked if it is not related to FastAPI but to Swagger UI.
  • [X] I already checked if it is not related to FastAPI but to ReDoc.

Commit to Help

  • [X] I commit to help with one of those options 👆

Example Code

import time
from uvicorn import Server, Config
from random import randrange
from fastapi import FastAPI

app = FastAPI()

def some_func(text):
    """
    Some computationally heavy function
    whose execution time depends on input text size
    """
    randinteger = randrange(1,120)
    time.sleep(randinteger)# simulate processing of text
    return text


@app.get("/do_something")
async def do_something():
    response = some_func(text="hello world")
    return {"response": response}

# Running
if __name__ == '__main__':
    server = Server(Config(app=app, host='0.0.0.0', port=3001))
    server.run()

Description

The client that calls /do_something has a timeout of 60 seconds in its request/post() call. So if /do_something takes 10 minutes, it blocks the next request to /do_something and wastes resources.

Wanted Solution

  1. A stopexecution parameter should stop processing of the /do_something endpoint after 60 seconds, so that the server can immediately move on to the next request.

  2. If execution of the endpoint is stopped, we should be able to log it with a custom message.

  3. This should not kill the service and work with multithreading/multiprocessing.

Wanted Code

import time
from random import randrange
from fastapi import FastAPI

app = FastAPI()

def some_func(text):
    """
    Some computationally heavy function 
    whose execution time depends on input text size
    """
    randinteger = randrange(1,120)
    time.sleep(randinteger)# simulate processing of text
    return text


@app.get("/do_something", stopexecution=60)
async def do_something():
    response = some_func(text="hello world")
    return {"response": response}

Alternatives

I tried this, but when the timeout happens the server gets killed. Is there a way to fix this?

import logging
import time
import timeout_decorator
from uvicorn import Server, Config
from random import randrange
from fastapi import FastAPI

app = FastAPI()


@timeout_decorator.timeout(seconds=2, timeout_exception=StopIteration, use_signals=False)
def some_func(text):
    """
    Some computationally heavy function
    whose execution time depends on input text size
    """
    randinteger = randrange(1,30)
    time.sleep(randinteger)# simulate processing of text
    return text


@app.get("/do_something")
async def do_something():
    try:
        response = some_func(text="hello world")
    except StopIteration:
        logging.warning('Stopped < /do_something > endpoint due to timeout!')
        response = None  # no result is available when the call times out
    else:
        logging.info('Completed < /do_something > endpoint')

    return {"response": response}


# Running 
if __name__ == '__main__':
    server = Server(Config(app=app, host='0.0.0.0', port=3001))
    server.run()

Operating System

Linux, Windows, macOS

Operating System Details

ubuntu 18.04

FastAPI Version

fastapi==0.73.0

Python Version

3.9

Additional Context

No response

aimlnerd avatar Sep 14 '22 10:09 aimlnerd

@deepakiim could you post your full code so that the do_something example above can be run?

csrgxtu avatar Sep 14 '22 11:09 csrgxtu

@deepakiim could you post your full code so that the do_something example above can be run?

Added full code that closely simulates the real scenario.

aimlnerd avatar Sep 14 '22 11:09 aimlnerd

This should not kill the service and work with multithreading/multiprocessing.

Are you sure you want it to work with multiprocessing? If your function were executed in a separate process, the solution would be a lot easier. But usually you want to prevent requests from spawning new processes in an uncontrolled way.
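
To illustrate why a separate process makes this easier: a process, unlike a thread or a blocked coroutine, can be killed from the outside. The following is only a minimal sketch of that idea, not a FastAPI solution. It uses the standard library's subprocess module instead of multiprocessing to stay self-contained, and run_with_hard_timeout and the inline child command are hypothetical stand-ins for the real heavy function.

```python
import subprocess
import sys


def run_with_hard_timeout(sleep_seconds, timeout):
    """Run a stand-in job in a child process; kill it if it exceeds `timeout`."""
    # Hypothetical child job: sleeps to simulate heavy work, then prints a result.
    child_code = f"import time; time.sleep({sleep_seconds}); print('done')"
    cmd = [sys.executable, "-c", child_code]
    try:
        completed = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed the child at this point,
        # so the CPU is free again; a custom log message could go here.
        return None
    return completed.stdout.strip()
```

A call such as run_with_hard_timeout(30, 0.5) returns None after about half a second, and the child is no longer running. The cost is one process per job, which is exactly the uncontrolled spawning mentioned above, so a real service would need to bound the number of concurrent jobs.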

JarroVGIT avatar Sep 14 '22 15:09 JarroVGIT

Something like this?

ligantx avatar Sep 14 '22 17:09 ligantx

@deepakiim according to your description, what you actually want is for the following request not to be held up by the first one:

The client that calls /do_something has a timeout of 60 seconds in its request/post() call. So if /do_something takes 10 minutes, it blocks the next request to /do_something and wastes resources.

FastAPI is an ASGI framework. In dev mode it starts in a single process, and the work is scheduled by the asyncio event loop: when a coroutine awaits an I/O operation, the event loop suspends it and lets other coroutines run in the meantime.

To make your logic switchable by the event loop, your code must be written in asyncio style. I changed your code a little so that it won't block your next request:

import asyncio
import time
from random import randrange

from fastapi import FastAPI
from uvicorn import Config, Server

app = FastAPI()

async def some_func(text):   # must be async so the event loop can switch away
    """
    Some computationally heavy function
    whose execution time depends on input text size
    """
    # randinteger = randrange(1, 16)
    randinteger = 16
    print(f'Debug: going to async sleep {randinteger} seconds')
    # time.sleep(randinteger)# simulate processing of text
    await asyncio.sleep(randinteger)  # non-blocking; the event loop runs other tasks meanwhile
    return text


@app.get("/do_something")
async def do_something():
    start_ts = int(time.time())
    await some_func(text="hello world")
    return {"response": int(time.time()) - start_ts}

# Running
if __name__ == '__main__':
    server = Server(Config(app=app, host='0.0.0.0', port=3001))
    server.run()
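
The difference between the blocking and the awaitable sleep can be seen without FastAPI or uvicorn. This is a small standalone sketch; blocking_handler, cooperative_handler and elapsed are illustrative names, and the 0.2-second sleeps stand in for real work.

```python
import asyncio
import time


async def blocking_handler():
    time.sleep(0.2)           # blocks the whole event loop while sleeping


async def cooperative_handler():
    await asyncio.sleep(0.2)  # hands control back to the event loop while waiting


async def elapsed(handler, n=3):
    """Run n copies of handler concurrently and return the wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


# The blocking version runs the three handlers one after another
# (roughly 3 x 0.2 s); the cooperative version overlaps the waits.
blocking_time = asyncio.run(elapsed(blocking_handler))
cooperative_time = asyncio.run(elapsed(cooperative_handler))
print(f"blocking: {blocking_time:.2f}s, cooperative: {cooperative_time:.2f}s")
```

This is the same effect the original endpoint shows: time.sleep inside an async def handler keeps every other request waiting.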

Also, if you have heavy CPU-bound calculations, you should consider using Celery or multiprocessing, because long-running, CPU-bound logic is not well suited to API request handlers.
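
For blocking code that cannot be rewritten as a coroutine, a lighter option than Celery or multiprocessing is to push it onto a worker thread. The sketch below assumes Python 3.9+ (for asyncio.to_thread); blocking_job and handler are illustrative names. It also shows the catch: a timeout only stops the waiting, and the thread itself keeps running to the end.

```python
import asyncio
import time

finished = []  # records jobs that ran to completion


def blocking_job(seconds):
    """Stand-in for blocking work that cannot be awaited."""
    time.sleep(seconds)
    finished.append(seconds)  # reached even if the caller stopped waiting
    return seconds


async def handler(seconds, timeout):
    try:
        # The event loop stays free while the job runs in a worker thread.
        return await asyncio.wait_for(
            asyncio.to_thread(blocking_job, seconds), timeout
        )
    except asyncio.TimeoutError:
        return "Timed out!"


result = asyncio.run(handler(0.5, 0.1))
print(result, finished)
```

The caller gets "Timed out!" after 0.1 seconds, but the job still finishes in the background, so this frees the event loop without freeing the CPU. Actually stopping the work needs either a cooperative stop flag or a separate process.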

csrgxtu avatar Sep 15 '22 02:09 csrgxtu

This should not kill the service and work with multithreading/multiprocessing.

Are you sure you want it to work with multiprocessing? If your function were executed in a separate process, the solution would be a lot easier. But usually you want to prevent requests from spawning new processes in an uncontrolled way.

@deepakiim if each request can spawn a process, then 10240 requests mean 10240 processes need to be forked, which is really inefficient. Processes and threads are heavy compared to coroutines.

csrgxtu avatar Sep 15 '22 02:09 csrgxtu

@csrgxtu Thanks for this, but it does not solve my problem. We really want to stop the execution of the process after, say, 5 minutes, because we have limited CPU resources: if the calling client has a 2-minute timeout, then processing for an extra 5 minutes is wasteful.

aimlnerd avatar Sep 15 '22 13:09 aimlnerd

To stop the execution after x minutes, here is an example from Stack Overflow:

https://stackoverflow.com/questions/29756507/how-can-i-add-a-connection-timeout-with-asyncio
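
In text form, the general pattern for putting a timeout on awaitable work is asyncio.wait_for. This is a minimal sketch and not a copy of the linked answer; slow_operation and handler are illustrative names, and the 10-second sleep stands in for the real awaitable work.

```python
import asyncio


async def slow_operation(log):
    try:
        await asyncio.sleep(10)  # stand-in for long awaitable work
        return "finished"
    except asyncio.CancelledError:
        # wait_for cancels the coroutine on timeout; clean-up or a
        # custom log message can go here before re-raising.
        log.append("cancelled")
        raise


async def handler(timeout):
    log = []
    try:
        result = await asyncio.wait_for(slow_operation(log), timeout=timeout)
    except asyncio.TimeoutError:
        result = "Timed out!"
    return result, log


result, log = asyncio.run(handler(0.1))
print(result, log)
```

This only works when the slow part is itself awaitable. A call like time.sleep or a CPU-heavy loop never reaches an await, so the cancellation cannot be delivered until it returns.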

csrgxtu avatar Sep 15 '22 13:09 csrgxtu

I put together a quick (and dirty) example of how to handle such a use case by farming out heavy computational jobs to another process via ProcessPoolExecutor. You can pass a timeout in your request (e.g. localhost:8000/long?timeout=8).

Note that this is quick and dirty. It works, but it has some shortcomings, such as missing error handling for the long-running job. But I think it gives a direction for what a solution could look like. The following will run as-is. I've added comments to illustrate the intent more clearly.

import asyncio
import multiprocessing
import random
from concurrent.futures import ProcessPoolExecutor
from threading import Event
from time import sleep, time

from fastapi import FastAPI, Request

app = FastAPI()

# To keep a count of requests, and giving each request a unique ID
app.state.no_requests = 0

# Used to run our long running job in
pool = ProcessPoolExecutor()

# A long running job that we potentially want to terminate
def long_running_blocking(request_id: int, event: Event):
    sleeptime = random.randint(5, 15)
    slept_time = 0
    print(
        f"({request_id}) - Starting long running job, will take {sleeptime} seconds.."
    )

    # Note that this is blocking on purpose to illustrate blocking call.
    while not event.is_set() and slept_time < sleeptime:
        sleep(1)
        slept_time += 1

    if event.is_set():
        print(f"({request_id}) - Terminating long running job.")
        return

    print(f"({request_id}) - Finished long running job.")
    return sleeptime


@app.get("/long")
async def long(request: Request, timeout: int):
    app.state.no_requests += 1
    request_id = request.app.state.no_requests
    print(f"({request_id}) - Received request")

    starttime = time()

    loop = asyncio.get_running_loop()

    # Manager is required to be able to share an Event object between processes
    with multiprocessing.Manager() as manager:
        event = manager.Event()
        fut = loop.run_in_executor(pool, long_running_blocking, request_id, event)

        while (time() - starttime) < timeout:
            if fut.done():
                # If the task is done before the timeout, break out of timeout loop.
                break
            # Try again in 1 second
            await asyncio.sleep(1)

        try:
            result = fut.result()
        except asyncio.InvalidStateError:
            # result() raises InvalidStateError if the task is not finished.
            event.set()
            result = "Timed out!"
            # Allow the running task to handle the event.set() event.
            while not fut.done():
                await asyncio.sleep(0.1)
        # Return the result (either sleeptime or "Timed out!")
        return result


@app.get("/")
async def root():
    return {"hello": "world"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

You can run multiple requests simultaneously. An example where I fire multiple requests yields the following log:

INFO:     Started server process [83924]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
(1) - Received request
(2) - Received request
(1) - Starting long running job, will take 12 seconds..
(3) - Received request
(2) - Starting long running job, will take 5 seconds..
(4) - Received request
(3) - Starting long running job, will take 14 seconds..
(5) - Received request
(4) - Starting long running job, will take 7 seconds..
(5) - Starting long running job, will take 9 seconds..
(2) - Finished long running job.
INFO:     127.0.0.1:56075 - "GET /long?timeout=8 HTTP/1.1" 200 OK
(1) - Terminating long running job.
(4) - Finished long running job.
INFO:     127.0.0.1:56072 - "GET /long?timeout=8 HTTP/1.1" 200 OK
INFO:     127.0.0.1:56085 - "GET /long?timeout=8 HTTP/1.1" 200 OK
(5) - Terminating long running job.
(3) - Terminating long running job.
INFO:     127.0.0.1:56090 - "GET /long?timeout=8 HTTP/1.1" 200 OK
INFO:     127.0.0.1:56080 - "GET /long?timeout=8 HTTP/1.1" 200 OK

JarroVGIT avatar Sep 16 '22 06:09 JarroVGIT

I don't know if you want to allow the client to set the timeout, since that can be used in an attack to hog a lot of memory. Using @JarroVGIT's example, localhost:8000/long?timeout=9999999999999999999999999999 is now a valid request. Sure, if your function does not take that long to execute, then that's fine, but then the functionality is meaningless. You should set the timeout inside def long so that the client cannot change it.

Or why would you let the client set the timeout?
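
If a client-supplied timeout is kept at all, one option is to clamp it on the server. This is a minimal sketch; MAX_TIMEOUT_SECONDS and effective_timeout are hypothetical names, and the limit of 60 is only an example value.

```python
from typing import Optional

MAX_TIMEOUT_SECONDS = 60  # hypothetical server-side upper limit


def effective_timeout(requested: Optional[int] = None) -> int:
    """Clamp a client-supplied timeout to the server-side limit."""
    if requested is None or requested <= 0:
        # Missing or nonsensical values fall back to the server limit.
        return MAX_TIMEOUT_SECONDS
    return min(requested, MAX_TIMEOUT_SECONDS)
```

With this, a request asking for timeout=9999999999999999999999999999 is treated the same as one asking for 60. Rejecting out-of-range values through FastAPI's Query validation (for example with an le bound) would be another way to get the same protection.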

AxelGard avatar Sep 27 '22 07:09 AxelGard

Yes, this is very obviously a very bad idea lol. The point, however, was to illustrate how to stop a running task after a timeout period.

Hence the disclaimer:

Note that this is quick and dirty.

JarroVGIT avatar Sep 27 '22 07:09 JarroVGIT

Sorry @JarroVGIT, my bad 😅

AxelGard avatar Sep 27 '22 08:09 AxelGard

Thanks for the help here everyone! 👏 🙇

If that solves the original problem, then you can close this issue @deepak-george ✔️

tiangolo avatar Nov 14 '22 06:11 tiangolo

Assuming the original need was handled, this will be automatically closed now. But feel free to add more comments or create new issues or PRs.

github-actions[bot] avatar Nov 25 '22 00:11 github-actions[bot]