[Feature Request] Stream mode generator can't pass the final LLM output as input to another node

Open vhan2kpmg opened this issue 1 year ago • 2 comments

Is your feature request related to a problem? Please describe. We have a use case like: [llm_node] -> [save_complete_answer_in_external_history_node]. When we have [llm_node] stream mode turned on, we can't save history in DAG; instead we need to process the final output from llm_node outside of the DAG.

Describe the solution you'd like: Can we have an output parameter in the llm node that holds the final output? Take this as an example:

dag.yaml

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  chat_history:
    type: list
    default: []
  question:
    type: string
    is_chat_input: true
    default: What is ChatGPT?
outputs:
  answer:
    type: string
    reference: ${chat.output.answer}
    is_chat_output: true
  final_answer:
    type: string
    reference: ${chat.output.final_answer}
    is_chat_output: false
nodes:
- inputs:
    # This is to easily switch between openai and azure openai.
    # deployment_name is required by azure openai, model is required by openai.
    deployment_name: gpt-35-turbo
    model: gpt-3.5-turbo
    max_tokens: "256"
    temperature: "0.7"
    chat_history: ${inputs.chat_history}
    question: ${inputs.question}
  ##### can we have this  #####
  output:
    answer: answer_generator
    final_answer: final_answer_string
  #### can we have this finished  #####
  name: chat
  type: llm
  source:
    type: code
    path: chat.jinja2
  api: chat
  connection: open_ai_connection
 ##### below are consumer of final answer example  #####
- name: save_history
  type: python
  source:
    type: code
    path: save_history.py
  inputs:
    final_answer: ${chat.output.final_answer}
  ##### consumer of final answer example ends #####
node_variants: {}
environment:
    python_requirements_txt: requirements.txt

And

from promptflow import load_flow

f = load_flow(source="../../examples/flows/chat/chat-basic/")
f.context.streaming = True
result = f(
    chat_history=[
        {
            "inputs": {"chat_input": "Hi"},
            "outputs": {"chat_output": "Hello! How can I assist you today?"},
        }
    ],
    question="How are you?",
)


answer = ""
# the result will be a generator, iterate it to get the result
for r in result["answer"]:
    answer += r
    
 # result['final_answer'] should be same with "answer" after generator finished

Describe alternatives you've considered: not sure

Additional context: not sure

vhan2kpmg avatar May 02 '24 06:05 vhan2kpmg

Hi @vhan2kpmg ,

Just use the ${chat.output} as the input of the save_history node:

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  chat_history:
    type: list
    default: []
  question:
    type: string
    is_chat_input: true
    default: What is ChatGPT?
outputs:
  answer:
    type: string
    reference: ${chat.output}
    is_chat_output: true
nodes:
- inputs:
    # This is to easily switch between openai and azure openai.
    # deployment_name is required by azure openai, model is required by openai.
    deployment_name: gpt-35-turbo
    model: gpt-3.5-turbo
    max_tokens: "256"
    temperature: "0.7"
    chat_history: ${inputs.chat_history}
    question: ${inputs.question}
  name: chat
  type: llm
  source:
    type: code
    path: chat.jinja2
  api: chat
  connection: open_ai_connection
- name: save_history
  type: python
  source:
    type: code
    path: save_history.py
  inputs:
    record: ${chat.output}


node_variants: {}
environment:
    python_requirements_txt: requirements.txt

Then run the flow as a function, with streaming mode enabled:

from promptflow import load_flow

f = load_flow(source=r"E:\programs\msft-promptflow\examples\flows\chat\chat-basic-streaming")
f.context.streaming = True

result = f(
    chat_history=[
        {
            "inputs": {"chat_input": "Hi"},
            "outputs": {"chat_output": "Hello! How can I assist you today?"},
        }
    ],
    question="How are you?",
)


answer = ""
# the result will be a generator, iterate it to get the result
for r in result["answer"]:
    answer += r

print(answer)

Inside the save_history node I save the record to a local txt file:

from promptflow.core import tool

@tool
def save(record: str):
    # append the record to the history file
    with open("history.txt", "a") as f:
        f.write(record + "\n")
    print(f"Recorded: {record}")

Every time I run this flow, the record is written to the txt file. Could you please provide more details about this statement:

when we have [llm_node] stream mode turned on, we can't save history in DAG

What's the error message, and do you have a sample to repro it?

0mza987 avatar May 06 '24 09:05 0mza987

Hi, thanks for your reply. Sorry, I may not have explained this clearly at first: we can save history, but we lose the benefit of stream mode when we do. From what I tested:

  • If there is no save_history node in the DAG, the stream result comes in chunks, e.g. result["answer"] will be a generator.
  • However, if there is a node after llm node, stream result will be just final output, e.g. result["answer"] will be string.
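Because result["answer"] can be either a generator or a string depending on the flow, calling code may want to handle both. The helper below is a minimal sketch in plain Python; `collect_answer` and `fake_chunks` are hypothetical names used only for illustration and are not part of promptflow.

```python
from typing import Iterable, Union


def collect_answer(answer: Union[str, Iterable[str]]) -> str:
    """Return the full answer text whether it arrived as a string or as chunks."""
    if isinstance(answer, str):
        # Non-streaming case: the flow already returned the final text.
        return answer
    # Streaming case: drain the generator and join the chunks.
    return "".join(answer)


def fake_chunks():
    # Hypothetical stand-in for result["answer"] in streaming mode.
    yield from ["Hi", " there"]


from_stream = collect_answer(fake_chunks())
from_string = collect_answer("Hi there")
```

Note that this only normalizes the result; it does not restore chunk-by-chunk delivery when the flow has already returned a string.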

That makes sense in a way, because DAG outputs are only ready when all nodes have finished. But the purpose of stream mode is to get the answer chunk by chunk before the final result is complete. If there is a node after the llm node, does the DAG wait until all nodes finish? In our example, the DAG waits until the response is saved, which happens only after all LLM response chunks have arrived.

Is there any way to output the generator immediately while leaving the history save to run as a background task?

vhan2kpmg avatar May 13 '24 11:05 vhan2kpmg

“if there is a node after llm node, stream result will be just final output, e.g. result["answer"] will be string.”

You can also return a generator from that node. The node then works like a generator hook: any iteration over the final node output triggers iteration over the LLM output. Here's a code sample:

from promptflow.core import tool

@tool
def save(llm_output):
    data = []
    for chunk in llm_output:
        data.append(chunk)
        yield chunk
    # append the record to the history file
    with open("history.txt", "a") as f:
        f.write(''.join(data) + "\n")
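The same pass-through pattern can be tried without promptflow. The sketch below is an illustration under assumptions: `fake_llm_stream` and `passthrough_and_record` are hypothetical names, and a list stands in for the history file. It also wraps the loop in try/finally, which is an addition to the sample above: without it, nothing is saved if the consumer stops iterating before the stream is exhausted.

```python
from typing import Iterable, Iterator, List


def fake_llm_stream() -> Iterator[str]:
    # Hypothetical stand-in for the streamed output of an LLM node.
    yield from ["I am ", "doing ", "well."]


def passthrough_and_record(chunks: Iterable[str], history: List[str]) -> Iterator[str]:
    """Re-yield each chunk, then record the joined text once iteration ends."""
    data = []
    try:
        for chunk in chunks:
            data.append(chunk)
            yield chunk
    finally:
        # Runs on normal exhaustion and also when the consumer closes the
        # generator early, so a partial answer is still recorded.
        history.append("".join(data))


history: List[str] = []
stream = passthrough_and_record(fake_llm_stream(), history)

first = next(stream)              # the first chunk is available immediately
saved_before_end = list(history)  # nothing has been recorded yet
rest = "".join(stream)            # draining the generator triggers the save
```

The save is therefore not a background task: it runs in the consumer's thread at the moment the last chunk has been read.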

Is this what you want? @vhan2kpmg

wxpjimmy avatar May 21 '24 03:05 wxpjimmy

However, if there is a node after llm node, stream result will be just final output, e.g. result["answer"] will be string.

I think that's due to the nature of Python: once you start reading the content, the iteration has started, and you cannot iterate over the same iterator twice.
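The single-pass behaviour of generators can be seen in plain Python, independent of promptflow. This is only a sketch of the language behaviour; `fake_llm_stream` is a hypothetical stand-in, and the thread does not confirm that this is exactly what happens inside promptflow.

```python
from typing import Iterator


def fake_llm_stream() -> Iterator[str]:
    # Hypothetical stand-in for a streamed LLM response.
    yield from ["Hello", ", ", "world"]


stream = fake_llm_stream()

# The first consumer drains the generator completely.
first_pass = "".join(stream)

# A second consumer of the same object finds it already exhausted.
second_pass = "".join(stream)
```

So if a downstream node reads the stream to build a string, a second reader of the same generator has nothing left to iterate unless that node re-yields the chunks.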

0mza987 avatar May 22 '24 07:05 0mza987

Hi, we're sending this friendly reminder because we haven't heard back from you in 30 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 7 days of this comment, the issue will be automatically closed. Thank you!

github-actions[bot] avatar Jun 21 '24 21:06 github-actions[bot]