[Feature Request] stream mode generator can't pass final llm output as input to another node
Is your feature request related to a problem? Please describe.
We have a use case like: [llm_node] -> [save_complete_answer_in_external_history_node]. When [llm_node] has stream mode turned on, we can't save the history inside the DAG; instead we have to process the final output of llm_node outside of the DAG.
Describe the solution you'd like
Can we have an output parameter on the llm node that exposes the final output? Take this as an example:
dag.yaml
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  chat_history:
    type: list
    default: []
  question:
    type: string
    is_chat_input: true
    default: What is ChatGPT?
outputs:
  answer:
    type: string
    reference: ${chat.output.answer}
    is_chat_output: true
  final_answer:
    type: string
    reference: ${chat.output.final_answer}
    is_chat_output: false
nodes:
- inputs:
    # This is to easily switch between openai and azure openai.
    # deployment_name is required by azure openai, model is required by openai.
    deployment_name: gpt-35-turbo
    model: gpt-3.5-turbo
    max_tokens: "256"
    temperature: "0.7"
    chat_history: ${inputs.chat_history}
    question: ${inputs.question}
  ##### can we have this #####
  output:
    answer: answer_generator
    final_answer: final_answer_string
  ##### can we have this - end #####
  name: chat
  type: llm
  source:
    type: code
    path: chat.jinja2
  api: chat
  connection: open_ai_connection
##### below is a consumer of the final answer, as an example #####
- name: save_history
  type: python
  source:
    type: code
    path: save_history.py
  inputs:
    final_answer: ${chat.output.final_answer}
##### consumer of the final answer example ends #####
node_variants: {}
environment:
  python_requirements_txt: requirements.txt
And
from promptflow import load_flow

f = load_flow(source="../../examples/flows/chat/chat-basic/")
f.context.streaming = True
result = f(
    chat_history=[
        {
            "inputs": {"chat_input": "Hi"},
            "outputs": {"chat_output": "Hello! How can I assist you today?"},
        }
    ],
    question="How are you?",
)

answer = ""
# the result will be a generator; iterate it to get the full answer
for r in result["answer"]:
    answer += r
# result["final_answer"] should equal "answer" once the generator is exhausted
Describe alternatives you've considered
Not sure
Additional context
Not sure
Hi @vhan2kpmg ,
Just use ${chat.output} as the input of the save_history node:
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  chat_history:
    type: list
    default: []
  question:
    type: string
    is_chat_input: true
    default: What is ChatGPT?
outputs:
  answer:
    type: string
    reference: ${chat.output}
    is_chat_output: true
nodes:
- inputs:
    # This is to easily switch between openai and azure openai.
    # deployment_name is required by azure openai, model is required by openai.
    deployment_name: gpt-35-turbo
    model: gpt-3.5-turbo
    max_tokens: "256"
    temperature: "0.7"
    chat_history: ${inputs.chat_history}
    question: ${inputs.question}
  name: chat
  type: llm
  source:
    type: code
    path: chat.jinja2
  api: chat
  connection: open_ai_connection
- name: save_history
  type: python
  source:
    type: code
    path: save_history.py
  inputs:
    record: ${chat.output}
node_variants: {}
environment:
  python_requirements_txt: requirements.txt
And then use the flow as a function to run, with streaming mode enabled:
from promptflow import load_flow

f = load_flow(source=r"E:\programs\msft-promptflow\examples\flows\chat\chat-basic-streaming")
f.context.streaming = True
result = f(
    chat_history=[
        {
            "inputs": {"chat_input": "Hi"},
            "outputs": {"chat_output": "Hello! How can I assist you today?"},
        }
    ],
    question="How are you?",
)

answer = ""
# the result will be a generator; iterate it to get the full answer
for r in result["answer"]:
    answer += r
print(answer)
Inside the save_history node I save the record to a local txt file:
from promptflow.core import tool

@tool
def save(record: str):
    # append the record to the history file
    with open("history.txt", "a") as f:
        f.write(record + "\n")
    print(f"Recorded: {record}")
Every time I run this flow, the record is written to the txt file. Could you please provide more details about the statement:
when we have [llm_node] stream mode turned on, we can't save history in DAG
What's the error message, and do you have a sample to repro it?
Hi, thanks for your reply. Sorry, I may not have explained it clearly initially: we can save the history, but we lose the benefit of stream mode in the process. From what I tested:
- If there is no save_history node in the DAG, the stream result comes in chunks, e.g. result["answer"] will be a generator.
- However, if there is a node after the llm node, the stream result is just the final output, e.g. result["answer"] will be a string.
That makes sense in some way, because DAG outputs are only ready when all nodes have finished? But the purpose of stream mode is to get the answer chunk by chunk before the final result is complete. If there is a node after the llm node, does the DAG wait until all nodes finish? In our example the DAG waits until the response is saved, which only happens once all llm response chunks are finished.
Is there any way we can output the generator immediately, and leave saving the history as a background task?
“if there is a node after llm node, stream result will be just final output, e.g. result["answer"] will be string.”
You can also return a generator from that node; then the node works just like a generator hook, and any iteration of the final node's output will trigger iteration of the llm output. Here's a code sample:
from promptflow.core import tool

@tool
def save(llm_output):
    data = []
    for chunk in llm_output:
        data.append(chunk)
        yield chunk
    # append the record to the history file
    with open("history.txt", "a") as f:
        f.write(''.join(data) + "\n")
Is this what you want? @vhan2kpmg
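For completeness, here is a rough consumption sketch for that pass-through version, assuming the flow's answer output is re-pointed at the save_history node (that wiring isn't shown in this thread), so iterating the flow output pulls chunks through save():

from promptflow import load_flow

# Assumption: the flow's "answer" output now references ${save_history.output}.
# Iterating it drives the llm generator chunk by chunk, and once it is
# exhausted the save() tool appends the full answer to history.txt.
f = load_flow(source=r"E:\programs\msft-promptflow\examples\flows\chat\chat-basic-streaming")
f.context.streaming = True
result = f(chat_history=[], question="How are you?")

answer = ""
for chunk in result["answer"]:  # each iteration pulls one llm chunk through save()
    answer += chunk
print(answer)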
However, if there is a node after llm node, stream result will be just final output, e.g. result["answer"] will be string.
I think that's due to the nature of Python generators: once you start reading the content, the iteration has started, and you cannot iterate the same iterator twice.
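A minimal pure-Python illustration of that single-use behavior (no promptflow involved):

def stream():
    yield "Hello"
    yield ", world"

g = stream()
print("".join(g))  # "Hello, world" - the first pass consumes the generator
print("".join(g))  # ""             - a generator cannot be iterated a second time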
Hi, we're sending this friendly reminder because we haven't heard back from you in 30 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 7 days of this comment, the issue will be automatically closed. Thank you!