Custom `CapacityLimiter`
Starlette is a web framework that supports both async and sync functions. The sync code runs in a thread pool.
The thread pool has a default maximum of 40 threads:
https://github.com/agronholm/anyio/blob/4f3a8056a8b14dbe43c95039a0d731ede1083cb7/src/anyio/_backends/_asyncio.py#L2071-L2077
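You can confirm the default from inside a running event loop, using the same anyio API that appears later in this thread:

```python
import anyio
import anyio.to_thread

async def main() -> None:
    # The default thread limiter is bound to the running event loop,
    # so it has to be inspected from async code.
    limiter = anyio.to_thread.current_default_thread_limiter()
    print(limiter.total_tokens)  # 40, unless something has changed it

anyio.run(main)
```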
The concern in this issue is that those threads are shared between the ones handling sync endpoints and the ones running background tasks.
Assume we have a simple application:
```python
from time import sleep

from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

num = 0

def count_sleep():
    global num
    num += 1
    print(f"Running number {num}.")
    sleep(10)

def endpoint(request: Request) -> JSONResponse:
    tasks = BackgroundTasks()
    tasks.add_task(count_sleep)
    return JSONResponse({"message": "Hello, world!"}, background=tasks)

app = Starlette(routes=[Route("/", endpoint)])
```
Running it with uvicorn:

```bash
uvicorn main:app
```
And performing some requests (using httpie):

```bash
for run in {1..100}; do
  http :8000 &
done
```
We can observe that:

- We can see `Running number 40.`
- Wait 10 seconds...
- We can see `Running number 80.`
- Wait 10 seconds...
- We can see `Running number 100.`

In other words, the 100 background tasks run in batches of 40, because the default limiter only has 40 tokens.
I'm just bringing this up so people are aware.
@agronholm proposed on Gitter that we create a separate `CapacityLimiter` dedicated only to handling the application (i.e. `request_response()`). This means that some number of threads (depending on how many tokens we choose) would be dedicated to `request_response()`.
I think that having a separate capacity limiter makes a lot of sense - if only so that Starlette can keep handling requests without having its capacity swallowed by tasks from other applications.
Should we also think about whether the usage of `iterate_in_threadpool` by `StreamingResponse` should be covered by the same or by another capacity limiter? And the background tasks too?
Otherwise, it would be possible to reproduce the same issue if synchronous iterators compete with background tasks. (I also note that file I/O from `FileResponse` all happens on the threadpool with the default limiter, too.)
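For context, `StreamingResponse` pushes a sync iterator through the default limiter one `next()` call at a time, roughly like this (a simplified paraphrase of `starlette.concurrency.iterate_in_threadpool`, not the exact source):

```python
import typing

import anyio.to_thread

T = typing.TypeVar("T")

class _StopIteration(Exception):
    pass

def _next(iterator: typing.Iterator[T]) -> T:
    # StopIteration can't propagate cleanly out of a coroutine, so translate it.
    try:
        return next(iterator)
    except StopIteration:
        raise _StopIteration

async def iterate_in_threadpool(
    iterator: typing.Iterator[T],
) -> typing.AsyncIterator[T]:
    while True:
        try:
            # Every chunk takes a token from the default limiter.
            yield await anyio.to_thread.run_sync(_next, iterator)
        except _StopIteration:
            break
```

So each chunk of a sync streaming body competes for the same tokens as sync endpoints and background tasks.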
`UploadFile` also uses threads: https://github.com/encode/starlette/blob/a3b43f0cf0d946267c0143b9e2b435a24cc77892/starlette/datastructures.py#L460
I think it may be hard to partition out all uses of thread pools into individual `CapacityLimiter`s.
Aside from making individual `CapacityLimiter`s for each "use case", it would also be nice to make these tunable by users. Maybe we can do both things: `Route` accepts a `CapacityLimiter`, `Request.form()` or `UploadFile.write` accepts a `CapacityLimiter`, etc., and users can decide if they should all be the same instance or different ones. Then we also don't have to decide which use cases get grouped together; we put that in the user's hands.
So users can choose:
```python
from anyio import CapacityLimiter

from starlette.responses import Response
from starlette.routing import Route

# NOTE: the `limiter` keyword arguments below are a *proposed* API;
# they don't exist in Starlette today.

def sync_route(req):
    return Response()

# individual limiters
routes = [
    Route("/foo", sync_route, limiter=CapacityLimiter(1000)),
    Route("/bar", sync_route, limiter=CapacityLimiter(5)),
]

# shared limiter
limiter = CapacityLimiter(100)

async def async_route(req):
    files = await req.form(limiter=CapacityLimiter(10))  # used for writing to the file
    await files["file"].read(limiter=limiter)  # override the above limiter for reading
    return Response()

routes = [
    Route("/foo", async_route),
    Route("/bar", sync_route, limiter=limiter),
]
```
I don't think the user should have control over this.
I think Starlette should be able to figure out the optimal way internally.
I think it will be hard to find an "optimal way", this seems super workload / business logic dependent.
Pushing such a depth of choice to the users of starlette is not a real solution either - it would make the API a lot more fiddly, for what I suspect are a minority of users.
Here is my attempt at categorizing all the uses of `anyio.to_thread.run_sync` and `starlette.concurrency.{run,iterate}_in_threadpool`:

- WSGI middleware - to be deprecated;
- `FileResponse` and `StaticFiles` - to me, these two usages are very linked. We should also note that anyio's `AsyncFile` currently only supports the default limiter for file operations;
- `UploadFile` - automatically used to handle file I/O when receiving a multipart upload;
- **synchronous `BackgroundTasks`**;
- **synchronous `StreamingResponse` iterators**;
- **synchronous `Callable[[Request], Response]` endpoints** (last but not least).
Out of these, I would note that only the cases in bold run user-defined code.
I think that, as a first step, it does not seem necessary to define a custom capacity limiter for the features that do not run user-defined code. Why? Starlette's built-in usages of synchronous calls are always made as a short "dip" into the pool, which should multiplex as well as humanly possible with any other tasks that use `anyio.to_thread.run_sync`.
As a second step, I would suggest choosing a single number (e.g. 40) and defining a separate capacity limiter for each of the items in bold. Why?

- If **`BackgroundTasks`** share a capacity limiter with anyone, they risk throttling the servicing of requests, which is the last thing we want to happen;
- If **synchronous `StreamingResponse` iterators** share a capacity limiter with **synchronous `Callable[[Request], Response]` endpoints**, I have a real fear that the endpoints (which will take more individual time per call than the `next` calls to the iterators) would steal available threads from the synchronous iterators;
- Why the same number? It makes it simpler to configure, and it can be explained by saying "at each of these three phases of request processing, you can have a maximum of XX concurrent synchronous requests".
> Pushing such a depth of choice to the users of starlette is not a real solution either - it would make the API a lot more fiddly, for what I suspect are a minority of users.
The fiddly API would only apply to the small minority of users that want to use the feature; the vast majority of users would only see an extra keyword-only parameter, which in my opinion is not a big issue. But that's the last I'll say on that, I don't mean to force a solution.
I do think you make an excellent point w.r.t. `BackgroundTasks`: the last thing we want is `BackgroundTasks` blocking requests. So yes, I think a good first step would be to give `BackgroundTasks` their own capacity limiter. I don't think we can use a module-level `CapacityLimiter`, and I don't think we want one per request cycle (otherwise threads would still grow unbounded under heavy load), so we'd somehow have to create it in the lifespan event (the only thing that happens once per application) and then share it with requests, right?
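A minimal sketch of that idea, assuming we create the limiter in a startup handler and share it via `app.state` (the attribute name `background_limiter` is made up for illustration):

```python
import anyio
import anyio.to_thread
from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

def slow_task():
    ...

async def startup():
    # anyio's CapacityLimiter must be created inside a running event loop,
    # so we create it once per application here and share it via app.state.
    app.state.background_limiter = anyio.CapacityLimiter(10)

async def endpoint(request: Request) -> JSONResponse:
    limiter = request.app.state.background_limiter
    tasks = BackgroundTasks()
    # Route the sync task through the dedicated limiter instead of the default one.
    tasks.add_task(anyio.to_thread.run_sync, slow_task, limiter=limiter)
    return JSONResponse({"message": "Hello, world!"}, background=tasks)

app = Starlette(routes=[Route("/", endpoint)], on_startup=[startup])
```

With this, background tasks wait on their own tokens and can no longer starve the limiter that serves sync endpoints.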
Here's an example of why I think we should let users set this value: https://github.com/harsh8398/fastapi-dependency-issue/pull/2. The specific issue is with FastAPI's dependency injection system, but the same thing would apply to slow sync endpoints.
I don't use sync dependencies like this, but if I were tasked with fixing this in a real app, I would just pump up the capacity limit and hardware as needed until it's no longer the bottleneck. The easiest way to do that would be to let users set that value by passing a `CapacityLimiter` into the constructor for `Route` (and for FastAPI, into `Depends` or something...).
@Kludex Was this discussed again on Gitter? Any links to it? I remember a few months ago this was mentioned and suggested to be left outside of Starlette.
I think this was mentioned on FastAPI issues. I don't recall an old discussion on gitter about this.
I'm going to write down here how to change the default `CapacityLimiter`, as it may be relevant...
Right now, you can modify the number of `total_tokens` on the default `CapacityLimiter`. Let's use the same application as described above:
```python
import anyio
from time import sleep

from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

num = 0

def count_sleep():
    global num
    num += 1
    print(f"Running number {num}.")
    sleep(10)

def endpoint(request: Request) -> JSONResponse:
    tasks = BackgroundTasks()
    tasks.add_task(count_sleep)
    return JSONResponse({"message": "Hello, world!"}, background=tasks)

# THIS IS THE ADDITION
async def startup():
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 100

app = Starlette(routes=[Route("/", endpoint)], on_startup=[startup])
```
You can perform the same query as mentioned:

```bash
for run in {1..100}; do
  http :8000 &
done
```
This time, you are NOT going to have the same behavior as described above:

- We can see `Running number 40.`
- Wait 10 seconds...
- We can see `Running number 80.`
- Wait 10 seconds...
- We can see `Running number 100.`

The behavior now is:

- We can see `Running number 100.`

No waiting time.
That is indeed helpful information! I imagine for a lot of users currently experiencing issues, that's a viable short-term solution 🎉!
That said, I think there are still conversations to be had, because there are very real situations where you might want one thing (endpoint, FastAPI dependency, background task, etc.) to have a limited number of threads to avoid exhausting memory, and another thing to have a lot more threads.
> Here's an example of why I think we should let users set this value: harsh8398/fastapi-dependency-issue#2. The specific issue is with FastAPI's dependency injection system, but the same thing would apply to slow sync endpoints.
>
> I don't use sync dependencies like this but if I were tasked with fixing this in a real app, I would just pump up the capacity limit and hardware as needed until it's no longer the bottleneck. The easiest way to do that would be to let users set that value by passing a `CapacityLimiter` into the constructor for `Route` (and for FastAPI, into `Depends` or something...).
This is actually how the vast majority of Flask applications are tuned, with the multiple different kinds of workers and thread settings. Gunicorn allows each of the workers to have multiple threads:

```bash
gunicorn --workers=5 --threads=2 --worker-class=gthread main:app
```

The suggested maximum number of concurrent requests when using workers and threads is `(2 * CPU) + 1`.
This is very application dependent and something that we have tuned over time. Having control over this is very important. It's OK if you are not picking it up from the gunicorn args... but this control is definitely important.
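For reference, that rule of thumb works out to something like:

```python
import multiprocessing

# The common gunicorn sizing rule of thumb: (2 * CPU) + 1 workers.
workers = (2 * multiprocessing.cpu_count()) + 1
print(workers)  # e.g. 9 on a 4-core machine
```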
We had a similar situation in Gradio, and resolved it via this kind of approach. Wanted to share to support the issue.
First step here would be to add some documentation.
PR welcome to document this behavior, and how to overcome it.
(I don't know where it is most suitable...)

EDIT:

> I don't know where it is most suitable...

Either a new page, or suggest something.
Let's only document this.
I've already talked to @agronholm some weeks ago about this, and he was cool about just documenting it.
Any ideas where's the best place to document this?
I was thinking of adding a small snippet in https://www.starlette.io/background/ to explain the reason and how to set the `total_tokens` without the Starlette lifespan and tasks:
```python
import anyio
from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100

def do_something():
    ...

def endpoint(request: Request) -> JSONResponse:
    tasks = BackgroundTasks()
    tasks.add_task(do_something)
    return JSONResponse({"message": "Hello, world!"}, background=tasks)

app = Starlette(routes=[Route("/", endpoint)])
```
> Any ideas where's the best place to document this?

No. I think you can suggest it with a PR, and we can see if it fits.