Problem with Concurrent Requests: High Response Time
First Check
- [X] I added a very descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the FastAPI documentation, with the integrated search.
- [X] I already searched in Google "How to X in FastAPI" and didn't find any information.
- [X] I already read and followed all the tutorial in the docs and didn't find an answer.
- [X] I already checked if it is not related to FastAPI but to Pydantic.
- [X] I already checked if it is not related to FastAPI but to Swagger UI.
- [X] I already checked if it is not related to FastAPI but to ReDoc.
Commit to Help
- [X] I commit to help with one of those options 👆
Example Code
main.py:
import os
import sys
import json
import time
from lib.UserID import UserID
from model.ModelUserToken import ModelUserToken
from fastapi import FastAPI, Depends, Body, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel
from prometheus_fastapi_instrumentator import Instrumentator
import resource
import uuid
import loguru
# Load the key of the tenant
with open(os.path.join(sys.path[0], "/app/data/key.json"), "r") as f:
    data = json.load(f)
kid_crm = data['CRM']['credentialId']
signature_base64_crm = data['CRM']['signingKey']
key_base64_crm = data['CRM']['encryptionKey']

app = FastAPI()

logger = loguru.logger
logger.remove()
logger.add(sys.stdout, format="{time} - {level} - ({extra[request_id]}) {message} ", level="DEBUG")

origins = ['*']

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=['GET', 'POST'],
    allow_headers=["*"],
)

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    request_id = str(uuid.uuid4())
    with logger.contextualize(request_id=request_id):
        logger.info("Request started")
        start_time = time.time()
        response = await call_next(request)
        process_time = time.time() - start_time
        logger.info("Request ended in " + str(process_time))
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time"] = str(process_time)
        return response

class Response(BaseModel):
    token: str

result = {}

@app.get("/", summary="Test the availability of the service")
async def read_main():
    return {"msg": "I'm available!"}

@app.get("/token/UserID", tags=["UserID"], summary="Generate token", response_model=Response)
async def token(args: dict = Depends(ModelUserToken)):
    token = UserID(kid_crm, signature_base64_crm)
    result['token'] = await token.Create(args)
    return result

@app.on_event("startup")
async def startup():
    resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536))
    Instrumentator(excluded_handlers=["/metrics", "/docs", "/openapi.json"]).instrument(app).expose(app, include_in_schema=False)
Description
Hello,
I'm facing an issue when testing my FastAPI service with concurrent requests (for example 1000 req/s). The observed issue is a high response time, around 3 s, when the service normally answers in less than 0.025 s. The service is quite simple: it generates an AWS token. So I suspect I haven't defined some functions properly. The service is deployed as a Docker image; I'm using tiangolo's uvicorn-gunicorn-fastapi Docker image with Python 3.9. The file posted above is main.py, which lives in the app directory. The app directory has the following structure: the directory model/ contains the file ModelUserToken.py:
from typing import Optional
from fastapi import Query

def ModelUserToken(
    userID: Optional[str] = Query(
        None,
        description="Public userID",
        min_length=3,
    ),
    exp: Optional[int] = Query(
        None,
        description="Expiration of the token. Default NONE (no expiration). Max value is 90 seconds",
        gt=0,
        le=90,  # note: the keyword is `le`; `lte` is not a recognized constraint and is not enforced
    ),
):
    return {"exp": exp, "userID": userID}
The directory lib/ contains two files. First, UserID.py:
import time
from .tools import encryption, remove_none_values
from fastapi.concurrency import run_in_threadpool

class UserID(object):
    def __init__(self, kid, signature_base64):
        self.payload = {}
        self.kid = kid
        self.signature_base64 = signature_base64

    async def Create(self, input):
        # Check if the token has an expiration time
        if input["exp"] is None:
            self.payload = {
                "typ": "UserID",
                "ver": "1.0"
            }
        else:
            current_seconds = round(time.time())
            exp_token = current_seconds + input["exp"]
            self.payload = {
                "ver": "1.0",
                "typ": "UserID",
                "exp": exp_token
            }
        # remove the key exp so it is not copied into the payload again below
        del input['exp']
        clean_input = await run_in_threadpool(remove_none_values, input)
        # Update self.payload if clean_input exists
        if clean_input is not None:
            self.payload.update(clean_input)
        token = await run_in_threadpool(encryption, self.payload, self.signature_base64, self.kid)
        return token
and the file tools.py:
import jwt
import base64

def encryption(payload, signature, kid):
    # Generate the signed token
    encrypt = jwt.encode(
        payload,
        base64.b64decode(signature),
        algorithm="HS256",
        headers={"kid": kid}
    )
    return encrypt

def remove_none_values(input):
    """
    Given a dictionary, remove None values.
    If the dictionary includes nested dictionaries, investigate and remove None values there too.
    The same is done if the dictionary includes nested lists.
    """
    cleaned_dict = {}
    for key, value in input.items():
        if isinstance(value, dict):
            nested_dict = remove_none_values(value)
            if len(nested_dict.keys()) > 0:
                cleaned_dict[key] = nested_dict
        elif isinstance(value, list):
            new_list = []
            for item in value:
                new_list.append({k: v for k, v in item.items() if v is not None})
            cleaned_dict[key] = new_list
        elif value is not None:
            cleaned_dict[key] = value
    return cleaned_dict
Then I'm using this script to test the performance with 1000 req/s:
import random
import asyncio
import httpx
import json

async def request():
    async with httpx.AsyncClient() as client:
        r = await client.get("http://10.20.21.21:9080/token/UserID")
        request_id = r.headers.get("x-request-id")
        status_code = r.status_code
        print(f"Received response {status_code} for request_id={request_id}")

async def run():
    await asyncio.gather(*[request() for _ in range(1000)])

if __name__ == "__main__":
    asyncio.run(run())
During the execution, I'm observing that all requests have a response time between 1 s and 2.5 s. Could you help me figure out where the problem could be?
Thanks in advance, Paz
Operating System
Linux
Operating System Details
Docker Image uvicorn-gunicorn-fastapi with gunicorn 20.1.0 and 8 workers
FastAPI Version
0.88.0
Python Version
3.9.16
Additional Context
No response
A few questions:
- 1000 req/s can be a lot or a little depending on the hardware you are running on.
- Are you running your benchmark on the same hardware as your server?
- You are using loguru, which I don't know much about, but make sure it doesn't block the event loop (I would remove it entirely to see if that is the issue; logging can slow things down a lot if not done correctly). See the sketch below for one non-blocking option.
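For what it's worth, if loguru turned out to be the blocker, a non-blocking setup is possible. This is a minimal sketch using loguru's enqueue option (each record is handed to a background thread via a queue, so the event loop never waits on stdout):

import sys
import loguru

logger = loguru.logger
logger.remove()
logger.add(
    sys.stdout,
    format="{time} - {level} - ({extra[request_id]}) {message} ",
    level="DEBUG",
    enqueue=True,  # log records are queued and written by a background thread
)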
Hi @jgould22
The test script is executed from another server; the FastAPI service runs on a dedicated server. loguru was introduced later to debug what was happening. I don't think it introduces any blocking point, since the issue was present before as well.
Thanks, Paz
Hi @Pazzeo
Did you check around run_in_threadpool? Opening a thread is slow, and too many threads can lead to a crash. You should look into that.
Can you try the same with a sync route and see if the same issue exists? Usually I face such issues when I mix sync and async functions. A sketch of what that could look like is below.
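For illustration, a sync variant of the endpoint might look roughly like this. This is a sketch, not the author's code: it assumes the names from main.py and lib/tools.py are importable, and inlines the payload building so no await is needed. FastAPI runs plain def endpoints in its threadpool, so the blocking jwt call doesn't stall the event loop:

import time
from fastapi import Depends

@app.get("/token/UserID-sync", tags=["UserID"], summary="Generate token (sync)")
def token_sync(args: dict = Depends(ModelUserToken)):
    payload = {"typ": "UserID", "ver": "1.0"}
    if args.get("exp") is not None:
        payload["exp"] = round(time.time()) + args["exp"]
    if args.get("userID") is not None:
        payload["userID"] = args["userID"]
    # encryption() is the blocking helper from lib/tools.py, called directly
    return {"token": encryption(payload, signature_base64_crm, kid_crm)}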
> Did you check around run_in_threadpool? Opening a thread is slow, and too many threads can lead to a crash.

Also, very likely relevant: the default AnyIO capacity limiter is 40, IIRC.
Here is the call: https://github.com/encode/starlette/blob/master/starlette/concurrency.py#L35 and the docs (https://anyio.readthedocs.io/en/stable/api.html#anyio.to_thread.run_sync) indicate that if the limiter argument is None, the default is used.
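If that limiter were the bottleneck, it can be raised. A minimal sketch, assuming AnyIO 3.x (the 100-token value is arbitrary, for illustration):

import anyio.to_thread

@app.on_event("startup")
async def raise_threadpool_limit():
    # the same limiter Starlette's run_in_threadpool uses when none is passed
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 100  # default is 40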
Hi @iudeen, @jgould22
First, thanks a lot for the help. These days I have tried what you proposed, but I don't observe much improvement. Currently I'm reaching the best performance without using await run_in_threadpool and removing the await on the call to token.Create; I have just left the async on def token. With this setup, I always get response times between 1 and 1.5 seconds. I'm not able to understand where the bottleneck of this code is. Gunicorn is running with 8 workers, and during my tests I've monitored the processes and the workload. The job seems to be properly distributed between the workers: CPU consumption is spread evenly across the PIDs.
I don't know how I can improve my code to get better response times with concurrent requests. Do you have any suggestions?
Paz
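For reference, a synchronous Create without run_in_threadpool might look like this. This is a sketch of the change described above, not the exact code (imports as in lib/UserID.py); HS256 signing via jwt.encode is a few microseconds of CPU work, so running it inline is cheaper than dispatching it to a thread:

class UserID(object):
    def __init__(self, kid, signature_base64):
        self.kid = kid
        self.signature_base64 = signature_base64

    def Create(self, input):
        if input["exp"] is None:
            self.payload = {"typ": "UserID", "ver": "1.0"}
        else:
            self.payload = {"ver": "1.0", "typ": "UserID",
                            "exp": round(time.time()) + input["exp"]}
        del input["exp"]
        self.payload.update(remove_none_values(input))
        # plain call, no thread dispatch; the endpoint then uses
        # result['token'] = token.Create(args) with no await
        return encryption(self.payload, self.signature_base64, self.kid)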
- There's no reason to use run_in_threadpool in the snippet in the description.
- There's a small performance drop from the middleware, since it's not a pure ASGI middleware, but it's not relevant here (see the sketch below).

The problem with your benchmark is on the client side. Use a single AsyncClient and pass it to the task that is going to run multiple times.
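As an aside on the second point, a pure ASGI equivalent of the timing/request-ID middleware could look like this (a minimal sketch relying only on the ASGI spec; the header names match the original middleware):

import time
import uuid

class ProcessTimeMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        request_id = str(uuid.uuid4())
        start_time = time.time()

        async def send_wrapper(message):
            # response headers travel in the http.response.start message
            if message["type"] == "http.response.start":
                message.setdefault("headers", []).extend([
                    (b"x-request-id", request_id.encode()),
                    (b"x-process-time", str(time.time() - start_time).encode()),
                ])
            await send(message)

        await self.app(scope, receive, send_wrapper)

# registered like any other middleware:
# app.add_middleware(ProcessTimeMiddleware)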
Hi @Kludex
Indeed, I have removed the run_in_threadpool and the service is working better. About the client side, how do you simulate the 1000 req/s? I thought the implementation was correct.
Paz
@Pazzeo
In your code you create 1000 instances of httpx.AsyncClient, but you should create one and let that single client make the 1000 requests, like this:
import asyncio
import httpx

async def request(client: httpx.AsyncClient):
    r = await client.get("http://10.20.21.21:9080/token/UserID")
    request_id = r.headers.get("x-request-id")
    status_code = r.status_code
    print(f"Received response {status_code} for request_id={request_id}")

async def run():
    # one shared client: connections are pooled and reused across all requests
    async with httpx.AsyncClient() as client:
        await asyncio.gather(*[request(client) for _ in range(1000)])

if __name__ == "__main__":
    asyncio.run(run())
A full example of how to use and test a FastAPI endpoint with an httpx client: https://github.com/raphaelauv/fastAPI-httpx-example
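The shared client matters because it reuses pooled keep-alive connections instead of performing a fresh TCP handshake for every request. If needed, the pool can also be sized explicitly; a sketch using httpx's Limits configuration (the numbers are arbitrary, for illustration):

import httpx

limits = httpx.Limits(max_connections=200, max_keepalive_connections=50)
client = httpx.AsyncClient(limits=limits, timeout=10.0)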