
Streaming Responses As Output Using FastAPI Support

Open hussamsayeed opened this issue 2 years ago • 14 comments

Feature request

Right now, streaming from LLMs can be seen on stdout in the terminal, but not in output responses. I'm using a conversation chain, where I see the output streaming in the terminal but not when returning the output through an API.

Motivation

Responses could start appearing as they are generated, so the user has more patience for his/her query, because if the prompt and context are lengthy there is a delay in the response too. The implementation above can help.

Your contribution

hussamsayeed avatar May 15 '23 06:05 hussamsayeed

🤖 AI-generated response by Steercode - chat with Langchain codebase Disclaimer: SteerCode Chat may provide inaccurate information about the Langchain codebase.

To set up a streaming response (Server-Sent Events, or SSE) with FastAPI, you can follow these steps:

  1. Import the required libraries:
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
import asyncio
  2. Create a FastAPI instance:
app = FastAPI()
  3. Define an async generator function that will yield data for the streaming response. In this example, we'll simulate sending a message every second:
async def stream_messages():
    while True:
        yield "data: Some message\n\n"
        await asyncio.sleep(1)
  4. Create an endpoint that will return a StreamingResponse using the generator function:
@app.get("/stream")
async def stream(request: Request):
    async def event_generator():
        async for message in stream_messages():
            if await request.is_disconnected():
                break
            yield message

    return StreamingResponse(event_generator(), media_type="text/event-stream")

Now, when you access the /stream endpoint, you'll receive a streaming response with messages sent every second.

To integrate this with the Langchain codebase, you can modify the on_llm_end method in the AsyncIteratorCallbackHandler class. Here's an example of how you might do that:

class StreamingStdOutCallbackHandler(AsyncIteratorCallbackHandler):
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        # Process the response object and extract the data you want to send
        # For example, you could extract the generated text from the response
        generated_text = "\n".join([generation.text for generations in response.generations for generation in generations])

        # Send the data as a message in the SSE format
        await self.queue.put(f"data: {generated_text}\n\n")

        # Set the 'done' event
        self.done.set()

Then, you can use this StreamingStdOutCallbackHandler class in your FastAPI endpoint to stream the LLM results.

You can continue the conversation at https://steercode.com/

votrumar avatar May 15 '23 12:05 votrumar


Steercode is great; will it be updated to langchain v0.0.170?

gaye746560359 avatar May 16 '23 06:05 gaye746560359

I am following this method and have encountered another issue: when two clients call this endpoint at the same time, one is BLOCKED by the other. After digging into it, I found it is caused by this line: result = qa({"question": question, "chat_history": chat_history}). How can I fix this?

Q: What is the BLOCKED behavior? A: I have a client-side Vue project that calls the /async-ask API and displays the answer with a typewriter effect. I open two browsers, Chrome and Firefox. I first ask a question in Chrome; while it is answering, I ask another question in Firefox. In the meantime, the Chrome side hangs with no messages arriving until the Firefox side finishes answering, and only then does it continue receiving messages. This is the BLOCKED behavior described above.

Below is my code:

@router.post("/async-ask")
async def async_ask(data: Item):
	answer = _get_ask_answer(data.question, db, data.session_id)
	return StreamingResponse(_generate(answer), media_type="text/event-stream")


async def _generate(answer: str):
	chunk_size = 10
	num_chunks = (len(answer) + chunk_size - 1) // chunk_size
	for i in range(num_chunks):
		chunk_start = i * chunk_size
		chunk_end = (i + 1) * chunk_size
		chunk = answer[chunk_start:chunk_end]
		response_body = f"{chunk}"
		yield response_body.encode()
		await asyncio.sleep(0.1)


def _get_ask_answer(question: str, db: DeepLake, session_id: str = None):
	retriever = db.as_retriever()
	text_splitter = RecursiveCharacterTextSplitter(
		chunk_size=4000,
		chunk_overlap=200,
		separators=["\\n\\n", "\n\n", ".\\n", ".\n", "\\n", "\n", " ", ""],
	)
	redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings)
	relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.76)
	pipeline_compressor = DocumentCompressorPipeline(
		transformers=[text_splitter, redundant_filter, relevant_filter]
	)
	compression_retriever = ContextualCompressionRetriever(base_compressor=pipeline_compressor, base_retriever=retriever)

	model = ChatOpenAI(model_name='gpt-3.5-turbo')
	qa = ConversationalRetrievalChain.from_llm(
		model, retriever=compression_retriever, verbose=True)
	with get_openai_callback() as cb:
		# Blocked by the line below; when I set result to a static long text (result = "some static long text"), there is no blocking at all
		result = qa({"question": question, "chat_history": []})
		print(cb)

	answer = result['answer']

	return answer
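One general way to avoid this kind of blocking is to move the synchronous chain call off the event loop so concurrent requests don't stall each other. The sketch below uses only the standard library; slow_chain_call is a hypothetical stand-in for the synchronous qa({...}) invocation:

```python
import asyncio
import time


def slow_chain_call(question: str) -> str:
    # Simulates the synchronous LLM/chain latency that blocks the loop.
    time.sleep(0.2)
    return f"answer to: {question}"


async def get_answer_nonblocking(question: str) -> str:
    # asyncio.to_thread (Python 3.9+) runs the sync function in a worker
    # thread, keeping the event loop free to serve other requests.
    return await asyncio.to_thread(slow_chain_call, question)


async def main():
    # Two "requests" now overlap instead of serializing.
    start = time.perf_counter()
    a, b = await asyncio.gather(
        get_answer_nonblocking("q1"),
        get_answer_nonblocking("q2"),
    )
    elapsed = time.perf_counter() - start
    return a, b, elapsed
```

With this pattern the two concurrent calls take roughly the duration of one call rather than the sum of both.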

KevinZhang19870314 avatar May 16 '23 07:05 KevinZhang19870314

@KevinZhang19870314 Have you tried Steercode to answer your question? :)

I pasted your question, and it suggests changing the implementation of _get_ask_answer to async.

Here is the code it produced; please check if it helps:

@router.post("/async-ask")
async def async_ask(data: Item):
    answer = await _get_ask_answer(data.question, db, data.session_id)
    return StreamingResponse(_generate(answer), media_type="text/event-stream")


async def _generate(answer: str):
    chunk_size = 10
    num_chunks = (len(answer) + chunk_size - 1) // chunk_size
    for i in range(num_chunks):
        chunk_start = i * chunk_size
        chunk_end = (i + 1) * chunk_size
        chunk = answer[chunk_start:chunk_end]
        response_body = f"{chunk}"
        yield response_body.encode()
        await asyncio.sleep(0.1)


async def _get_ask_answer(question: str, db: DeepLake, session_id: str = None):
    retriever = db.as_retriever()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=4000,
        chunk_overlap=200,
        separators=["\\n\\n", "\n\n", ".\\n", ".\n", "\\n", "\n", " ", ""],
    )
    redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings)
    relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.76)
    pipeline_compressor = DocumentCompressorPipeline(
        transformers=[text_splitter, redundant_filter, relevant_filter]
    )
    compression_retriever = ContextualCompressionRetriever(base_compressor=pipeline_compressor, base_retriever=retriever)

    model = ChatOpenAI(model_name='gpt-3.5-turbo')
    qa = ConversationalRetrievalChain.from_llm(
        model, retriever=compression_retriever, verbose=True)
    with get_openai_callback() as cb:  # get_openai_callback is a synchronous context manager
        result = await qa.acall({"question": question, "chat_history": []})
        print(cb)

    answer = result['answer']

    return answer

votrumar avatar May 16 '23 07:05 votrumar

I am trying to implement the streaming response according to the SteerCode implementation, but it is not working yet. It still waits for the whole answer and returns it in one piece. Can you help me with it?

The endpoint:

router = APIRouter()
chat_controller = ChatController()

@router.get("/ask",  tags=["user", "chat"])
async def ask(prompt: str, id: str):
    async def event_generator():
        async for message in chat_controller.askAI(prompt=prompt, id=id):
            yield message

    return StreamingResponse(event_generator(), media_type="text/event-stream")

The controller:


class ChatController(object):
    def __init__(self):
        self._create_chat_agent()  # sets self.chat_agent; the method returns nothing

    def _create_chat_agent(self):

        self.llm = OpenAI(temperature=0, model_name="gpt-4", top_p=0.2, presence_penalty=0.4,
                          frequency_penalty=0.2, streaming=True, callbacks=[StreamingStdOutCallbackHandler()])

        vectorstore = self._load_indeces()

        # Initialise Langchain - QA chain
        qa = RetrievalQA.from_chain_type(llm=self.llm,
                                         chain_type="stuff",
                                         retriever=vectorstore.as_retriever(),
                                         chain_type_kwargs=chain_type_kwargs)

        tools = [
            Tool(
                name="...",
                func=qa.run,
                description="..."
            ),
        ]

        system_msg = '''You are a helpful assistant.'''

        agent = ConversationalChatAgent.from_llm_and_tools(
            llm=self.llm,
            tools=tools,
            system_message=system_msg
        )

        self.chat_agent = AgentExecutor.from_agent_and_tools(
            agent=agent, tools=tools, verbose=False, memory=memory
        )
    

    def askAI(self, prompt: str, id: str):

        return self.chat_agent.run(input=prompt)
        

pelyhe avatar May 16 '23 11:05 pelyhe

You probably need to make the askAI method async, change run to arun, and await it. You will also need to await askAI in the for loop in the ask endpoint.

votrumar avatar May 16 '23 12:05 votrumar

Thank you for your response! Unfortunately, it throws this error: TypeError: 'async for' requires an object with __aiter__ method, got str to this line: async for message in await chat_controller.askAI(prompt=prompt, id=id):

pelyhe avatar May 16 '23 12:05 pelyhe

Thank you for your response! Unfortunately, it throws this error: TypeError: 'async for' requires an object with __aiter__ method, got str to this line: async for message in await chat_controller.askAI(prompt=prompt, id=id):

the chat_controller doesn't have the __aiter__ method. You need to create the handler first (AsyncIteratorCallbackHandler is the class that exposes aiter(); StreamingStdOutCallbackHandler only writes to stdout):

handler = AsyncIteratorCallbackHandler()

set the handler as the callback (for the model or the arun call)

then use the handler async iterator

async for message in handler.aiter():
    yield message
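The handler/iterator pattern described above can be sketched without LangChain: a producer task pushes tokens onto a queue while the endpoint's generator drains it. TokenQueueHandler below is a hypothetical stand-in for AsyncIteratorCallbackHandler, and fake_llm stands in for the agent's arun(...) call with the handler attached:

```python
import asyncio


class TokenQueueHandler:
    """Hypothetical stand-in for AsyncIteratorCallbackHandler: the
    producer calls on_token()/on_end(), the consumer iterates aiter()."""

    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()
        self.done = asyncio.Event()

    async def on_token(self, token: str) -> None:
        await self.queue.put(token)

    def on_end(self) -> None:
        self.done.set()

    async def aiter(self):
        # Drain the queue until the producer signals completion.
        while not (self.done.is_set() and self.queue.empty()):
            try:
                token = await asyncio.wait_for(self.queue.get(), timeout=0.05)
            except asyncio.TimeoutError:
                continue
            yield token


async def fake_llm(handler: TokenQueueHandler) -> None:
    # Stands in for the LLM/agent call that fires the callbacks.
    for token in ("Hello", " ", "world"):
        await handler.on_token(token)
    handler.on_end()


async def ask() -> str:
    handler = TokenQueueHandler()
    producer = asyncio.create_task(fake_llm(handler))
    chunks = [token async for token in handler.aiter()]
    await producer
    return "".join(chunks)
```

The key design point is that the LLM call runs as a background task (asyncio.create_task) so the endpoint's generator can start yielding tokens before the call completes.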

khu834 avatar May 16 '23 18:05 khu834

Thank you for your answer. I edited my code according to your answer, but I got the error below; can you help?

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "D:\deep-ai\venv\lib\site-packages\uvicorn\protocols\http\httptools_impl.py", line 435, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "D:\deep-ai\venv\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "D:\deep-ai\venv\lib\site-packages\fastapi\applications.py", line 276, in __call__
    await super().__call__(scope, receive, send)
  File "D:\deep-ai\venv\lib\site-packages\starlette\applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "D:\deep-ai\venv\lib\site-packages\starlette\middleware\errors.py", line 184, in __call__
    raise exc
  File "D:\deep-ai\venv\lib\site-packages\starlette\middleware\errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "D:\deep-ai\venv\lib\site-packages\starlette\middleware\exceptions.py", line 79, in __call__
    raise exc
  File "D:\deep-ai\venv\lib\site-packages\starlette\middleware\exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "D:\deep-ai\venv\lib\site-packages\fastapi\middleware\asyncexitstack.py", line 21, in __call__
    raise e
  File "D:\deep-ai\venv\lib\site-packages\fastapi\middleware\asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "D:\deep-ai\venv\lib\site-packages\starlette\routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "D:\deep-ai\venv\lib\site-packages\starlette\routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "D:\deep-ai\venv\lib\site-packages\starlette\routing.py", line 66, in app
    response = await func(request)
  File "D:\deep-ai\venv\lib\site-packages\fastapi\routing.py", line 237, in app
    raw_response = await run_endpoint_function(
  File "D:\deep-ai\venv\lib\site-packages\fastapi\routing.py", line 163, in run_endpoint_function
    return await dependant.call(**values)
  File "D:\deep-ai\main.py", line 87, in async_ask
    answer = await _get_ask_answer(question, db_read, session_id)
  File "D:\deep-ai\main.py", line 150, in _get_ask_answer
    result = await qa._acall({"question": question, "chat_history": chat_history})
  File "D:\deep-ai\venv\lib\site-packages\langchain\chains\conversational_retrieval\base.py", line 138, in _acall
    docs = await self._aget_docs(new_question, inputs)
  File "D:\deep-ai\venv\lib\site-packages\langchain\chains\conversational_retrieval\base.py", line 187, in _aget_docs
    docs = await self.retriever.aget_relevant_documents(question)
  File "D:\deep-ai\venv\lib\site-packages\langchain\retrievers\contextual_compression.py", line 50, in aget_relevant_documents
    compressed_docs = await self.base_compressor.acompress_documents(docs, query)
  File "D:\deep-ai\venv\lib\site-packages\langchain\retrievers\document_compressors\base.py", line 58, in acompress_documents
    documents = await _transformer.atransform_documents(documents)
  File "D:\deep-ai\venv\lib\site-packages\langchain\text_splitter.py", line 198, in atransform_documents
    raise NotImplementedError
NotImplementedError
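The traceback shows that this version's RecursiveCharacterTextSplitter raises NotImplementedError from atransform_documents. One workaround pattern (a sketch, assuming the sync method is safe to call from async code; BaseSplitter is a stand-in for the real splitter class) is a subclass whose async variant delegates to the sync one:

```python
import asyncio


class BaseSplitter:
    """Stand-in for a class whose async variant is unimplemented."""

    def transform_documents(self, documents):
        # A sync implementation exists (here: trivial placeholder logic).
        return [d.upper() for d in documents]

    async def atransform_documents(self, documents):
        raise NotImplementedError  # what the traceback above hits


class AsyncCapableSplitter(BaseSplitter):
    async def atransform_documents(self, documents):
        # Delegate to the sync method; for CPU-light splitting this is
        # fine, otherwise run it in a thread via asyncio.to_thread.
        return self.transform_documents(documents)
```

Using the subclass in place of the original in the compressor pipeline should let the async retrieval path proceed past this point.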

KevinZhang19870314 avatar May 17 '23 01:05 KevinZhang19870314

Also, when I use acall, the verbose log disappears, even though I set verbose to True.

qa = ConversationalRetrievalChain.from_llm(model, retriever=retriever, verbose=True)

My langchain version is langchain==0.0.171

Finally, I tried many times with no luck for acall. It seems the console output blocks the client side: the client waits until the server-side console output finishes. That is, only once one call finishes (printing "Finished chain" in the console) does another call become unblocked.

KevinZhang19870314 avatar May 17 '23 01:05 KevinZhang19870314

Hey everyone! I wanted to share a helpful resource with you. It's an alternative approach for making streaming work with FastAPI. It has advantages and limitations, but it could benefit some of you. That's why I decided to share it. Check it out here: https://gist.github.com/jvelezmagic/03ddf4c452d011aae36b2a0f73d72f68

Feel free to try it, and let me know what you think!

jvelezmagic avatar May 17 '23 12:05 jvelezmagic

There is also this library, I have ended up using.

https://github.com/ajndkr/lanarky

gabriel-ecegi avatar May 18 '23 16:05 gabriel-ecegi

Hey everyone! I wanted to share a helpful resource with you. It's an alternative approach for making streaming work with FastAPI. It has advantages and limitations, but it could benefit some of you. That's why I decided to share it. Check it out here: https://gist.github.com/jvelezmagic/03ddf4c452d011aae36b2a0f73d72f68

Feel free to try it, and let me know what you think!

What are the limitations? @jvelezmagic

AmritaAIE avatar May 20 '23 12:05 AmritaAIE

I want to manipulate the chunks directly in flight, for example gathering sentences and sending them to a downstream async process.

I don't see langchain currently supporting this: it does not return the generator, but rather iterates over it inside the completion method, e.g. when streaming to stdout.

I have hacked the internals of langchain a bit in my own project to return the generator.

If people find this useful, perhaps we could add a return_generator_bool flag to the streaming output, to give more freedom in working with the streaming chunks.

https://github.com/filip-michalsky/SalesGPT/blob/streaming_generator/sales_gpt.py#L203

filip-michalsky avatar May 22 '23 06:05 filip-michalsky

I have an example Replit that you might find useful here: https://replit.com/@marko911/PoshPeskyBackground

marko911 avatar Jun 24 '23 11:06 marko911

Hi, @hussamsayeed! I'm Dosu, and I'm helping the LangChain team manage their backlog. I wanted to let you know that we are marking this issue as stale.

From what I understand, this issue is a feature request to enable streaming responses as output in FastAPI. There have been some interesting discussions and suggestions in the comments. One user provided a solution using the StreamingResponse class and async generator functions, which seems to have resolved the issue. There were also discussions about implementing streaming in the ChatController class and resolving errors related to async iteration. Some additional resources and libraries, such as a Gist and the Lanarky library, were shared. One user even mentioned modifying the langchain library to return a generator for more flexibility in working with streaming chunks.

Before we close this issue, we wanted to check with you if it is still relevant to the latest version of the LangChain repository. If it is, please let us know by commenting on the issue. Otherwise, feel free to close the issue yourself, or it will be automatically closed in 7 days.

Thank you for your contribution to the LangChain repository! Let us know if you have any further questions or concerns.

dosubot[bot] avatar Sep 23 '23 16:09 dosubot[bot]