Support for Streaming Responses as Output Using FastAPI
Feature request
Right now, streaming from LLMs can be seen on stdout in the terminal, but not in the output responses. I'm using a conversation chain where I see the output streaming in the terminal, but not when returning the output through the API.
Motivation
The response could start appearing immediately, giving the user something to read while waiting, because when the prompt and context are lengthy there is a noticeable delay in the response too. The implementation above would help with that.
Your contribution
🤖 AI-generated response by Steercode - chat with Langchain codebase Disclaimer: SteerCode Chat may provide inaccurate information about the Langchain codebase.
To set up a streaming response (Server-Sent Events, or SSE) with FastAPI, you can follow these steps:
- Import the required libraries:
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
import asyncio
- Create a FastAPI instance:
app = FastAPI()
- Define an async generator function that will yield data for the streaming response. In this example, we'll simulate sending messages every second:
async def stream_messages():
    while True:
        yield "data: Some message\n\n"
        await asyncio.sleep(1)
- Create an endpoint that will return a StreamingResponse using the generator function:
@app.get("/stream")
async def stream(request: Request):
    async def event_generator():
        async for message in stream_messages():
            if await request.is_disconnected():
                break
            yield message
    return StreamingResponse(event_generator(), media_type="text/event-stream")
Now, when you access the /stream endpoint, you'll receive a streaming response with messages sent every second.
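As a side note, the `data: ...\n\n` strings above follow the Server-Sent Events wire format: each `data:` field carries one line of payload, and a blank line terminates the event. A small helper that frames arbitrary (possibly multi-line) text could look like this; this is a sketch, and `sse_frame` is not part of FastAPI:

```python
from typing import Optional

def sse_frame(text: str, event: Optional[str] = None) -> str:
    """Wrap a payload in the Server-Sent Events wire format.

    Each line of the payload becomes its own ``data:`` field, and a
    blank line terminates the event, per the SSE specification.
    """
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.extend(f"data: {line}" for line in text.split("\n"))
    return "\n".join(lines) + "\n\n"
```

Framing each chunk this way keeps multi-line LLM output valid for EventSource clients, which would otherwise misparse embedded newlines.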
To integrate this with the Langchain codebase, you can modify the on_llm_end method in the AsyncIteratorCallbackHandler class. Here's an example of how you might do that:
class StreamingStdOutCallbackHandler(AsyncIteratorCallbackHandler):
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        # Process the response object and extract the data you want to send.
        # For example, you could extract the generated text from the response.
        generated_text = "\n".join(
            generation.text
            for generations in response.generations
            for generation in generations
        )
        # Send the data as a message in the SSE format
        await self.queue.put(f"data: {generated_text}\n\n")
        # Set the 'done' event
        self.done.set()
Then, you can use this StreamingStdOutCallbackHandler class in your FastAPI endpoint to stream the LLM results.
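For reference, the queue-and-done-event mechanism the handler above relies on can be sketched with plain asyncio and no langchain imports. Here `producer` plays the role of the LLM callback pushing tokens, and `consume` plays the role of the SSE generator draining them; both names are illustrative stand-ins:

```python
import asyncio

async def producer(queue: asyncio.Queue, done: asyncio.Event, tokens):
    # Plays the role of the LLM callback: push each token, then signal completion.
    for tok in tokens:
        await queue.put(tok)
    done.set()

async def consume(queue: asyncio.Queue, done: asyncio.Event):
    # Plays the role of the SSE generator: drain until the producer
    # has finished AND the queue is empty.
    out = []
    while not (done.is_set() and queue.empty()):
        try:
            tok = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            continue
        out.append(tok)
    return out

async def main():
    queue, done = asyncio.Queue(), asyncio.Event()
    prod = asyncio.create_task(producer(queue, done, ["Hel", "lo", "!"]))
    result = await consume(queue, done)
    await prod
    return result

tokens = asyncio.run(main())
```

The timeout on `queue.get()` avoids the consumer hanging forever if the done event is set between its emptiness check and the get.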
You can continue the conversation at https://steercode.com/
Steercode is great, will it update to langchain v0.0.170?
I am following this method and ran into another issue: when I open two clients that call this endpoint at the same time, one is BLOCKED by the other. After digging into it, I found it is caused by this line: result = qa({"question": question, "chat_history": chat_history}). How can I fix this?
Q: what does BLOCKED mean here?
A: I have a client-side Vue project that calls the /async-ask API and displays the answer with a typewriter effect. I opened two browsers, Chrome and Firefox. I first ask a question in Chrome; while it is answering, I ask another question in Firefox. In the meantime, the Chrome side hangs with no messages arriving until the Firefox answer finishes, and only then does it continue receiving messages. This is what I called BLOCKED above.
Below is my code:
@router.post("/async-ask")
async def async_ask(data: Item):
    answer = _get_ask_answer(data.question, db, data.session_id)
    return StreamingResponse(_generate(answer), media_type="text/event-stream")

async def _generate(answer: str):
    chunk_size = 10
    num_chunks = (len(answer) + chunk_size - 1) // chunk_size
    for i in range(num_chunks):
        chunk_start = i * chunk_size
        chunk_end = (i + 1) * chunk_size
        chunk = answer[chunk_start:chunk_end]
        response_body = f"{chunk}"
        yield response_body.encode()
        await asyncio.sleep(0.1)

def _get_ask_answer(question: str, db: DeepLake, session_id: str = None):
    retriever = db.as_retriever()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=4000,
        chunk_overlap=200,
        separators=["\\n\\n", "\n\n", ".\\n", ".\n", "\\n", "\n", " ", ""],
    )
    redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings)
    relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.76)
    pipeline_compressor = DocumentCompressorPipeline(
        transformers=[text_splitter, redundant_filter, relevant_filter]
    )
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=pipeline_compressor, base_retriever=retriever
    )
    model = ChatOpenAI(model_name='gpt-3.5-turbo')
    qa = ConversationalRetrievalChain.from_llm(
        model, retriever=compression_retriever, verbose=True)
    with get_openai_callback() as cb:
        # Blocked by the line below; when I set result to a static long text
        # (result = "some static long text"), there is NO BLOCK AT ALL
        result = qa({"question": question, "chat_history": []})
        print(cb)
    answer = result['answer']
    return answer
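As an aside, the ceiling-division loop in `_generate` above can be expressed more directly with a stepped `range`; a behavior-equivalent sketch of just the chunking:

```python
def chunked(text: str, chunk_size: int = 10):
    # Equivalent to the ceiling-division loop: yield successive
    # chunk_size-character slices until the text is exhausted.
    for start in range(0, len(text), chunk_size):
        yield text[start:start + chunk_size]

parts = list(chunked("abcdefghijklmnopqrstuvwxyz", 10))
```

Note that either way this only simulates streaming, since the full answer is computed before the first chunk is sent.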
@KevinZhang19870314 Have you tried Steercode to answer your question? :)
I pasted your question and it suggests changing the implementation of _get_ask_answer to async.
Here is the code it produced; please check if it helps:
@router.post("/async-ask")
async def async_ask(data: Item):
    answer = await _get_ask_answer(data.question, db, data.session_id)
    return StreamingResponse(_generate(answer), media_type="text/event-stream")

async def _generate(answer: str):
    chunk_size = 10
    num_chunks = (len(answer) + chunk_size - 1) // chunk_size
    for i in range(num_chunks):
        chunk_start = i * chunk_size
        chunk_end = (i + 1) * chunk_size
        chunk = answer[chunk_start:chunk_end]
        response_body = f"{chunk}"
        yield response_body.encode()
        await asyncio.sleep(0.1)

async def _get_ask_answer(question: str, db: DeepLake, session_id: str = None):
    retriever = db.as_retriever()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=4000,
        chunk_overlap=200,
        separators=["\\n\\n", "\n\n", ".\\n", ".\n", "\\n", "\n", " ", ""],
    )
    redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings)
    relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.76)
    pipeline_compressor = DocumentCompressorPipeline(
        transformers=[text_splitter, redundant_filter, relevant_filter]
    )
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=pipeline_compressor, base_retriever=retriever
    )
    model = ChatOpenAI(model_name='gpt-3.5-turbo')
    qa = ConversationalRetrievalChain.from_llm(
        model, retriever=compression_retriever, verbose=True)
    async with get_openai_callback() as cb:
        result = await qa._acall({"question": question, "chat_history": []})
        print(cb)
    answer = result['answer']
    return answer
I am trying to implement the streaming response according to the SteerCode implementation, but it is not working yet. It still waits for the whole answer and returns it in one piece. Can you help me with it?
The endpoint:
router = APIRouter()
chat_controller = ChatController()

@router.get("/ask", tags=["user", "chat"])
async def ask(prompt: str, id: str):
    async def event_generator():
        async for message in chat_controller.askAI(prompt=prompt, id=id):
            yield message
    return StreamingResponse(event_generator(), media_type="text/event-stream")
The controller:
class ChatController(object):
    def __init__(self):
        self.agent = self._create_chat_agent()

    def _create_chat_agent(self):
        self.llm = OpenAI(temperature=0, model_name="gpt-4", top_p=0.2, presence_penalty=0.4,
                          frequency_penalty=0.2, streaming=True, callbacks=[StreamingStdOutCallbackHandler()])
        vectorstore = self._load_indeces()
        # Initialise Langchain - QA chain
        qa = RetrievalQA.from_chain_type(llm=self.llm,
                                         chain_type="stuff",
                                         retriever=vectorstore.as_retriever(),
                                         chain_type_kwargs=chain_type_kwargs)
        tools = [
            Tool(
                name="...",
                func=qa.run,
                description="..."
            ),
        ]
        system_msg = '''You are a helpful assistant.'''
        agent = ConversationalChatAgent.from_llm_and_tools(
            llm=self.llm,
            tools=tools,
            system_message=system_msg
        )
        self.chat_agent = AgentExecutor.from_agent_and_tools(
            agent=agent, tools=tools, verbose=False, memory=memory
        )

    def askAI(self, prompt: str, id: str):
        return self.chat_agent.run(input=prompt)
You probably need to make the askAI method async, change run to arun, and await it. You will also need to await askAI in the for loop in the ask endpoint.
Thank you for your response!
Unfortunately, it throws this error: TypeError: 'async for' requires an object with __aiter__ method, got str on this line: async for message in await chat_controller.askAI(prompt=prompt, id=id)
The return value of chat_controller.askAI doesn't have an __aiter__ method. You need to create the handler first:
handler = StreamingStdOutCallbackHandler()
Set the handler as the callback (on the model or on the arun call), then use the handler's async iterator:
async for message in handler.aiter():
    yield message
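To make the earlier TypeError concrete: `async for` requires an object implementing `__aiter__`, which a plain `str` (the return value of `run`) does not have, while the object returned by calling an async generator function does. A minimal stdlib illustration, with `token_stream` as a hypothetical stand-in for the handler's iterator:

```python
import asyncio

async def token_stream():
    # Calling an async generator function returns an object with
    # __aiter__, which is exactly what 'async for' requires.
    for tok in ["Hel", "lo"]:
        yield tok

async def collect(source):
    # Consume any async iterable into a list.
    out = []
    async for tok in source:
        out.append(tok)
    return out

tokens = asyncio.run(collect(token_stream()))
```

Passing a `str` to `collect` would raise exactly the TypeError reported above, since strings are only synchronously iterable.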
Thank you for your answer. I edited my code per your answer and got the error below; can you help?
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "D:\deep-ai\venv\lib\site-packages\uvicorn\protocols\http\httptools_impl.py", line 435, in run_asgi
result = await app( # type: ignore[func-returns-value]
File "D:\deep-ai\venv\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
File "D:\deep-ai\venv\lib\site-packages\fastapi\applications.py", line 276, in __call__
await super().__call__(scope, receive, send)
File "D:\deep-ai\venv\lib\site-packages\starlette\applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "D:\deep-ai\venv\lib\site-packages\starlette\middleware\errors.py", line 184, in __call__
raise exc
File "D:\deep-ai\venv\lib\site-packages\starlette\middleware\errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "D:\deep-ai\venv\lib\site-packages\starlette\middleware\exceptions.py", line 79, in __call__
raise exc
File "D:\deep-ai\venv\lib\site-packages\starlette\middleware\exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "D:\deep-ai\venv\lib\site-packages\fastapi\middleware\asyncexitstack.py", line 21, in __call__
raise e
File "D:\deep-ai\venv\lib\site-packages\fastapi\middleware\asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "D:\deep-ai\venv\lib\site-packages\starlette\routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "D:\deep-ai\venv\lib\site-packages\starlette\routing.py", line 276, in handle
await self.app(scope, receive, send)
File "D:\deep-ai\venv\lib\site-packages\starlette\routing.py", line 66, in app
response = await func(request)
File "D:\deep-ai\venv\lib\site-packages\fastapi\routing.py", line 237, in app
raw_response = await run_endpoint_function(
File "D:\deep-ai\venv\lib\site-packages\fastapi\routing.py", line 163, in run_endpoint_function
return await dependant.call(**values)
File "D:\deep-ai\main.py", line 87, in async_ask
answer = await _get_ask_answer(question, db_read, session_id)
File "D:\deep-ai\main.py", line 150, in _get_ask_answer
result = await qa._acall({"question": question, "chat_history": chat_history})
File "D:\deep-ai\venv\lib\site-packages\langchain\chains\conversational_retrieval\base.py", line 138, in _acall
docs = await self._aget_docs(new_question, inputs)
File "D:\deep-ai\venv\lib\site-packages\langchain\chains\conversational_retrieval\base.py", line 187, in _aget_docs
docs = await self.retriever.aget_relevant_documents(question)
File "D:\deep-ai\venv\lib\site-packages\langchain\retrievers\contextual_compression.py", line 50, in aget_relevant_documents
compressed_docs = await self.base_compressor.acompress_documents(docs, query)
File "D:\deep-ai\venv\lib\site-packages\langchain\retrievers\document_compressors\base.py", line 58, in acompress_documents
documents = await _transformer.atransform_documents(documents)
File "D:\deep-ai\venv\lib\site-packages\langchain\text_splitter.py", line 198, in atransform_documents
raise NotImplementedError
NotImplementedError
Also, when I use acall, the verbose log disappears, even though I set verbose to True:
qa = ConversationalRetrievalChain.from_llm(model, retriever=retriever, verbose=True)
My langchain version is langchain==0.0.171.
Finally, I tried many times with no luck for acall. It seems the console output blocks the client side: the client waits until the server-side console output finishes. That is, only after one call finishes (its console output ending with "Finished chain") does another call become unblocked.
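For what it's worth, the blocking reported throughout this thread is the classic symptom of a synchronous, long-running call (the `qa(...)` invocation) running directly on the event-loop thread: while it runs, no other request can be served. Independent of langchain, one generic workaround is to push the blocking call onto a worker thread with `asyncio.to_thread` so the loop stays free. A minimal sketch, with `blocking_chain_call` as a hypothetical stand-in for the chain invocation:

```python
import asyncio
import time

def blocking_chain_call(question: str) -> str:
    # Stand-in for the synchronous qa({...}) call; sleeps to simulate the model.
    time.sleep(0.2)
    return f"answer to {question!r}"

async def handle(question: str) -> str:
    # Running the sync call in a worker thread keeps the event loop
    # responsive, so simultaneous requests overlap instead of serializing.
    return await asyncio.to_thread(blocking_chain_call, question)

async def main():
    start = time.monotonic()
    a, b = await asyncio.gather(handle("q1"), handle("q2"))
    elapsed = time.monotonic() - start
    return a, b, elapsed

a, b, elapsed = asyncio.run(main())
```

With the two 0.2 s calls overlapping in worker threads, the total elapsed time stays close to 0.2 s rather than the 0.4 s a serialized run would take. This unblocks concurrent clients but still returns whole answers; true token streaming additionally needs the callback-queue approach discussed earlier in the thread.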
Hey everyone! I wanted to share a helpful resource with you. It's an alternative approach for making streaming work with FastAPI. It has advantages and limitations, but it could benefit some of you. That's why I decided to share it. Check it out here: https://gist.github.com/jvelezmagic/03ddf4c452d011aae36b2a0f73d72f68
Feel free to try it, and let me know what you think!
There is also this library, I have ended up using.
https://github.com/ajndkr/lanarky
What are the limitations? @jvelezmagic
I want to manipulate the chunks directly in-flight, for example to gather sentences and send them to a downstream async process.
I don't see langchain currently supporting this; that is, it does not return the generator but rather iterates over it inside the completion method, for example when streaming to STDOUT.
I have hacked the internals of langchain a bit in my own project to return the generator.
If people find this useful, perhaps we could add a return_generator_bool flag to the output streaming to give more freedom in working with the streaming chunks:
https://github.com/filip-michalsky/SalesGPT/blob/streaming_generator/sales_gpt.py#L203
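The return-the-generator idea can be sketched without langchain: instead of a completion method that consumes the chunk iterator internally (e.g. printing each chunk to stdout), it yields chunks to the caller, who can then gather sentences in-flight. All names here (`fake_chunks`, `stream_completion`, `sentences`) are hypothetical stand-ins, not langchain APIs:

```python
def fake_chunks():
    # Stand-in for a provider's streaming chunk iterator.
    yield from ["One sentence. ", "Another ", "sentence. ", "Done."]

def stream_completion(chunks):
    # Instead of printing each chunk (as a stdout callback would),
    # hand them back to the caller as a generator.
    for chunk in chunks:
        yield chunk

def sentences(chunk_iter):
    # Example downstream use: regroup raw chunks into whole sentences
    # in-flight, buffering partial text between chunk boundaries.
    buf = ""
    for chunk in chunk_iter:
        buf += chunk
        while ". " in buf:
            sent, buf = buf.split(". ", 1)
            yield sent + "."
    if buf:
        yield buf

result = list(sentences(stream_completion(fake_chunks())))
```

Because each stage is a generator, the sentence grouping happens as chunks arrive rather than after the full completion, which is the freedom the proposed flag would expose.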
I have an example Replit that you might find useful here: https://replit.com/@marko911/PoshPeskyBackground
Hi, @hussamsayeed! I'm Dosu, and I'm helping the LangChain team manage their backlog. I wanted to let you know that we are marking this issue as stale.
From what I understand, this issue is a feature request to enable streaming responses as output in FastAPI. There have been some interesting discussions and suggestions in the comments. One user provided a solution using the StreamingResponse class and async generator functions, which seems to have resolved the issue. There were also discussions about implementing streaming in the ChatController class and resolving errors related to async iteration. Some additional resources and libraries, such as a Gist and the Lanarky library, were shared. One user even mentioned modifying the langchain library to return a generator for more flexibility in working with streaming chunks.
Before we close this issue, we wanted to check with you if it is still relevant to the latest version of the LangChain repository. If it is, please let us know by commenting on the issue. Otherwise, feel free to close the issue yourself, or it will be automatically closed in 7 days.
Thank you for your contribution to the LangChain repository! Let us know if you have any further questions or concerns.