Issue: Does RetrievalQA Support Streaming Replies?

Open Fzx-oss opened this issue 1 year ago • 4 comments

Question

I'm interested in creating a conversational app using RetrievalQA that can also answer using external knowledge. However, I'm curious whether RetrievalQA supports replying in a streaming manner. I couldn't find any related articles, so I would like to ask everyone here.

Fzx-oss avatar May 19 '23 00:05 Fzx-oss

What do you mean by streaming manner?

akshayghatiki311 avatar May 19 '23 06:05 akshayghatiki311

@akshayghatiki311

Streaming is a feature that allows receiving incremental results in a streaming format when generating long conversations or text. In ChatOpenAI from LangChain, setting the streaming variable to True enables this functionality. However, it does not work properly in RetrievalQA or ConversationalRetrievalChain. Therefore, I would like to ask everyone if they have any good examples.

Reference

Fzx-oss avatar May 20 '23 00:05 Fzx-oss

It's up to you. You would achieve streaming via a callback handler, which works on both chains and LLMs. It works fine on any type of chain, including RetrievalQA. The callback handler defines a set of callbacks for various events, one of which is "on next token" (`on_llm_new_token` in LangChain's callback interface); that is the one you would use to "stream". Make sure to set up your LLM object to support streaming and the correct verbosity.

preritdas avatar May 21 '23 03:05 preritdas

I'm trying to achieve the same thing and am stuck as well. Here is what my code looks like:

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
callbacks = [StreamingStdOutCallbackHandler()]
...

llamamodel = LlamaCpp(model_path=MODEL_PATH, n_threads=6, n_ctx=2048, max_tokens=500, temperature=0.6, top_k=40, top_p=0.8, echo=True, repeat_penalty=1.17, use_mmap=True, verbose=True, use_mlock=True)

chain = LLMChain(llm=llamamodel, prompt=PROMPT, callbacks=callbacks)
 
inputs = [{"context": inputcontext, "question": thequery}]

print(chain.apply(inputs)[0]["text"]) 

It only outputs the text once generation has finished.

jppaolim avatar May 30 '23 00:05 jppaolim

Try passing the callbacks to your LLM object. @jppaolim

preritdas avatar May 30 '23 00:05 preritdas
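The reason this suggestion helps is that, per LangChain's callback documentation at the time, a handler passed to a chain's constructor is scoped to that chain object and is not used by the model attached to it, so token events never reach it. The sketch below illustrates that scoping with toy classes only; `TokenRecorder`, `ToyLLM`, and `ToyChain` are hypothetical stand-ins, not LangChain classes.

```python
class TokenRecorder:
    """Toy handler (hypothetical): remembers every token it is notified about."""

    def __init__(self):
        self.tokens = []

    def on_llm_new_token(self, token, **kwargs):
        self.tokens.append(token)


class ToyLLM:
    """Toy stand-in for an LLM object; it notifies only its own handlers."""

    def __init__(self, callbacks=None):
        self.callbacks = callbacks or []

    def generate(self, prompt):
        words = ["streamed", "one", "token", "at", "a", "time"]
        for word in words:
            for handler in self.callbacks:
                handler.on_llm_new_token(word)
        return " ".join(words)


class ToyChain:
    """Toy stand-in for a chain; constructor callbacks stay on the chain."""

    def __init__(self, llm, callbacks=None):
        self.llm = llm
        self.callbacks = callbacks or []  # never forwarded to self.llm

    def run(self, question):
        return self.llm.generate(question)


chain_handler = TokenRecorder()
llm_handler = TokenRecorder()
chain = ToyChain(llm=ToyLLM(callbacks=[llm_handler]), callbacks=[chain_handler])
answer = chain.run("Your question")

print(chain_handler.tokens)  # the chain-scoped handler saw no tokens
print(llm_handler.tokens)    # the LLM-scoped handler saw every token
```

In the code above this comment, the equivalent change would be to pass `callbacks=callbacks` to the `LlamaCpp(...)` constructor instead of to `LLMChain(...)`.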

It worked ! Thanks a lot.

jppaolim avatar May 30 '23 00:05 jppaolim

No prob @jbpacker

preritdas avatar May 30 '23 00:05 preritdas

Thank you all for the enthusiastic responses to my question. I want to share my sample code, which shows how to enable streaming output in RetrievalQA. I believe it can be used with other chain objects as well.

import os

# Set the OpenAI API key
openai_api_key = "..."
os.environ["OPENAI_API_KEY"] = openai_api_key

from langchain.document_loaders import TextLoader

# Load your text data using the TextLoader
loader = TextLoader("...")
documents = loader.load()

from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import CharacterTextSplitter

# Generate VectorDB using Chroma and OpenAIEmbeddings
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=0)
texts = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
docsearch = Chroma.from_documents(texts, embeddings)

from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# Create a ChatOpenAI instance for interactive chat using the OpenAI model
llm = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    temperature=0,
    openai_api_key=openai_api_key,
)

# Create a RetrievalQA chain using the ChatOpenAI model and the document retriever
qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=docsearch.as_retriever(),
)

# Run the QA system by providing a question to the chain
qa.run("Your question")

*I recommend installing the latest langchain with pip.

Fzx-oss avatar May 30 '23 04:05 Fzx-oss

Thank you, everyone!

Fzx-oss avatar Jun 01 '23 23:06 Fzx-oss

Thanks for sharing. I have the same problem.

miknyko avatar Jul 26 '23 08:07 miknyko

How can this streaming be used with custom pre-trained models?

rd-neosoft avatar Aug 06 '23 11:08 rd-neosoft

Hi, I am also experiencing this problem: I am using a ConversationalRetrievalChain and want to stream the output. In my case, only the intermediate steps seem to stream (with duplicate tokens during this process), and the final output never actually streams.

bujars avatar Aug 17 '23 20:08 bujars

The RetrievalQA chain and ConversationalRetrievalChain are not streaming for me either. The output is printed to the console in a streaming manner, but I can't stream the final output: the call just waits until generation is done, and then I get the full answer. This happens even after updating langchain to the latest version, 0.0.302.

    chat = ChatOpenAI(
        streaming=True,
        openai_api_key=app_settings.openai_api_key,
        verbose=True,
        temperature=temperature,
        max_tokens=max_tokens,
        callbacks=[StreamingStdOutCallbackHandler()],
    )  # type: ignore
    chain = RetrievalQA.from_chain_type(
        llm=chat,
        retriever=weaviate_retriever,
        verbose=True,
    )

    qa_chain_response = chain.stream(
        {"query": user_question},
    )
    for i in qa_chain_response:
        print("TOKEN", i)

Output: image

I can see in the debug info that the response is generated token by token, but in the end I only receive the whole response at once.

kubre avatar Sep 26 '23 16:09 kubre
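One plausible explanation for the behavior described above, offered as an assumption rather than a confirmed diagnosis: legacy chains such as RetrievalQA appear not to implement their own `stream()`, and a fallback `stream()` that simply runs the whole call and yields the finished result would produce exactly one chunk. The toy classes below (`ToyChain` and `StreamingToyChain` are hypothetical, not LangChain classes) show the difference between such a fallback and a real token-by-token stream.

```python
from typing import Iterator


class ToyChain:
    """Toy stand-in for a chain whose stream() is only a fallback."""

    def _tokens(self, query: str) -> Iterator[str]:
        # Pretend these tokens arrive one at a time from a model.
        yield from ["Paris", " is", " the", " capital"]

    def invoke(self, query: str) -> str:
        # Blocks until every token is available, then returns the full text.
        return "".join(self._tokens(query))

    def stream(self, query: str) -> Iterator[str]:
        # Fallback behaviour: compute the full answer, then yield it once.
        yield self.invoke(query)


class StreamingToyChain(ToyChain):
    """Same toy chain, but stream() forwards tokens as they are produced."""

    def stream(self, query: str) -> Iterator[str]:
        yield from self._tokens(query)


whole_chunks = list(ToyChain().stream("Your question"))
token_chunks = list(StreamingToyChain().stream("Your question"))

print(len(whole_chunks))  # a single chunk holding the whole answer
print(len(token_chunks))  # one chunk per token
```

If this is what is happening, the tokens seen in the console come from the LLM's callback handler, while the `for` loop over `chain.stream(...)` only ever receives the final result.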

Hi Zhongxi, you saved my day with this code. I just have one question: I am creating an API using Django, and my goal is to stream this response. When I run the code it works great and streaming in the terminal works, but when I try to return the stream as an event stream, it returns the whole response only after it has finished streaming in the terminal. It would be really awesome if you could suggest some of the tactics you used :) Thanks!

Here's my code:

` url = request.data.get("url")
questions = request.data.get("questions")
prompt = request.data.get("promptName")
# userId = request.data.get("userId")
# isModified = request.data.get("isModified")
# isKnowledgeGraph = request.data.get('isKnowledgeGraph')
# cotResult = ""
if not url or not questions:
    return Response({"message": "Please provide valid URL and questions"})

# Process the documents received from the user
try:
    doc_store = process_url(url)
    if not doc_store:
        return Response({"message": "PDF document not loaded"})
except Exception as e:
    return Response({"message": "Error loading PDF document"})


custom_prompt_template = set_custom_prompt(url,prompt)
# Load and process documents
loader = DirectoryLoader(DATA_PATH, glob='*.pdf', loader_cls=PyPDFLoader)
documents = loader.load()   

text_splitter = CustomRecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=30)
texts = text_splitter.split_documents(documents)

#Creating Embeddings using OpenAI
embeddings = OpenAIEmbeddings(chunk_size= 16, openai_api_key= openai_gpt_key,)

db = FAISS.from_documents(texts, embeddings)
db.save_local(DB_FAISS_PATH)
search_kwargs = {
    'k': 30,
    'fetch_k':100,
    'maximal_marginal_relevance': True,
    'distance_metric': 'cos',
}
retriever=db.as_retriever(search_kwargs=search_kwargs)

# get the list of question from the body
questionList =request.data['questions']
getLocation = request.data['IsLocation']

#This function is to generate responses from OpenAi
def openai_response_generator():
        # Create an instance of ChatOpenAI
        llm = ChatOpenAI(
            model_name="gpt-3.5-turbo-16k",
            streaming=True,
            callbacks=[StreamingStdOutCallbackHandler()],
            temperature=0,
            openai_api_key= openai_gpt_key,
        )

        # Iterating through the questions list
        for question in (questionList):
            qa = RetrievalQA.from_chain_type(
                llm=llm,
                chain_type="stuff",
                retriever=retriever,
                return_source_documents=False,
                chain_type_kwargs={"prompt": custom_prompt_template},
            )
            
            question = question
            yield qa.run(question)

return StreamingHttpResponse(openai_response_generator(), content_type="text/event-stream")

`

Mayank-dev1822 avatar Oct 22 '23 11:10 Mayank-dev1822
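Two things in the code above probably explain the symptom. First, `yield qa.run(question)` yields only after `qa.run` has returned, so each yielded item is already a complete answer; the tokens themselves go to stdout through `StreamingStdOutCallbackHandler`. Second, a `text/event-stream` response expects each message in the Server-Sent Events wire format: one or more `data:` lines followed by a blank line. The sketch below covers only the framing part; `format_sse` and `sse_stream` are hypothetical helper names, and the token source is assumed to be some iterable fed by a custom callback.

```python
from typing import Iterable, Iterator, Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Frame one message in the Server-Sent Events wire format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines must be split across several data: lines.
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"


def sse_stream(tokens: Iterable[str]) -> Iterator[str]:
    """Turn an iterable of tokens into SSE-framed chunks, one per token."""
    for token in tokens:
        yield format_sse(token)


chunks = list(sse_stream(["Hi", " there"]))
print(chunks)
```

A generator like `sse_stream(...)` could then be handed to `StreamingHttpResponse(..., content_type="text/event-stream")`, provided the tokens come from a callback rather than from the return value of `qa.run`.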

Did you find any solution for streaming on frontend?

revolutionarybukhari avatar Nov 01 '23 17:11 revolutionarybukhari

Yup, I fixed it using FastAPI.

Mayank-dev1822 avatar Nov 01 '23 18:11 Mayank-dev1822

The standard-output callback only streams to the terminal. To actually get at its output, for now we made a new callback class and streamed the output through it.


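from queue import Empty, Queue
from threading import Thread
from typing import Any, Generator

from langchain.callbacks.base import BaseCallbackHandler


class QueueCallbackHandler(BaseCallbackHandler):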
    def __init__(self, queue):
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put(
            {
                "event": "message",
                "id": "message_id",
                "retry": 1,
                "data": token,
            }
        )

    def on_llm_end(self, *args, **kwargs) -> Any:
        return self.queue.empty()


def stream(cb: Any, queue: Queue) -> Generator:
    job_done = object()

    def task():
        x = cb()
        queue.put(job_done)

    t = Thread(target=task)
    t.start()

    while True:
        try:
            item = queue.get(True, timeout=1)
            if item is job_done:
                break
            yield item
        except Empty:
            continue


# And finally where yo
def api_stream():
    # make sure you set streaming=True for the chain
    def cb():
        llm_chain(
            {"question": user_question, "context": contexts},
            callbacks=[QueueCallbackHandler(queue=output_queue)],
        )

    yield from stream(cb, output_queue)

# the output of above function can be then used in EventSourceResponse

kubre avatar Nov 02 '23 05:11 kubre
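The queue-and-thread pattern in the comment above can be tried without LangChain. The following is a minimal sketch using only the standard library: `fake_llm` is a hypothetical stand-in for the chain call, and `stream_tokens` plays the role of `stream`. One deliberate difference from the original: the sentinel is enqueued in a `finally` block, so the consumer loop still ends if the producer raises.

```python
import time
from queue import Empty, Queue
from threading import Thread
from typing import Callable, Iterator


def fake_llm(on_token: Callable[[str], None]) -> None:
    """Hypothetical producer: emits tokens through a callback, like a handler would."""
    for token in ["Streaming", " works", "!"]:
        time.sleep(0.01)  # simulate generation latency
        on_token(token)


def stream_tokens(producer: Callable[[Callable[[str], None]], None]) -> Iterator[str]:
    """Run the producer in a background thread and yield its tokens as they arrive."""
    queue: Queue = Queue()
    job_done = object()  # sentinel marking the end of the stream

    def task() -> None:
        try:
            producer(queue.put)
        finally:
            queue.put(job_done)  # always signal completion, even on error

    Thread(target=task, daemon=True).start()

    while True:
        try:
            item = queue.get(timeout=1)
        except Empty:
            continue  # nothing yet; keep waiting for the producer
        if item is job_done:
            break
        yield item


tokens = list(stream_tokens(fake_llm))
print(tokens)
```

Because the generator yields each token as soon as it is dequeued, a web framework that consumes it lazily can forward tokens to the client while generation is still running.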

@Fzx-oss You saved my day. Thanks a ton.

farazmurtaza avatar Dec 06 '23 10:12 farazmurtaza

Yes, we can do that. Here is some simple code to achieve it:


llm = ChatOpenAI(temperature=0.2, 
                 callbacks=[callback_handler],
                 streaming=True)
chain = ConversationalRetrievalChain.from_llm(
        llm= llm,
        retriever= retriever,
        memory=memory,
        chain_type='stuff',
        combine_docs_chain_kwargs = chain_type_kwargs    
    )


async def generator(question):
    run = asyncio.create_task(chain.arun(question))
    
    async for token in callback_handler.aiter():
        yield token

    await run
    

# Conversation Route
@router.post('/conversation',status_code=200)
async def get_conversation(body: Question):
    question = body.question
    return StreamingResponse(generator(question), media_type="text/event-stream")

AchyutPL avatar Dec 10 '23 04:12 AchyutPL
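The snippet above does not define `callback_handler`; since it is iterated with `aiter()`, it is presumably LangChain's `AsyncIteratorCallbackHandler`, but that is an assumption. The sketch below reproduces the same control flow with the standard library only: start the chain as a task, iterate over tokens as the handler receives them, then await the task. `ToyAsyncIteratorHandler` and `fake_chain_arun` are hypothetical stand-ins.

```python
import asyncio
from typing import AsyncIterator, List

_DONE = object()  # sentinel marking the end of generation


class ToyAsyncIteratorHandler:
    """Toy stand-in for an async-iterator callback handler (not a LangChain class)."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    async def on_llm_new_token(self, token: str) -> None:
        await self.queue.put(token)

    async def on_llm_end(self) -> None:
        await self.queue.put(_DONE)

    async def aiter(self) -> AsyncIterator[str]:
        while True:
            item = await self.queue.get()
            if item is _DONE:
                break
            yield item


async def fake_chain_arun(handler: ToyAsyncIteratorHandler, question: str) -> str:
    """Hypothetical stand-in for chain.arun: emits tokens, then returns the answer."""
    tokens = ["Hello", ",", " world"]
    for token in tokens:
        await asyncio.sleep(0.01)  # simulate generation latency
        await handler.on_llm_new_token(token)
    await handler.on_llm_end()
    return "".join(tokens)


async def generator(question: str, handler: ToyAsyncIteratorHandler) -> AsyncIterator[str]:
    # Start the chain in the background, then forward tokens as they arrive.
    run = asyncio.create_task(fake_chain_arun(handler, question))
    async for token in handler.aiter():
        yield token
    await run  # surface any exception raised by the chain


async def collect(question: str) -> List[str]:
    handler = ToyAsyncIteratorHandler()  # one handler per request
    return [token async for token in generator(question, handler)]


tokens = asyncio.run(collect("Your question"))
print(tokens)
```

The sketch creates a fresh handler for every request. A single handler shared at module level, as the snippet above seems to use, would probably mix tokens from concurrent requests.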

How can this streaming be used with custom pre-trained models?

whm233 avatar Dec 21 '23 09:12 whm233

Thanks for your novel code; it saved my day!

TankNee avatar Dec 28 '23 08:12 TankNee

Can you show me the full code, thanks

whm233 avatar Dec 28 '23 08:12 whm233

@app.post("/get_gk_query")
async def get_query(input_data: QueryInput):
    question = input_data.question
    session_id = input_data.session_id
    if not question:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid input. Please provide a question.")
    try:
        # Load Chroma database
        embedding_function = OpenAIEmbeddings()
        pdf_name="General_knowledge_base"
        chroma_db = load_templates_chroma_database(pdf_name, embedding_function)
        # Retrieve or create the user's session
        user_session = sessions.setdefault(session_id, {"last_question": None, "conversations": []})
        # Memory and QA processing
        memory = ConversationBufferWindowMemory(k=6)
        # Load existing conversations into memory
        for conv in (user_session["conversations"]):
            #print("memory-->", conv["question"], conv["qa_output"])
            memory.save_context({"input": conv["question"]}, {"output": conv["qa_output"]})
        prompt = PromptTemplate(input_variables=["context","question"], template=upload_pdf_template)
        qa = RetrievalQA.from_chain_type(llm=chat_llm, retriever=chroma_db.as_retriever(search_type="mmr"), memory=memory, chain_type_kwargs={'prompt': prompt})

How can I add streaming to my code? I am using FastAPI, and I get a streaming response with qa.run, but when I return it, it is just returned as a whole string. How can I fix that?

brainstationrandd avatar Jan 25 '24 08:01 brainstationrandd

I tried to use callbacks with LangchainJS but had no success for streaming. For me it seems the callbacks in LangchainJS have no effect. I tried to add the callbacks in OpenAI and in a chain as well.

If someone has an example, that works, please show it. :)

FrankDaze avatar Jan 27 '24 11:01 FrankDaze

Why can't I use streaming output after adopting LangChain's RAG architecture? The program is as follows. Does anyone have relevant experience with LangChain RAG and streaming output? Thanks for your help.

rag_prompt_custom = PromptTemplate.from_template(template)
rag_chain = ({"context": self.retriever | self.format_docs, "question": RunnablePassthrough()}
                | rag_prompt_custom
                | self.llm
                | StrOutputParser())
#response = rag_chain.invoke(query_text)
response = ""
for chunk in rag_chain.stream(query_text):
    response+=chunk
    print(chunk, end="", flush=True)

WenTingTseng avatar Mar 09 '24 02:03 WenTingTseng

Hello Mayank,

Could you please share the code for streaming in FastAPI using RetrievalQA? I'm currently facing difficulties with this problem and haven't been able to find a solution. Your help would be greatly appreciated. Thank you very much.

bsg321 avatar Apr 08 '24 10:04 bsg321
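
A note on why the snippet above sends everything at once: `yield qa.run(question)` yields exactly once per question, and only after `run` has returned the complete answer, so the HTTP response has nothing to send until generation is finished. A common workaround is to run the chain in a background thread and push each token into a queue that the response generator drains. The following is a minimal sketch of that pattern using only the standard library. `QueueTokenHandler`, `fake_chain_run`, and `stream_answer` are hypothetical names; in real use the handler would presumably subclass LangChain's `BaseCallbackHandler` (whose `on_llm_new_token` hook is mirrored here) and `fake_chain_run` would be replaced by a call to the chain with the handler attached to the LLM.

```python
import queue
import threading


class QueueTokenHandler:
    """Hypothetical handler that forwards streamed tokens to a queue.

    The on_llm_new_token name mirrors the LangChain callback hook;
    this class is a stand-in and does not import LangChain.
    """

    _DONE = object()  # sentinel marking the end of the stream

    def __init__(self):
        self.tokens = queue.Queue()

    def on_llm_new_token(self, token, **kwargs):
        # Called once per generated token by the (streaming) LLM
        self.tokens.put(token)

    def close(self):
        # Signal the consumer that no more tokens will arrive
        self.tokens.put(self._DONE)

    def __iter__(self):
        while True:
            item = self.tokens.get()  # blocks until a token is available
            if item is self._DONE:
                return
            yield item


def fake_chain_run(question, handler):
    """Hypothetical stand-in for qa.run(question) with a streaming LLM."""
    for token in ["Streaming ", "works ", "token ", "by ", "token."]:
        handler.on_llm_new_token(token)


def stream_answer(question, run_chain=fake_chain_run):
    """Yield tokens while the chain is still running in a worker thread."""
    handler = QueueTokenHandler()

    def worker():
        try:
            run_chain(question, handler)
        finally:
            handler.close()  # always unblock the consumer, even on errors

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    yield from handler
    thread.join()


if __name__ == "__main__":
    for token in stream_answer("Your question"):
        print(token, end="", flush=True)
```

Under these assumptions, a Django view could pass `stream_answer(question)` to `StreamingHttpResponse(..., content_type="text/event-stream")` in place of the generator that yields `qa.run(question)`. Closing the queue in `finally` rather than on an "LLM end" callback is a deliberate choice here, since some chains call the LLM more than once.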

@app.post("/get_gk_query")
async def get_query(input_data: QueryInput):
    question = input_data.question
    session_id = input_data.session_id
    if not question:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid input. Please provide a question.")
    try:
        # Load Chroma database
        embedding_function = OpenAIEmbeddings()
        pdf_name = "General_knowledge_base"
        chroma_db = load_templates_chroma_database(pdf_name, embedding_function)
        # Retrieve or create the user's session
        user_session = sessions.setdefault(session_id, {"last_question": None, "conversations": []})
        # Memory and QA processing
        memory = ConversationBufferWindowMemory(k=6)
        # Load existing conversations into memory
        for conv in user_session["conversations"]:
            # print("memory-->", conv["question"], conv["qa_output"])
            memory.save_context({"input": conv["question"]}, {"output": conv["qa_output"]})
        prompt = PromptTemplate(input_variables=["context", "question"], template=upload_pdf_template)
        qa = RetrievalQA.from_chain_type(llm=chat_llm, retriever=chroma_db.as_retriever(search_type="mmr"), memory=memory, chain_type_kwargs={'prompt': prompt})

How can I add streaming to my code? I am using FastAPI, and I do get a streaming response when calling qa.run, but when I return it, it comes back as one whole string. How can I fix that?

Hi, I'm facing the same problem. Can you let me know how you solved it?

pamohammedarshad avatar Apr 18 '24 09:04 pamohammedarshad
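
For the FastAPI case asked about above, the usual shape is an async generator handed to `StreamingResponse`, fed by a queue that the LLM callback fills token by token. The sketch below shows only that producer/consumer shape with the standard library, so it runs without LangChain or FastAPI installed. `fake_chain_acall`, `token_stream`, and `collect` are hypothetical names; in a real app the producer would presumably be an async chain call with a callback handler that puts tokens on the queue (LangChain's `AsyncIteratorCallbackHandler` plays a similar role).

```python
import asyncio


async def fake_chain_acall(question, token_queue):
    """Hypothetical stand-in for an async chain call with a streaming LLM."""
    for token in ["Hello", ", ", "world", "!"]:
        await asyncio.sleep(0)  # hand control back so the consumer can run
        await token_queue.put(token)


async def token_stream(question):
    """Async generator that yields tokens as the chain produces them."""
    token_queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the stream

    async def produce():
        try:
            await fake_chain_acall(question, token_queue)
        finally:
            await token_queue.put(done)  # always unblock the consumer

    task = asyncio.create_task(produce())
    while True:
        item = await token_queue.get()
        if item is done:
            break
        yield item
    await task  # surface any exception raised by the producer


async def collect(question):
    """Helper for demonstration: gather the streamed tokens into a list."""
    return [token async for token in token_stream(question)]


if __name__ == "__main__":
    print(asyncio.run(collect("Your question")))
```

In an endpoint, the idea would be to return `StreamingResponse(token_stream(question), media_type="text/event-stream")` rather than the string returned by `qa.run`, which is only available once generation has finished.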

Don't use the old chains anymore; use the new LCEL (LangChain Expression Language) instead, like this:

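import asyncio

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI  # assumption: any chat model can be used here

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async def main():
    # astream yields output chunks as the model produces them
    async for chunk in chain.astream({"topic": "parrot"}):
        print(chunk, end="|", flush=True)

asyncio.run(main())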

https://python.langchain.com/docs/expression_language/streaming/

This is working as expected for me

kubre avatar Apr 18 '24 13:04 kubre

Does anyone have a workaround for Flask? And is it possible to return the source documents as well, since I need them in the UI? (I am trying to do SSE.) I'm not using LCEL.

Has anyone tried Flask + sockets with streaming?

uzumakinaruto19 avatar Apr 30 '24 13:04 uzumakinaruto19
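
On the SSE question above: one way to deliver both the streamed answer and the sources is to send each token as its own event and finish with a single event carrying the source list. The sketch below covers only the event formatting, using the standard library. `sse_event` and `answer_events` are hypothetical helper names, and the event names `token` and `sources` are arbitrary choices. It assumes the sources have already been obtained, for example from the `source_documents` key of the chain result when `return_source_documents=True` is set.

```python
import json


def sse_event(data, event=None):
    """Format one Server-Sent Events message."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Every line of the payload needs its own "data:" prefix
    for line in str(data).split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


def answer_events(tokens, sources):
    """Yield one event per token, then a final event with the sources."""
    for token in tokens:
        # JSON-encode tokens so embedded newlines cannot break the framing
        yield sse_event(json.dumps({"token": token}), event="token")
    yield sse_event(json.dumps({"sources": sources}), event="sources")


if __name__ == "__main__":
    for message in answer_events(["Hi", "!"], ["doc1.pdf"]):
        print(message, end="")
```

In Flask, a generator like this can be wrapped as `Response(answer_events(tokens, sources), mimetype="text/event-stream")`, where `tokens` would be a live token iterator rather than a list. On the client, an `EventSource` can then listen for the `token` and `sources` events separately.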

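Thank you all for the enthusiastic replies to my question. I would like to share my sample code. It shows how to enable streaming output in RetrievalQA. I believe it can also be used with other chain objects.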

import os

# Set the OpenAI API key
openai_api_key = "..."
os.environ["OPENAI_API_KEY"] = openai_api_key

from langchain.document_loaders import TextLoader

# Load your text data using the TextLoader
loader = TextLoader("...")
documents = loader.load()

from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import CharacterTextSplitter

# Generate VectorDB using Chroma and OpenAIEmbeddings
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=0)
texts = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
docsearch = Chroma.from_documents(texts, embeddings)

from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# Create a ChatOpenAI instance for interactive chat using the OpenAI model
llm = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    temperature=0,
    openai_api_key=openai_api_key,
)

# Create a RetrievalQA chain using the ChatOpenAI model and the document retriever
qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=docsearch.as_retriever(),
)

# Run the QA system by providing a question to the chain
qa.run("Your question")

*I recommend installing the latest langchain with pip.

Thanks! This solved my problem.

Alpaca10086zyys avatar Aug 05 '24 11:08 Alpaca10086zyys