
Final answer streaming problem

Open poojitharamachandra opened this issue 2 years ago • 20 comments

System Info

Hi, I am unable to stream the final answer from the LLM chain to the Chainlit UI. langchain==0.0.218, Python 3.9.16.

here are the details: https://github.com/Chainlit/chainlit/issues/313

Is this implemented? https://github.com/langchain-ai/langchain/pull/1222/

Who can help?

No response

Information

  • [ ] The official example notebooks/scripts
  • [ ] My own modified scripts

Related Components

  • [X] LLMs/Chat Models
  • [ ] Embedding Models
  • [ ] Prompts / Prompt Templates / Prompt Selectors
  • [ ] Output Parsers
  • [ ] Document Loaders
  • [ ] Vector Stores / Retrievers
  • [ ] Memory
  • [ ] Agents / Agent Executors
  • [ ] Tools / Toolkits
  • [X] Chains
  • [X] Callbacks/Tracing
  • [ ] Async

Reproduction

The code linked above reproduces the behaviour.

Expected behavior

Expected behaviour: the final answer from the LLM chain is streamed properly to the Chainlit UI.

poojitharamachandra avatar Sep 07 '23 07:09 poojitharamachandra

🤖

Hi,

Based on the information you've provided, it seems like you're having trouble streaming the final answer from the LLM chain to the Chainlit UI.

From the similar issues that have been solved in the LangChain repository, there are a few things you could try:

  1. Change the parameter from stream to streaming in the ChatOpenAI instantiation:
llm = ChatOpenAI(temperature = 0, model="gpt-3.5-turbo-0613", streaming=True)

This was the solution suggested in the issue OpenAIFunctionsAgent | Streaming Bug.

  2. If you're using the GPT4All model, you need to set streaming = True in the constructor. This was the solution suggested in the issue Streaming does not work using streaming callbacks for gpt4all model.

  3. When you instantiate your LLMChain, set verbose=False. However, when you define your model, you need to set verbose=True. This was the solution suggested in the issue Support for streaming when using LLMchain?.

from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# Define model (streaming enabled; tokens go to stdout via the callback handler)
LLM = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.1, openai_api_key=OPENAI_KEY,
                 streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))

# Create chain -- this will only stream the response, not the thought process
ground_truth_chain = LLMChain(llm=LLM, prompt=ground_truth_prompt, verbose=False)
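If the goal is to stream into the Chainlit UI rather than to stdout, the same idea applies with Chainlit's LangChain callback handler passed at call time. This is only a rough sketch, not a tested solution; it assumes the cl.AsyncLangchainCallbackHandler API from the Chainlit docs and the chain defined above:

import chainlit as cl

@cl.on_message
async def on_message(message: cl.Message):
    # Chainlit's handler streams tokens to the UI; stream_final_answer streams the last step
    cb = cl.AsyncLangchainCallbackHandler(stream_final_answer=True)
    res = await ground_truth_chain.acall(message.content, callbacks=[cb])
    await cl.Message(content=res["text"]).send()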

As for your question about the pull request https://github.com/langchain-ai/langchain/pull/1222/, I'm unable to view the link. However, you can check the status of the pull request by visiting the link directly. If it has been merged, then the changes should be present in the latest version of LangChain.

Please try these solutions and let me know if they work for you. If not, could you please provide more details about the issue you're experiencing? Specifically, any error messages you're seeing would be very helpful.

Best, Dosu

Sources


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

dosubot[bot] avatar Sep 07 '23 07:09 dosubot[bot]

Hi, even with the above changes the problem persists. There is no error, but I am unable to stream the final answer to the Chainlit UI.

template = """ You are a helpful AI assistant. Provide the answer for the following question:

Question: {question}
Answer: """

model_basename = "longchat-7b-16k-GPTQ-4bit-128g.no-act.order"
model = AutoGPTQForCausalLM.from_quantized(model_path, model_basename=model_basename,
                                           use_safetensors=True, trust_remote_code=True,
                                           device_map='auto', use_triton=False, load_in_8bit=True,
                                           quantize_config=None, streaming=True, verbose=True)

tokenizer = AutoTokenizer.from_pretrained(model_path)

streamer = TextStreamer(tokenizer)

pipe = pipeline("text-generation", model=model, tokenizer=tokenizer,
                max_new_tokens=50, batch_size=8, streamer=streamer)
llm = HuggingFacePipeline(pipeline=pipe)

@cl.on_chat_start
async def main():
    prompt = PromptTemplate(template=template, input_variables=["question"])
    llm_chain = LLMChain(prompt=prompt, llm=llm, verbose=True)
    cl.user_session.set("llm_chain", llm_chain)

    return llm_chain

#@cl.langchain_run
@cl.on_message
async def run(input_str):
    # res = agent(input_str)
    cb = cl.LangchainCallbackHandler(
        stream_final_answer=True, answer_prefix_tokens=["Answer", ":"]
    )
    cb.answer_reached = True
    print('user question :', input_str)

    # Retrieve the chain from the user session
    llm_chain = cl.user_session.get("llm_chain")
    cb.answer_reached = True
    res = await cl.make_async(llm_chain)(
        input_str, callbacks=[cb]  # callbacks=[cl.ChainlitCallbackHandler()]
    )
    print(res)
    await cl.Message(content=res["text"]).send()

poojitharamachandra avatar Sep 07 '23 09:09 poojitharamachandra

Hi, any update on this?

poojitharamachandra avatar Sep 14 '23 08:09 poojitharamachandra

Having the same problem: https://github.com/langchain-ai/langchain/issues/11739

daolmedo avatar Oct 12 '23 23:10 daolmedo

Having the same problem: #11739

@daolmedo did you find a solution?

poojitharamachandra avatar Oct 13 '23 05:10 poojitharamachandra

try this custom callback

from langchain.callbacks.base import BaseCallbackHandler

class StreamHandler(BaseCallbackHandler):
    def __init__(self):
        self.msg = cl.Message(content="")

    async def on_llm_new_token(self, token: str, **kwargs):
        await self.msg.stream_token(token)

    async def on_llm_end(self, response: str, **kwargs):
        await self.msg.send()
        self.msg = cl.Message(content="")
@cl.on_message
async def main(message: cl.Message):
    # Retrieve the chain from the user session
    llm_chain = cl.user_session.get("llm_chain")
    res = await llm_chain.acall(message.content, callbacks=[cl.AsyncLangchainCallbackHandler(),StreamHandler()])
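Note that both methods on StreamHandler above are coroutines, so this handler needs to sit on the async call path (llm_chain.acall as shown, or ainvoke); if the chain is wrapped with cl.make_async instead, the synchronous callback manager cannot await them and you get "coroutine ... was never awaited" warnings. A rough all-sync variant for that case, untested and assuming Chainlit's cl.run_sync helper is available in your version:

import chainlit as cl
from langchain.callbacks.base import BaseCallbackHandler

class SyncStreamHandler(BaseCallbackHandler):
    def __init__(self):
        self.msg = cl.Message(content="")

    def on_llm_new_token(self, token: str, **kwargs):
        # bridge back into Chainlit's event loop from sync callback code
        cl.run_sync(self.msg.stream_token(token))

    def on_llm_end(self, response, **kwargs):
        cl.run_sync(self.msg.send())
        self.msg = cl.Message(content="")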

dxdiagmauric avatar Oct 18 '23 06:10 dxdiagmauric

Still dealing with it... Thanks for the response @dxdiagmauric. I'm not sure what "cl" means and have not found anything about it. Could you elaborate a bit more?

daolmedo avatar Oct 18 '23 23:10 daolmedo

I used these references about streaming and custom callback handlers in Python: https://docs.chainlit.io/chat-experience/streaming/python and https://python.langchain.com/docs/modules/callbacks/custom_callbacks. "cl" refers to the chainlit module. This function sends tokens to the Chainlit UI on the LLM's new-token callback.

 async def on_llm_new_token(self, token: str, **kwargs):
        await self.msg.stream_token(token)

I faced this problem with ConversationalRetrievalChain, but this custom callback class solved it for me.


import chainlit as cl
from langchain.memory import ConversationBufferMemory
from langchain.chat_models import ChatOpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chains import ConversationalRetrievalChain
from langchain.callbacks.base import BaseCallbackHandler
 
...
@cl.on_chat_start
def main():
    # this gets a vector store
    vectorstore = init_embeddings()
    memory = ConversationBufferMemory(
        memory_key="chat_history",
        output_key="answer", 
        return_messages=True
    )
    llm = ChatOpenAI(
        openai_api_key=OPENAI_API_KEY,
        model_name='gpt-4',
        temperature=0.0,
        streaming=True,
    )
    qa = ConversationalRetrievalChain.from_llm(
        llm=llm, 
        retriever=vectorstore.as_retriever(), 
        memory=memory,  
    )
    cl.user_session.set("llm_chain", qa)

class StreamHandler(BaseCallbackHandler):
    def __init__(self):
        self.msg = cl.Message(content="")

    async def on_llm_new_token(self, token: str, **kwargs):
        await self.msg.stream_token(token)

    async def on_llm_end(self, response: str, **kwargs):
        await self.msg.send()
        self.msg = cl.Message(content="")

@cl.on_message
async def main(message: cl.Message):
    # Retrieve the chain from the user session
    llm_chain = cl.user_session.get("llm_chain")
    res = await llm_chain.acall(message.content, callbacks=[cl.AsyncLangchainCallbackHandler(),StreamHandler()])

...

dxdiagmauric avatar Oct 19 '23 17:10 dxdiagmauric

I asked GPT-4 to help with a similar problem; my RAG Chainlit app was not streaming. The answer and code it gave me stream, but now I get a lot of repetition. Everything below is from GPT-4:

The Python Chainlit code you've provided seems to be a part of a LangChain conversation chain. To ensure the final answer streams as expected, you need to make sure that the last step of the chain always starts with the same prefix, "FINAL ANSWER" in your case. If your chain only has one step, you can force the final answer by manually setting answer_reached to True after instantiating the callback handler and before calling the chain Source 2.

To apply this fix, you can modify your code as follows:

@cl.on_message
async def main(message: cl.Message):
    chain = cl.user_session.get("chain")  # type: ConversationalRetrievalChain

    answer_prefix_tokens=["FINAL", "ANSWER"] # added 

    cb = cl.AsyncLangchainCallbackHandler(
        stream_final_answer=True,
        answer_prefix_tokens=answer_prefix_tokens,
    )

    # Force final answer if necessary
    cb.answer_reached = True

    res = await chain.acall(message.content, callbacks=[cb])
    answer = res["answer"]
    source_documents = res["source_documents"]  # type: List[Document]

    text_elements = []  # type: List[cl.Text]

    if source_documents:
        for source_idx, source_doc in enumerate(source_documents):
            source_name = f"excerpt_{source_idx + 1}"
            # Create the text element referenced in the message
            text_elements.append(
                cl.Text(content=source_doc.page_content, name=source_name)
            )
        source_names = [text_el.name for text_el in text_elements]

        if source_names:
            answer += f"\nSources: {', '.join(source_names)}"
        else:
            answer += "\nNo sources found"

    await cl.Message(content=answer, elements=text_elements).send()

This modification forces the final answer to be streamed by setting cb.answer_reached to True. This is highly useful when the chain only has one step, and you want to ensure that the answer is streamed as expected Source 2.
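For the prefix detection to fire without forcing answer_reached, the model has to actually emit those tokens, so the prompt must ask for them explicitly. A template along these lines might work; the wording and the combine_docs_chain_kwargs hookup below are illustrative, not taken from the code above:

from langchain.prompts import PromptTemplate

qa_template = """Use the context to answer the question.
Begin your reply with the words FINAL ANSWER.

Context: {context}
Question: {question}
FINAL ANSWER:"""

qa_prompt = PromptTemplate(template=qa_template, input_variables=["context", "question"])

# then, assuming the chain is built with from_llm:
# chain = ConversationalRetrievalChain.from_llm(
#     llm, retriever=retriever, memory=memory,
#     combine_docs_chain_kwargs={"prompt": qa_prompt},
# )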

smach avatar Oct 24 '23 01:10 smach

@dxdiagmauric The same thing happens when I use your code: it streams, but then my chatbot starts repeating both the questions and the answers.

smach avatar Oct 24 '23 23:10 smach

I am facing the same issue. I added cb.answer_reached = True ("# Force final answer if necessary") and now the answer is repeated.

And I am using the sync callback.

@cl.on_message
async def main(message: cl.Message):
    chain = cl.user_session.get("chain")  # type: ConversationalRetrievalChain

    answer_prefix_tokens = ["FINAL", "ANSWER"]

    cb = cl.LangchainCallbackHandler(stream_final_answer=True, answer_prefix_tokens=answer_prefix_tokens)
    # Force final answer if necessary
    cb.answer_reached = True

    res = await cl.make_async(chain)(
        message.content, callbacks=[cb]
    )
    answer = res["answer"]
    await cl.Message(content=answer).send()
    return chain

tsantra avatar Oct 25 '23 19:10 tsantra

@dosu-bot For me the solution was to add the following from @smach answer:

cb.answer_reached = True
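i.e., roughly this, with the handler and chain call as in @smach's example above:

cb = cl.AsyncLangchainCallbackHandler(
    stream_final_answer=True,
    answer_prefix_tokens=["FINAL", "ANSWER"],
)
cb.answer_reached = True  # force streaming of the final answer
res = await chain.acall(message.content, callbacks=[cb])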

raoulcapello avatar Oct 26 '23 17:10 raoulcapello

I am still facing the issue: the answers from the LLM are being streamed to stdout, but not to the UI.

If I use the solution provided by @dxdiagmauric with the custom callback, I get the following warning:

.conda\envs\chatbot\Lib\site-packages\langchain\callbacks\manager.py:329: RuntimeWarning: coroutine 'StreamHandler.on_llm_end' was never awaited
  getattr(handler, event_name)(*args, **kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

here is my code:

@cl.on_message
async def run(input_str):
    cb = cl.LangchainCallbackHandler(
        stream_final_answer=True,
        answer_prefix_tokens=["Helpful Answer:", "result", "answer", "Assistant"]
    )
    cb.answer_reached = True

    # Retrieve the chain from the user session
    llm_chain = cl.user_session.get("llm_chain")
    res = await cl.make_async(llm_chain)(
        input_str, callbacks=[cb, StreamHandler()]
    )
    fnl_ans = f"{res['result']}"

    await cl.Message(content=fnl_ans).send()

poojitharamachandra avatar Oct 27 '23 06:10 poojitharamachandra

🤖

@baskaryan, I could use your expertise on this one, it seems to be a bit tricky.


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

dosubot[bot] avatar Oct 27 '23 06:10 dosubot[bot]

Any update on this? I am facing the same issue.

NisaarAgharia avatar Nov 10 '23 05:11 NisaarAgharia

Does anyone know a solution for this duplication? @smach

DaviReisVieira avatar Jan 05 '24 17:01 DaviReisVieira

@dosu-bot Any update on this topic?

maxime-paquet-b12 avatar Jan 18 '24 15:01 maxime-paquet-b12

Hi, I have the same issue, but none of the above suggestions work for me.

- Python 3.9

- Package versions: chainlit==0.7.3, langchain==0.0.349, langchain-community==0.0.1, langchain-core==0.0.13, langsmith==0.0.84

Code:


from langchain.memory import ChatMessageHistory,ConversationBufferWindowMemory
import chainlit as cl
from langchain.chains import LLMChain,ConversationChain
from transformers import AutoModelForCausalLM ,AutoTokenizer, pipeline , TextStreamer
from langchain_community.llms.huggingface_pipeline import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.schema.runnable.config import RunnableConfig

model_ = AutoModelForCausalLM.from_pretrained("../models/Mistral_Instruct_v1.02/",device_map="auto",
                                    do_sample=False,
                                    offload_folder="../models/offload_2/")

tokenizer = AutoTokenizer.from_pretrained(llm_model_path)

streamer =  TextStreamer(tokenizer, skip_prompt=True)

pipe = pipeline("text-generation", model=model_, tokenizer=tokenizer, max_new_tokens=100000,streamer=streamer)
model = HuggingFacePipeline(pipeline=pipe)

@cl.on_chat_start
async def on_chat_start():
    msg= cl.Message(content = "Bonjour , comment puis je vous aider ? Hi , how can I help you today? ", disable_human_feedback=True)
    message_history = ChatMessageHistory()
    memory_chat = ConversationBufferWindowMemory(k=2, return_messages=True,     memory_key="history",
                                                chat_memory=message_history,input_key="input")
    cl.user_session.set("memory_chat",memory_chat)
    await msg.send()


@cl.on_message
async def main(message: cl.Message):

    memory_chat = cl.user_session.get("memory_chat")
       
    # prompt_template comes from another function; no problem with that, let's say it is defined
    prompt = PromptTemplate(template=prompt_template,input_variables=[ "history","input"] )

    chain = ConversationChain(llm=model,prompt=prompt,
            memory=memory_chat,
            verbose = True)


    answer_prefix_tokens=["FINAL", "ANSWER","response","answer","</s>"] # added 

    cb = cl.AsyncLangchainCallbackHandler(
             stream_final_answer=True,
             answer_prefix_tokens=answer_prefix_tokens,
         )
    cb.answer_reached = True

        
    msg = cl.Message(
         content="")
       
        
    async for chunk in chain.astream(
        {"input": message.content},
        config=RunnableConfig(callbacks=[cb]),):
                                                   
        print('message content : ' , message.content)
        print('chunk :', chunk)
        for token in chunk['response']:
            await msg.stream_token(token)
        
    await msg.send()

I have tried the custom StreamHandler and the approach with chain.acall mentioned by @smach. I also tried this version: https://docs.chainlit.io/integrations/langchain

But I still have the output streamed to stdout, not to the UI.

Actually, when the chain is finished I can see the output streaming in the UI, which is not the expected behaviour, as we lose even more time than without streaming.

What can I do to see the streaming in the UI?

Serge9744 avatar Feb 06 '24 15:02 Serge9744

Seconded: streaming is not working for me.

Chainlit implementation:


import os
from typing import List

# from langchain.embeddings.openai import OpenAIEmbeddings
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.chains import (
    ConversationalRetrievalChain,
)
from langchain.chat_models import ChatOpenAI

from langchain.docstore.document import Document
from langchain.memory import ChatMessageHistory, ConversationBufferMemory

import chainlit as cl

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)


@cl.on_chat_start
async def on_chat_start():
    files = None

    # Wait for the user to upload a file
    while files == None:
        files = await cl.AskFileMessage(
            content="Please upload a text file to begin!",
            accept=["text/plain"],
            max_size_mb=20,
            timeout=180,
        ).send()

    file = files[0]

    msg = cl.Message(content=f"Processing `{file.name}`...", disable_feedback=True)
    await msg.send()

    with open(file.path, "r", encoding="utf-8") as f:
        text = f.read()

    # Split the text into chunks
    texts = text_splitter.split_text(text)

    # Create a metadata for each chunk
    metadatas = [{"source": f"{i}-pl"} for i in range(len(texts))]

    # Create a Chroma vector store
    embeddings = GPT4AllEmbeddings()
    docsearch = await cl.make_async(Chroma.from_texts)(
        texts, embeddings, metadatas=metadatas
    )

    message_history = ChatMessageHistory()

    memory = ConversationBufferMemory(
        memory_key="chat_history",
        output_key="answer",
        chat_memory=message_history,
        return_messages=True,
    )

    # Create a chain that uses the Chroma vector store
    chain = ConversationalRetrievalChain.from_llm(
        ChatOpenAI(base_url="http://localhost:1234/v1", api_key="not-needed", temperature=0.7, max_tokens=1000, streaming=True),
        chain_type="stuff",
        retriever=docsearch.as_retriever(),
        memory=memory,
        return_source_documents=True,
    )

    # Let the user know that the system is ready
    msg.content = f"Processing `{file.name}` done. You can now ask questions!"
    await msg.update()

    cl.user_session.set("chain", chain)


@cl.on_message
async def main(message: cl.Message):
    chain = cl.user_session.get("chain")  # type: ConversationalRetrievalChain
    cb = cl.LangchainCallbackHandler(
        stream_final_answer=True,
        answer_prefix_tokens=["FINAL", "ANSWER"]
    )
    res = await chain.ainvoke(message.content, callbacks=[cb])
    answer = res["answer"]
    source_documents = res["source_documents"]  # type: List[Document]

    text_elements = []  # type: List[cl.Text]

    if source_documents:
        for source_idx, source_doc in enumerate(source_documents):
            source_name = f"source_{source_idx}"
            # Create the text element referenced in the message
            text_elements.append(
                cl.Text(content=source_doc.page_content, name=source_name)
            )
        source_names = [text_el.name for text_el in text_elements]

        if source_names:
            answer += f"\nSources: {', '.join(source_names)}"
        else:
            answer += "\nNo sources found"

    await cl.Message(content=answer, elements=text_elements).send()

Confirmed that streaming works in a Jupyter notebook using the following when instantiating the LLM

Base Langchain implementation in a Jupyter Notebook

llm = ChatOpenAI(base_url="http://localhost:1234/v1", api_key="not-needed", temperature=0.7, max_tokens=1000, streaming=True, callbacks=[StreamingStdOutCallbackHandler()])

@willydouhard, curious to know if it's implemented differently nowadays. Thanks!

p-gonzo avatar Feb 23 '24 01:02 p-gonzo

try this custom callback

from langchain.callbacks.base import BaseCallbackHandler

class StreamHandler(BaseCallbackHandler):
    def __init__(self):
        self.msg = cl.Message(content="")

    async def on_llm_new_token(self, token: str, **kwargs):
        await self.msg.stream_token(token)

    async def on_llm_end(self, response: str, **kwargs):
        await self.msg.send()
        self.msg = cl.Message(content="")
@cl.on_message
async def main(message: cl.Message):
    # Retrieve the chain from the user session
    llm_chain = cl.user_session.get("llm_chain")
    res = await llm_chain.acall(message.content, callbacks=[cl.AsyncLangchainCallbackHandler(),StreamHandler()])

Thank you @dxdiagmauric, this is working for me. So basically we just need to create a custom StreamHandler class to handle the streamed token output. I think this mostly takes inspiration from LangChain's Streamlit tutorial; for Chainlit we just need to change the Streamlit output target to cl.Message(content="") and stream tokens into it.
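For comparison, the Streamlit version of that handler accumulates tokens into a container and redraws it, while the Chainlit one streams them into a cl.Message. Roughly like this, paraphrased from memory of the LangChain Streamlit examples, so treat the details as approximate:

from langchain.callbacks.base import BaseCallbackHandler

class StreamlitStreamHandler(BaseCallbackHandler):
    def __init__(self, container):
        self.container = container  # e.g. st.empty()
        self.text = ""

    def on_llm_new_token(self, token: str, **kwargs):
        self.text += token
        self.container.markdown(self.text)  # redraw the growing answer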

Thanks for saving my day!

vinhnx avatar Apr 20 '24 01:04 vinhnx

The streaming seems to work for me with the solutions mentioned above, but it keeps showing the question and then replacing it with the answer. Any idea where the problem may be?

ElonTelos avatar May 10 '24 15:05 ElonTelos