How do I implement a workflow process that requires user input?
First of all, thanks for this excellent lightweight framework. I have developed a text2sql workflow based on the provided nodes and flows. Since the user's query may not provide all the necessary information, I added a node in which the LLM asks the user for secondary input as additional information if it finds the query unclear. The code looks like the following:
import json

def exec(self, preq_res):
    natural_query = preq_res
    prompt = f"""You are an SQL expert; your task is to help users improve their SQL query questions. Consider aspects such as the ambiguity and incompleteness of the question, and summarize the information the user needs to supplement.
Respond in dictionary form only, with no additional content:
If you think the question is clear enough, respond with {{"T": ""}}
If you think the question is not clear enough, respond with {{"F": "Information the user needs to supplement:"}}
This is the question input by the user: {natural_query}.
"""
    llm_response = call_llm(prompt)
    response = json.loads(llm_response)
    if "F" in response:
        sup = input(response["F"] + "\n")
        return sup
    return ""
I was able to accomplish this in the terminal via Python's input(), but with Gradio this doesn't seem as easy. I'd appreciate it if anyone knows how to handle it.
I'm not familiar with Gradio, but I assume it's similar to Streamlit, where you usually want to break the flow down into multiple subflows. You keep a flag to show the user an input text box, and after the user submits, you trigger the next subflow.
https://claude.ai/share/2183d6a1-d8f5-4d95-89b0-3ac2e2d724b5
Maybe I will add a streamlit cookbook later
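The subflow split described above can be sketched without any UI framework. This is a minimal illustration, not PocketFlow or Gradio API; all function and key names here are hypothetical:

```python
# Hypothetical sketch: the first subflow either finishes or pauses with a
# question for the user; after the user submits an answer, the second
# subflow resumes. A UI callback would drive this loop.

def clarify_subflow(shared):
    # Stand-in for the LLM clarity check: ask for a table name if missing.
    if "table" not in shared["query"]:
        shared["pending_question"] = "Which table should the query run against?"
        return "needs_input"   # the UI shows a text box when it sees this flag
    return "clear"

def sql_subflow(shared):
    # Second subflow: runs once any supplementary info has been collected.
    extra = shared.get("user_supplement", "")
    shared["sql"] = f"-- generated from: {shared['query']} {extra}".strip()
    return "done"

# Simulated round trip, as a UI callback would drive it:
shared = {"query": "show me all orders"}
if clarify_subflow(shared) == "needs_input":
    # In Gradio this is where control returns to the browser; the user's
    # answer arrives in the next callback.
    shared["user_supplement"] = "table orders_2024"
sql_subflow(shared)
print(shared["sql"])
```

The key design point is that all state lives in the `shared` dict, so the UI layer only needs to persist that dict between callbacks.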
Thank you for your rapid reply! I think this is useful, because I currently have a similar idea of splitting the workflow into two parts. Wish you all the best!
Thank you for the reminder about Streamlit.
Referencing the project https://github.com/yigit353/LangGraph-FastAPI-Streamlit, I used FastAPI to control the process and execute the workflow step by step, split into three sub-workflows.
I have finally implemented a human-in-the-loop text2sql workflow that can receive additional information and feedback from users. Thank you for your open-source work!
Best wishes
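The "three sub-workflows driven step by step" approach can be sketched framework-agnostically (the FastAPI wiring is omitted; the step names and the exact split are assumptions, not the actual code):

```python
# Each incoming request advances the session by one sub-workflow, so the
# server stays stateless between HTTP calls except for this sessions dict.

SUBFLOWS = ["check_clarity", "collect_supplement", "generate_sql"]  # assumed split

sessions = {}   # session_id -> {"step": index of next subflow, "inputs": [...]}

def handle_message(session_id, text):
    s = sessions.setdefault(session_id, {"step": 0, "inputs": []})
    s["inputs"].append(text)
    name = SUBFLOWS[s["step"]]
    s["step"] += 1
    if s["step"] < len(SUBFLOWS):
        return f"{name} done; waiting for more input"
    return f"{name} done; SQL generated from: {' '.join(s['inputs'])}"

r1 = handle_message("u1", "show orders")
r2 = handle_message("u1", "table orders_2024")
r3 = handle_message("u1", "yes, looks right")
print(r3)
```

In a real FastAPI app, `handle_message` would be the body of a POST endpoint and `sessions` would be keyed by a client-supplied session ID.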
In my Pocket Flow fork, I'm doing the following:
- Implemented `UserInputNode` and `RespondToUserNode`, which request user input or send output to the user, respectively, through `shared["io"]`, injected as a dependency when running the flow.
- Implemented `CLIIO` and `APIIO`, which implement a common `IOInterface`, so the same workflow definition works regardless of the interface.
- The agent runner ingests an instance of `CLIIO` when testing the flow on the terminal and an instance of `APIIO` when running it as part of the web API.

Simplified versions of these classes are as follows:
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import time
from threading import Lock
from typing import Callable

class Session:
    def __init__(self):
        self.input_q = Queue()
        self.output_q = Queue()
        self.last_active = time.time()

class SessionManager:
    def __init__(
        self,
        flow_factory: Callable,
        max_workers: int = 16,
    ):
        """
        flow_factory: () -> Flow
        """
        self.flow_factory = flow_factory
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.sessions: dict[str, Session] = {}
        self.lock = Lock()  # to protect the sessions dict

    def ensure_session(self, session_id: str) -> Session:
        with self.lock:
            session = self.sessions.get(session_id)
            if session is None:
                # Start the flow only once per session, not on every request.
                session = Session()
                self.sessions[session_id] = session
                self.executor.submit(self._run_flow, session_id, session)
            session.last_active = time.time()
            return session

    def _run_flow(self, session_id: str, session: Session):
        flow = self.flow_factory()
        io = APIIO(session, session_id)
        flow.run(shared={"io": io})

    def ask(self, session_id: str, user_input: str, timeout=60) -> str:
        session = self.ensure_session(session_id)
        session.input_q.put(user_input)
        return session.output_q.get(timeout=timeout)

    def clear(self, session_id):
        with self.lock:
            if session_id in self.sessions:
                del self.sessions[session_id]

    def shutdown(self):
        print("Shutting down session manager...")
        self.executor.shutdown(wait=False)
        print("Session manager shutdown complete")

class IOInterface(ABC):
    @abstractmethod
    def get_user_input(self, prompt: str) -> str:
        pass

    @abstractmethod
    def respond_to_user(self, message: str):
        pass

class CLIIO(IOInterface):
    def get_user_input(self, prompt: str | None = None) -> str:
        if prompt is not None:
            print(f"🧠 {prompt}")
        return input("> ")

    def respond_to_user(self, message: str):
        print(f"🗣️ {message}")

class APIIO(IOInterface):
    def __init__(self, session, session_id: str):
        self.session = session
        self.session_id = session_id

    def get_user_input(self, prompt: str) -> str:
        # Block until the next user message arrives for this session.
        return self.session.input_q.get()

    def respond_to_user(self, message: str):
        self.session.output_q.put(message)
The benefit of this architecture is that I'll be able to implement different IO interfaces as required for different platforms without touching the flow graph.
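To make the dependency-injection point concrete, here is a minimal sketch of how nodes like the `UserInputNode` and `RespondToUserNode` mentioned above could pull the injected IO out of shared state. The node classes and the queue-backed IO here are stand-ins for illustration, not the fork's actual implementations:

```python
from queue import Queue

# Stand-in IO implementing the same contract as IOInterface above.
class QueueIO:
    def __init__(self):
        self.input_q, self.output_q = Queue(), Queue()
    def get_user_input(self, prompt):
        self.output_q.put(prompt)   # surface the question to the caller
        return self.input_q.get()   # block until an answer arrives
    def respond_to_user(self, message):
        self.output_q.put(message)

# Hypothetical nodes: they only touch shared["io"], never a concrete
# backend, which is what lets the same flow run on CLI or behind an API.
class UserInputNode:
    def run(self, shared):
        shared["answer"] = shared["io"].get_user_input("Need more info:")

class RespondToUserNode:
    def run(self, shared):
        shared["io"].respond_to_user(f"You said: {shared['answer']}")

io = QueueIO()
io.input_q.put("use table orders")   # pre-load the user's reply
shared = {"io": io}
UserInputNode().run(shared)
RespondToUserNode().run(shared)
prompt_shown = io.output_q.get()     # the question the node surfaced
msg = io.output_q.get()              # the node's final response
print(msg)
```

Swapping `QueueIO` for a CLI-backed implementation would change nothing in the nodes themselves, which is the whole point of the interface.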
I'm planning to open-source my fork soon, but I'm still experimenting with some other ideas, so the code isn't clean yet.