openai-python
The concurrency of AsyncOpenAI cannot be fully utilized.
Confirm this is an issue with the Python library and not an underlying OpenAI API
- [X] This is an issue with the Python library
Describe the bug
I ran a stability test on the concurrency of AsyncOpenAI. I set the concurrency limit to 1024 but found that the number of active requests stayed at a very low average level and jittered; this is consistent with my production test results.
To Reproduce
I split my code into three parts: client.py, server.py, and main.py (used to create 100k requests in total).
server.py
from fastapi import FastAPI, Request
from pydantic import BaseModel
import asyncio
import logging
from datetime import datetime
import threading
import time
import csv

app = FastAPI()

# track the current number of active requests
active_requests = 0
# debug file used to draw the plot
output_file = 'active_requests_log.csv'

class CompletionRequest(BaseModel):
    model: str
    messages: list
    temperature: float

@app.middleware("http")
async def track_requests(request: Request, call_next):
    global active_requests
    active_requests += 1  # increment the count when a request arrives
    logging.info(f"Active requests: {active_requests}")
    response = await call_next(request)
    active_requests -= 1  # decrement the count once the request completes
    logging.info(f"Active requests: {active_requests}")
    return response

@app.post("/v1/chat/completions")
async def completions(request: CompletionRequest):
    await asyncio.sleep(1)  # mock LLM generation latency
    return {
        "choices": [
            {"message": {"content": f"Response to {request.messages[-1]['content']}"}}
        ]
    }

def record_active_requests():
    """Append the active request count to the log file once per second."""
    global active_requests
    with open(output_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["timestamp", "active_requests"])  # write the header row
        while True:
            # record once per second
            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            writer.writerow([current_time, active_requests])
            file.flush()  # make sure each row is flushed to disk
            time.sleep(1)

# start a background thread that records the active request count
threading.Thread(target=record_active_requests, daemon=True).start()

if __name__ == "__main__":
    import uvicorn
    logging.basicConfig(level=logging.INFO)
    uvicorn.run(app, host="127.0.0.1", port=8203)
client.py
import asyncio
from functools import wraps
import httpx
import logging
from openai import AsyncOpenAI

def limit_async_func_call(max_size: int):
    sem = asyncio.Semaphore(max_size)
    def final_decro(func):
        @wraps(func)
        async def wait_func(*args, **kwargs):
            async with sem:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logging.error(f"Exception in {func.__name__}: {e}")
        return wait_func
    return final_decro

# the function under concurrency test
@limit_async_func_call(max_size=1024)  # limit concurrency to 1024
async def custom_model_if_cache(prompt, system_prompt=None, history_messages=[], **kwargs):
    custom_http_client = httpx.AsyncClient(
        limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
        timeout=httpx.Timeout(timeout=None)
    )
    openai_async_client = AsyncOpenAI(
        api_key="EMPTY", base_url="http://localhost:8203/v1",  # mock local server
        http_client=custom_http_client
    )
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages)
    messages.append({"role": "user", "content": prompt})
    # call the external API
    response = await openai_async_client.chat.completions.create(
        model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs
    )
    return "hi"
main.py
import asyncio
import logging
from client import custom_model_if_cache

# simulate 100k requests
TOTAL_REQUESTS = 100000

async def simulate_requests():
    tasks = []
    for i in range(TOTAL_REQUESTS):
        prompt = f"Test prompt {i}"  # a different parameter for each request
        task = custom_model_if_cache(prompt=prompt)  # call the concurrency-limited async function
        tasks.append(task)
    # run all requests concurrently
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # print the first 10 results as a sanity check
    for result in results[:10]:
        print(result)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(simulate_requests())
To reproduce, open two terminals and run python server.py and python main.py separately.
I also saved the log; you can use the following code to plot it:
draw.py
import csv
import matplotlib.pyplot as plt
from datetime import datetime

# file path
input_file = 'active_requests_log.csv'

# read the CSV file and parse the timestamps and active request counts
timestamps = []
active_requests = []

with open(input_file, mode='r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        timestamps.append(datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S"))
        active_requests.append(int(row["active_requests"]))

# draw the chart
plt.figure(figsize=(10, 6))
plt.plot(timestamps, active_requests, label='Active Requests', color='b')

# set the chart title and labels
plt.title('Active Requests Over Time')
plt.xlabel('Time')
plt.ylabel('Active Requests')
plt.xticks(rotation=45)
plt.grid(True)
plt.legend()

# save the chart
plt.tight_layout()
plt.savefig("/mnt/rangehow/pr/test_c/c.jpg")
Code snippets
No response
OS
ubuntu
Python version
3.12
Library version
latest
Thanks for the report, what results do you get if you extract your custom_http_client & openai_async_client outside of the async function call so they're singletons?
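Roughly along these lines (a minimal, untested sketch assuming your limit_async_func_call decorator is already defined in the same module): build the httpx.AsyncClient and AsyncOpenAI once at module level and let every call reuse them, instead of constructing new clients inside the function:

import httpx
from openai import AsyncOpenAI

# built once at import time and shared by every request
custom_http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
    timeout=httpx.Timeout(timeout=None),
)
openai_async_client = AsyncOpenAI(
    api_key="EMPTY",
    base_url="http://localhost:8203/v1",
    http_client=custom_http_client,
)

@limit_async_func_call(max_size=1024)  # same decorator as in your client.py
async def custom_model_if_cache(prompt, system_prompt=None, history_messages=[], **kwargs):
    messages = [{"role": "user", "content": prompt}]
    # every call reuses the shared connection pool rather than opening a new one
    response = await openai_async_client.chat.completions.create(
        model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs
    )
    return response.choices[0].message.content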
> Thanks for the report, what results do you get if you extract your custom_http_client & openai_async_client outside of the async function call so they're singletons?
Do you mean this?
client.py
import asyncio
from functools import wraps
import httpx
import logging
from openai import AsyncOpenAI

# decorator that limits the number of concurrent calls
def limit_async_func_call(max_size: int):
    sem = asyncio.Semaphore(max_size)
    def final_decro(func):
        @wraps(func)
        async def wait_func(*args, **kwargs):
            async with sem:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logging.error(f"Exception in {func.__name__}: {e}")
        return wait_func
    return final_decro

custom_http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
    timeout=httpx.Timeout(timeout=None)
)
openai_async_client = AsyncOpenAI(
    api_key="EMPTY", base_url="http://localhost:8203/v1",  # mock local server
    http_client=custom_http_client
)

# the function under concurrency test
@limit_async_func_call(max_size=1024)  # limit concurrency to 1024
async def custom_model_if_cache(prompt, system_prompt=None, history_messages=[], **kwargs):
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages)
    messages.append({"role": "user", "content": prompt})
    # call the external API
    response = await openai_async_client.chat.completions.create(
        model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs
    )
    return "hi"
yes!
I didn’t complete the entire run, but I think the result should still be the same as last time.
thanks, does this still happen if you just use httpx to make the requests instead of the openai SDK?
> thanks, does this still happen if you just use httpx to make the requests instead of the openai SDK?
Honestly, I don’t really understand network programming—it’s a bit beyond my skill set. If you could clearly tell me how the code should be changed (or even better, provide me with a modified version), I can quickly test it out! 😊
Although the concurrency didn't hit the full 1024, it seems that making the clients singletons has clearly increased the overall concurrency!
Of course! Here's what that code should look like (I haven't verified it):
http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
    timeout=httpx.Timeout(timeout=None)
)

response = await http_client.post(
    "http://localhost:8203/v1/chat/completions",
    json=dict(model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs),
)
I assume the code in client.py should look like the following:
http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
    timeout=httpx.Timeout(timeout=None)
)

@limit_async_func_call(max_size=1024)  # limit concurrency to 1024
async def custom_httpx(prompt, system_prompt=None, history_messages=[], **kwargs):
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages)
    messages.append({"role": "user", "content": prompt})
    response = await http_client.post(
        "http://localhost:8203/v1/chat/completions",
        json=dict(model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs),
    )
    return "hi"
~~The phenomenon I observed today is completely different from yesterday — whether using httpx or a singleton OpenAI API, there has been a significant drop in concurrency compared to the tests conducted yesterday.~~ I need to run for longer to get a result.
Not sure if the following output would help:
ss -s
Total: 9464
TCP: 13509 (estab 3444, closed 9117, orphaned 10, timewait 5130)
Transport Total IP IPv6
RAW 7 2 5
UDP 5 5 0
TCP 4392 4361 31
INET 4404 4368 36
FRAG 0 0 0
Interesting, so you're getting similar results with the SDK and with httpx?
I just tested the two from scratch:
OpenAI Async API:
HTTPX: (I got distracted and didn’t notice it had been running for quite a while.)
Thanks so much for the investigation here!
I'm going to close this in favour of https://github.com/openai/openai-python/issues/1596 for tracking.