Title: Request for Streaming Outputs as a Generator for Dynamic Chat Responses
Issue Description:
I'm looking for a way to obtain streaming outputs from the model as a generator, which would enable dynamic chat responses in a front-end application. While this functionality is available in the OpenAI API, I couldn't find a similar option in Langchain.
I'm aware that using verbose=True allows us to see the streamed results printed in the console, but I'm specifically interested in a solution that can be integrated into a front-end app, rather than just displaying the results in the console.
Is there any existing functionality or workaround to achieve this, or could this feature be considered for future implementation?
What I imagine is something like this:
from langchain.callbacks import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import (
AIMessage,
HumanMessage,
SystemMessage
)
chat = ChatOpenAI(streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), verbose=False, temperature=0)
messages = [
SystemMessage(content="You are a helpful assistant that translates English to French."),
HumanMessage(content="Translate this sentence from English to French. I love programming. I really really really really love it")
]
for chunk in chat(messages):
yield chunk.text
Take a reference from chat-langchain source code.
@Jeru2023, I am looking for an example with SSE instead of WebSockets, can you help?
@tugot17 Are there any ideas now? I have the same issue.
https://github.com/hwchase17/chat-langchain/blob/master/callback.py - line 16 https://github.com/hwchase17/chat-langchain/blob/master/main.py - line 39
@Jeru2023
This is using websockets, which are not so easy to work with. We are looking for an example showing how we can just get the reply from the chain as a generator.
As far as I understand, it is not possible to use the model output as a streamed chat in Gradio?
How can I use ConversationChain with streamed responses?
class EnqueueCallbackHandler(AsyncCallbackHandler):
def __init__(self, queue: asyncio.Queue):
self.queue = queue
self.token_count = 0
@property
def always_verbose(self) -> bool:
return True
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
for c in token:
print(c)
print("==================================")
await self.queue.put(c)
self.token_count += 1
async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
await self.queue.put(None)
async def on_llm_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> None:
await self.queue.put(None)
class ChatView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated,]
async def process_request(self, request):
message_id = request.data.get('message_id')
userprompt = request.data.get('prompt')
queue = asyncio.Queue()
enqueue_callback_handler = EnqueueCallbackHandler(queue)
callback_manager = AsyncCallbackManager([enqueue_callback_handler])
llm = ChatOpenAI(streaming=True, verbose=True, temperature=0.7, callback_manager=callback_manager)
message_history = RedisChatMessageHistory(message_id, url=CHAT_REDIS_URL, ttl=600)
tokens_num = llm.get_num_tokens_from_messages(message_history.messages)
if tokens_num > 1000:
pass
prompt = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know."),
MessagesPlaceholder(variable_name="message_history"),
HumanMessagePromptTemplate.from_template("{input}")
])
memory = ConversationBufferMemory(return_messages=True, memory_key="message_history", chat_memory=message_history)
print("memory", memory)
chain = ConversationChain(memory=memory, prompt=prompt, llm=llm, verbose=True)
await chain.arun(input=userprompt)
# how can i use ConversationChain with stream responses?
def generate_data():
while True:
token = queue.get()
print(token)
if token is None:
break
yield token
for query in [1, 2, 3, 4, 5]:
print(query)
yield query
return StreamingHttpResponse(generate_data(), content_type='text/event-stream')
def post(self, request):
response = asyncio.run(self.process_request(request))
return response
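For reference, a minimal sketch of one way to bridge a chain to StreamingHttpResponse without WebSockets, assuming Django 4.2+ (which accepts an async iterator as streaming content) and langchain's AsyncIteratorCallbackHandler. The view below is illustrative, not a drop-in replacement for the code above:

```python
import asyncio

from django.http import StreamingHttpResponse
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.chains import ConversationChain
from langchain.chat_models import ChatOpenAI


async def chat_view(request):
    handler = AsyncIteratorCallbackHandler()
    llm = ChatOpenAI(streaming=True, temperature=0.7, callbacks=[handler])
    chain = ConversationChain(llm=llm)

    async def event_stream():
        # Run the chain concurrently; tokens arrive through the callback handler.
        task = asyncio.create_task(chain.arun(input=request.GET.get("prompt", "Hello")))
        async for token in handler.aiter():
            yield f"data: {token}\n\n"
        await task

    return StreamingHttpResponse(event_stream(), content_type="text/event-stream")
```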
Websockets can work!
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
async def disconnect(self, close_code):
pass
async def receive(self, text_data):
# Data received from the WebSocket
data = json.loads(text_data)
userprompt = data['prompt']
message_id = data['message_id']
callback_handler = AsyncIteratorCallbackHandler()
callback_manager = AsyncCallbackManager([callback_handler])
llm = ChatOpenAI(streaming=True, verbose=True, temperature=0.7, callback_manager=callback_manager)
message_history = RedisChatMessageHistory(message_id, url=CHAT_REDIS_URL, ttl=600)
tokens_num = llm.get_num_tokens_from_messages(message_history.messages)
if tokens_num > 1000:
pass
prompt = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know."),
MessagesPlaceholder(variable_name="message_history"),
HumanMessagePromptTemplate.from_template("{input}")
])
memory = ConversationBufferMemory(return_messages=True, memory_key="message_history", chat_memory=message_history)
chain = ConversationChain(memory=memory, prompt=prompt, llm=llm, verbose=False)
run = asyncio.create_task(chain.arun(input=userprompt))
# Set up your async generator here
async for token in callback_handler.aiter():
await self.send(text_data=token)
await run
It's too difficult to use Django's StreamingHttpResponse.
@tugot17 You can implement this by running the LLM call in a thread and using a queue. You need to write a custom callback handler which takes the queue in its constructor, and pushes the token to the queue on the token callback. In the main method, you can iterate over the queue and then yield the results.
@jrhe Do you happen to have an example?
@tugot17 This can definitely be improved, but will do what you want:
from queue import Queue
from typing import Any
import gradio as gr
from anyio.from_thread import start_blocking_portal
from langchain.callbacks.base import AsyncCallbackHandler, AsyncCallbackManager
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
class StreamingLLMCallbackHandler(AsyncCallbackHandler):
"""Callback handler for streaming LLM responses to a queue."""
def __init__(self, q):
self.q = q
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
self.q.put(token)
def bot(chatbot):
user_msg = chatbot[-1][0]
prompt = user_msg
q = Queue()
job_done = object()
async def task(prompt):
llm = ChatOpenAI(
verbose=True,
temperature=0,
streaming=True,
callback_manager=AsyncCallbackManager([StreamingLLMCallbackHandler(q)]),
)
ret = await llm.agenerate([[HumanMessage(content=prompt)]])
q.put(job_done)
return ret
with start_blocking_portal() as portal:
portal.start_task_soon(task, prompt)
content = ""
while True:
next_token = q.get(True, timeout=1)
if next_token is job_done:
break
content += next_token
chatbot[-1][1] = content
yield chatbot
with gr.Blocks() as demo:
chatbot = gr.Chatbot()
msg = gr.Textbox(label="Prompt")
def chat(user_message, history):
return "", history + [[user_message, None]]
msg.submit(chat, [msg, chatbot], [msg, chatbot], queue=False).then(
bot, chatbot, chatbot
)
demo.queue()
demo.launch()
The default AsyncIteratorCallbackHandler has a bug. In the example by @jianhuihi, during the first chat, on_llm_end is triggered before self.queue is empty, causing an error when other.pop().cancel() is called. After resolving this issue with if other.pop():, during the second chat the while not self.queue.empty() or not self.done.is_set() loop in aiter exits before on_llm_start (especially when executed in a chain), causing aiter to end immediately and return empty messages. Maybe we can call self.done.clear() before breaking the while loop, or use a user-controlled queue as @jrhe provided.
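For what it's worth, a minimal sketch of that workaround, assuming the handler exposes queue (asyncio.Queue) and done (asyncio.Event) as in langchain.callbacks.streaming_aiter.AsyncIteratorCallbackHandler; the subclass name is made up here:

```python
from typing import Any, Dict, List

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler


class ReusableAsyncIteratorCallbackHandler(AsyncIteratorCallbackHandler):
    """Sketch: reset the done flag so the handler can be reused across runs."""

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        # Clear the event set by the previous run's on_llm_end so that aiter()
        # does not exit immediately when a second generation starts.
        self.done.clear()
```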
(Quoting @jrhe's reply above: "@tugot17 This can definitely be improved, but will do what you want", followed by the Gradio example.)
@jrhe Hi, when I modified your code, an error popped up. Could you point out the problem for me?
async def task(prompt):
template = """XXXXXXX
Q:{question}
A:
"""
template_sum = """XXXXXXX
Q:{question}
A:
"""
tps = PromptTemplate(template=template_sum, input_variables=["question"])
tp = PromptTemplate(template=template, input_variables=["question"])
fqs = tps.format(question=prompt)
index = faiss.read_index(index_name)
with open(namespace, "rb") as f:
docsearch = pickle.load(f)
docsearch.index = index
llm = ChatOpenAI(streaming=True, callback_manager=AsyncCallbackManager([StreamingLLMCallbackHandler(q)]), verbose=True, temperature=0,openai_api_key=OPENAI_API_KEY)
chain = load_qa_chain(llm,chain_type="stuff")
docs = docsearch.similarity_search(query, include_metadata=True,k=10)
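# note: `query` is not defined inside this function (the parameter is `prompt`);
# if it isn't defined globally, the resulting NameError never surfaces because the
# task's exception is not checked after start_task_soon.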
# r = await chain.arun(input_documents=docs, question=fqs)
# fq = tp.format(question=r)
ret = await chain.arun(input_documents=docs, question=fqs)
q.put(job_done)
return ret
Traceback (most recent call last):
File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/gradio/routes.py", line 393, in run_predict
output = await app.get_blocks().process_api(
File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/gradio/blocks.py", line 1059, in process_api
result = await self.call_function(
File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/gradio/blocks.py", line 882, in call_function
prediction = await anyio.to_thread.run_sync(
File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/gradio/utils.py", line 549, in async_iteration
return next(iterator)
File "/home/notebook/data/personal/coding/gpt/chatgpt/t3.py", line 148, in bot
next_token = q.get(True, timeout=1)
File "/opt/conda/envs/chatgpt39/lib/python3.9/queue.py", line 179, in get
raise Empty
_queue.Empty
It means q is still empty after you wait for 1 s; maybe you should wait for the token without a timeout, or increase the default timeout. The docstring from Python's queue module:
def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
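A minimal sketch of that suggestion, with a hypothetical drain helper (q and job_done as in the example above): catch queue.Empty and keep polling instead of letting the exception escape.

```python
from queue import Empty, Queue


def drain(q: Queue, job_done: object):
    """Yield tokens from q until job_done is seen, retrying on Empty."""
    while True:
        try:
            token = q.get(block=True, timeout=1)
        except Empty:
            continue  # nothing arrived within 1 s; keep waiting
        if token is job_done:
            break
        yield token
```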
@coffin5257 Thanks for the kind help. I just set the timeout to None, but it still doesn't return anything. Would it be possible that there are other causes?
@jrhe thanks so much for providing that example!
[EDIT] Actually, please ignore the below. I got the qa_chain working with @jrhe's code and the arun function. The reason it didn't work before was that I didn't pass the parameters in correctly, and exceptions do not show up because of the await.
After a lot of trial and error, at least that gives something to work with. Like @tron19920125 mentioned, though, it does seem to stop working as soon as you try to apply it to a qa_chain, and it's not the timeout that's the issue. I suspect there must be something in the chain that doesn't apply 'wait' or 'async' properly, but I haven't been able to find out what yet. If anyone reading this knows the solution, your assistance would definitely be appreciated.
I haven't tested it, but FYI RedisChatMessageHistory is blocking, so be careful not to call it from the main thread if you're running an async server such as FastAPI.
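A minimal sketch of that advice, assuming Python 3.9+ and RedisChatMessageHistory's synchronous API (the import path may vary between versions): push the blocking Redis round-trips onto a worker thread so they don't stall the event loop.

```python
import asyncio

from langchain.memory import RedisChatMessageHistory  # import path may vary


async def load_history_messages(message_id: str, redis_url: str):
    def _load():
        history = RedisChatMessageHistory(message_id, url=redis_url, ttl=600)
        return history.messages  # blocking Redis call

    return await asyncio.to_thread(_load)
```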
You can also use normal threads, something like
# assuming this is inside your function associated with the chatbot
# in the queue we will store our streamed tokens
q = Queue()
# let's create our default chat
chat = ChatOpenAI(
model_name=MODELS_NAMES[0],
temperature=DEFAULT_TEMPERATURE,
streaming=True,
callbacks=([QueueCallback(q)]),
)
job_done = object()
messages.append(HumanMessage(content=message))
def task():
chat(messages)
q.put(job_done)
t = Thread(target=task)
t.start()
chatbot_messages.append((message, ""))
content = ""
while True:
try:
next_token = q.get(True, timeout=1)
if next_token is job_done:
break
content += next_token
chatbot_messages[-1] = (message, content)
yield "", chatbot_messages
except Empty:
continue
and my callback
class QueueCallback(BaseCallbackHandler):
"""Callback handler for streaming LLM responses to a queue."""
def __init__(self, q):
self.q = q
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
self.q.put(token)
def on_llm_end(self, *args, **kwargs: Any) -> None:
return self.q.empty()
(Quoting the earlier comment about the bug in the default AsyncIteratorCallbackHandler and the suggested self.done.clear() workaround.)
Is there any solution for this? I use ConversationalRetrievalChain, but the following is not a perfect solution (see the self.run_times = 1 hack):
class AiAsyncIteratorCallbackHandler(AsyncIteratorCallbackHandler, ABC):
"""Callback handler that returns an async iterator."""
queue: asyncio.Queue[str]
done: asyncio.Event
@property
def always_verbose(self) -> bool:
return True
def __init__(self) -> None:
super(AiAsyncIteratorCallbackHandler, self).__init__()
self.run_times = 1
self.run_time = int(time.time())
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
self.queue.put_nowait(token)
async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
# TODO: find the right moment to call self.done.set()
if self.run_times == 2:
self.done.set()
self.run_times += 1
pass
Hi, I have been struggling with the langchain completion too. Could someone please have a look at my code? I'm not sure what is going wrong.
async def send_message_to_room(room_group_name, message, channel_layer):
print('send_message_to_room')
await channel_layer.group_send(
room_group_name,
{
"type": "chat_message",
"message": message,
}
)
class StreamingLLMCallbackHandler(BaseCallbackHandler):
"""Callback handler for streaming LLM responses to a queue."""
def __init__(self, q: asyncio.Queue, roomname: str):
self.q = q
self.channel_layer = get_channel_layer()
self.roomname = roomname
def on_llm_new_token(self, token: str, **kwargs):
self.q.put_nowait(token)
async def process_queue(self):
while True:
print("Waiting for token")
token = await self.q.get()
if token=='<|endoftext|>':
break
print(f"Token received: {token}")
await send_message_to_room(self.roomname, token, self.channel_layer) # this works
async def run_chat(roomname,
question=question, texts=texts, responsetype="Detailed",
system_message_with_response_type=system_message_with_response_type,
human_message_with_response_type=human_message_with_response_type
):
q = asyncio.Queue()
callback_handler = StreamingLLMCallbackHandler(q, roomname=roomname)
llm = ChatOpenAI(
max_tokens=250,
streaming=True,
callbacks=[callback_handler]
)
# Start a separate thread for processing the queue
p = asyncio.create_task(callback_handler.process_queue())
system_message_with_response_type = SystemMessagePromptTemplate.from_template(system_message_with_response_type)
human_message_prompt = HumanMessagePromptTemplate.from_template(human_message_with_response_type)
chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])
prompt_value = chat_prompt.format_prompt(question=question, texts=texts, responsetype=responsetype)
llm(prompt_value.to_messages())
await q.put('<|endoftext|>')
await p
The group_send() method sends messages to the channel, but the messages aren't received on the front end until the LLM has finished generating, and I am not sure why there is this buffering. Additionally, the buffering seems to happen twice: once after the completion is generated, after which process_queue finally starts operating, and again when process_queue has finished, after which the messages are actually sent to the room by the consumer's chat_message method. I'm not sure what I can do to make all of this happen simultaneously. I have tried async operations, and only this code, as attached, seems to work.
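One guess, not confirmed here: llm(prompt_value.to_messages()) is a blocking synchronous call, so the event loop never gets a chance to run process_queue() until the whole generation has finished, which would explain the buffering. A sketch of the change, reusing llm, callback_handler, chat_prompt and q from the snippet above and switching to the async API:

```python
async def run_chat_streaming(roomname, question, texts, responsetype="Detailed"):
    # Start the queue consumer first, then await the non-blocking generation so
    # tokens are forwarded while they are being produced.
    p = asyncio.create_task(callback_handler.process_queue())
    prompt_value = chat_prompt.format_prompt(
        question=question, texts=texts, responsetype=responsetype
    )
    await llm.agenerate([prompt_value.to_messages()])
    await q.put('<|endoftext|>')
    await p
```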
AsyncIteratorCallbackHandler works. Here is the example code:
import asyncio
import unittest
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
class TestLangchainAsync(unittest.IsolatedAsyncioTestCase):
async def test_aiter(self):
handler = AsyncIteratorCallbackHandler()
llm = OpenAI(
temperature=0,
streaming=True,
callbacks=[handler],
openai_api_key="sk-xxxxx",
openai_proxy="http://127.0.0.1:7890",
)
prompt = PromptTemplate(
input_variables=["product"],
template="What is a good name for a company that makes {product}?",
)
prompt = prompt.format(product="colorful socks")
asyncio.create_task(llm.agenerate([prompt]))
async for i in handler.aiter():
print(i)
Hey, can anybody point me to how to use this with ConversationalRetrievalChain and the OpenAI API on Gradio? I would really appreciate it.
(Quoting the earlier comment about getting the qa_chain working with @jrhe's code and the arun function.)
Hey, can you please provide the code for this? It would be super helpful.
Another gist on the topic. c:
https://gist.github.com/jvelezmagic/03ddf4c452d011aae36b2a0f73d72f68#gistcomment-4599119
Here's a concise gist I wrote on how to do this with threads and queues using the great answer by @FrancescoSaverioZuppichini https://gist.github.com/mortymike/70711b028311681e5f3c6511031d5d43
Hi, but I require a solution which handles ConversationalRetrievalChain for this; I am not able to understand how to do that.
If I try to use it with ConversationalRetrievalChain, it prints the condensed question rather than the answer. Here is the code I am using:
from threading import Thread
from typing import Any, Union, List, Dict
from queue import SimpleQueue, Empty
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
def load_exisiting_db():
embeddings = HuggingFaceInstructEmbeddings(model_name=bi_enc_dict[EMBED_MODEL],
query_instruction='Represent the question for retrieving supporting paragraphs: ',
embed_instruction='Represent the paragraph for retrieval: ')
db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
return db
def load_retrieval_chain(vectorstore, model_name, streaming_callback):
'''Load Chain'''
# callbacks = [StreamingStdOutCallbackHandler()]
callbacks = streaming_callback
if model_name =='openai':
chat_llm = ChatOpenAI(model_name = 'gpt-3.5-turbo',
callbacks=callbacks,
temperature=0,
streaming = True
)
question_generator = LLMChain(llm=chat_llm, prompt=CONDENSE_QUESTION_PROMPT)
prompt = load_prompt(model_name='openai')
doc_chain = load_qa_chain(llm=chat_llm,chain_type="stuff",prompt=prompt)
chain = ConversationalRetrievalChain(retriever=vectorstore.as_retriever(search_kwargs={"k": target_source_chunks }),
question_generator=question_generator,
combine_docs_chain=doc_chain,
memory=memory,
return_source_documents=True,
get_chat_history=lambda h :h)
return chain
job_done = object()
class StreamingGradioCallbackHandler(BaseCallbackHandler):
def __init__(self, q: SimpleQueue):
self.q = q
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""Run when LLM starts running. Clean the queue."""
while not self.q.empty():
try:
self.q.get(block=False)
except Empty:
continue
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
"""Run on new LLM token. Only available when streaming is enabled."""
# print("tiktik token ", token)
self.q.put(token)
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Run when LLM ends running."""
self.q.put(job_done)
def on_llm_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> None:
"""Run when LLM errors."""
job_done = object() # signals the processing is done
q = SimpleQueue()
def load_streaming_callback(streaming):
if streaming:
streaming_callback =[StreamingGradioCallbackHandler(q),
StreamingStdOutCallbackHandler()]
else :
streaming_callback =[StreamingStdOutCallbackHandler()]
return streaming_callback
def load_chatbot_chain(streaming=True):
db = load_exisiting_db()
streaming_callback = load_streaming_callback(streaming)
chain = load_retrieval_chain(db, model_name = 'openai', streaming_callback=streaming_callback)
return chain
chatbot_chain = load_chatbot_chain()
def stream_run(question):
answer = chatbot_chain({'question': question})
q.put(job_done)
# return answer['answer']
def stream_bot(history):
job_done = object() # signals the processing is done
user_question = history[-1][0]
thread = Thread(target=stream_run, kwargs={"question": user_question})
thread.start()
history[-1][1] = ""
while True:
next_token = q.get(block=True) # Blocks until an input is available
print("nexttoek ",next_token)
if not isinstance(next_token,str):
break
history[-1][1] += next_token
yield history
thread.join()
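One possible explanation (an assumption, not confirmed in this thread): the condensed question streams because the same streaming chat_llm is shared by the question_generator and the answer chain, so the condense step's tokens also land in the queue. A sketch of a workaround, reusing the names from load_retrieval_chain above, gives the question generator its own non-streaming LLM:

```python
def load_retrieval_chain_streaming_answer_only(vectorstore, streaming_callback):
    """Sketch: stream only the final answer, not the condensed question."""
    condense_llm = ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0)  # no streaming callbacks
    answer_llm = ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0,
                            streaming=True, callbacks=streaming_callback)

    question_generator = LLMChain(llm=condense_llm, prompt=CONDENSE_QUESTION_PROMPT)
    doc_chain = load_qa_chain(llm=answer_llm, chain_type="stuff",
                              prompt=load_prompt(model_name='openai'))
    return ConversationalRetrievalChain(
        retriever=vectorstore.as_retriever(search_kwargs={"k": target_source_chunks}),
        question_generator=question_generator,
        combine_docs_chain=doc_chain,
        memory=memory,
        return_source_documents=True,
        get_chat_history=lambda h: h,
    )
```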
I am trying to stream outputs directly from an agent to an API endpoint, but I can't seem to get it to work. The async behaviour is very odd: sometimes it prints the same response twice, sometimes it doesn't print a response at all. What am I doing wrong?
I will include all my code here, because I am really unsure which part of the chain is the cause. Note that I am a complete beginner when it comes to coroutines and async operations.
MyCustomCallBacks.py:
from typing import Any, Dict, List
import tiktoken
from langchain.callbacks.base import BaseCallbackHandler
from loguru import logger
from langchain.schema import LLMResult
class ToolsInputCallBackHandler(BaseCallbackHandler):
def __init__(self, logger):
self.logger = logger
def on_agent_action(self, serialized, **kwargs):
self.logger.info(serialized.log)
class ToolsOutputCallBackHandler(BaseCallbackHandler):
def on_tool_end(self, serialized, **kwargs):
logger.info(serialized)
class TokenCostProcess:
## Not relevant here
class CostCalcHandler(BaseCallbackHandler):
## Not relevant here
Agent.py
from BotWrapper import Tool1, Tool2, Tool3, getSystemPrompt
from MyCustomCallBacks import ToolsInputCallBackHandler, ToolsOutputCallBackHandler, TokenCostProcess, CostCalcHandler
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI
from langchain.prompts import MessagesPlaceholder
from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryMemory, CombinedMemory
from langchain.agents import initialize_agent, AgentType
from langchain.schema.messages import SystemMessage
from typing import Any
from loguru import logger
from ansi2html import Ansi2HTMLConverter
import io
import time
from typing import Awaitable
import asyncio
class MyAgent:
def __init__(self,
model: str="gpt-3.5-turbo-16k-0613"):
system_message = SystemMessage(content=getSystemPrompt())
self.logger = self.initialize_logger()
self.tools = []
for tool in (Tool1, Tool2, Tool3):
tool.callbacks = [ToolsOutputCallBackHandler()]
self.tools.append(tool)
self.model = model
self.token_cost_process = TokenCostProcess()
self.async_callback = AsyncIteratorCallbackHandler() # Faulty?
self.llm = ChatOpenAI(
model=self.model,
temperature=1,
streaming=True,
callbacks=[CostCalcHandler(self.model, self.token_cost_process)]
)
self.agent_kwargs = {
"extra_prompt_messages": [MessagesPlaceholder(variable_name="chat_history_lines"),
MessagesPlaceholder(variable_name="summary")],
"system_message": system_message
}
self.tools_input_handler = ToolsInputCallBackHandler(self.logger)
self.memory = self.reset_memory()
self.agent = initialize_agent(
tools=self.tools,
llm=self.llm,
agent=AgentType.OPENAI_FUNCTIONS,
agent_kwargs=self.agent_kwargs,
memory=self.memory,
callbacks=[self.tools_input_handler]
)
def reset_memory(self):
conv_memory = ConversationBufferWindowMemory(k=2,
memory_key="chat_history_lines",
return_messages=True,
ai_prefix='Alfred',
human_prefix='Caller',
input_key="input")
summary_memory = ConversationSummaryMemory(llm=OpenAI(max_tokens=512),
memory_key="summary",
return_messages=True,
ai_prefix='Alfred',
human_prefix='Caller',
input_key="input")
memory = CombinedMemory(memories = [summary_memory, conv_memory])
return memory
def initialize_logger(self):
logger.remove()
self.log_stream = io.StringIO()
logger.add(self.log_stream, colorize=True, enqueue=True)
return logger
async def wrap_done(self, fn: Awaitable, event: asyncio.Event): # Code I took from git. Perhaps I am using it wrong.
"""Wrap an awaitable with a event to signal when it's done or an exception is raised."""
try:
await fn
except Exception as e:
# TODO: handle exception
print(f"Caught exception: {e}")
finally:
# Signal the aiter to stop.
event.set()
async def __call__(self, user_input):
task = asyncio.create_task(self.wrap_done(
self.agent.acall(user_input, callbacks=[self.async_callback]),
self.async_callback.done)
)
complete_text = ''
async for token in self.async_callback.aiter():
complete_text+=token
yield token
await task
def _get_log(self):
content = self.log_stream.getvalue()
conv = Ansi2HTMLConverter()
html = conv.convert(content, full=True)
html = html.replace(r'\n', '\n')
return html
def get_log(self):
heading = "Get Log Was Called! Summarizing Costs...\n\n"
self.logger.info(heading+self.token_cost_process.get_cost_summary(self.model))
time.sleep(1)
return self._get_log()
async def print_agent_response(agentCall):
async for token in agentCall:
print(token, end="")
print()
agent= MyAgent()
response=agent('Write a short poem in pirate')
asyncio.run(print_agent_response(response))
Any help would be appreciated!! Thanks a lot :)
This is now available as the new .stream() and .astream() methods; see https://python.langchain.com/docs/expression_language/interface#stream
Does anyone have an example of using .stream() or .astream()? I'm fairly new to coding and I'm not following the documentation @nfcampos linked to. Much appreciated.
An example would be great!
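A minimal sketch of the interface @nfcampos linked, assuming a langchain version with LCEL (prompt | model composition); exact import paths may differ between versions:

```python
import asyncio

from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
chain = prompt | ChatOpenAI(temperature=0)

# Synchronous streaming: yields message chunks as they arrive.
for chunk in chain.stream({"topic": "bears"}):
    print(chunk.content, end="", flush=True)


# Asynchronous streaming with .astream().
async def main():
    async for chunk in chain.astream({"topic": "bears"}):
        print(chunk.content, end="", flush=True)


asyncio.run(main())
```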