How do I implement a workflow process that requires user input?

Open breakices opened this issue 7 months ago • 4 comments

First of all, thanks for this excellent, lightweight work. I have built a text2sql workflow from the provided nodes and flows. Because a user's query may not include all the necessary information, I added a node in which the LLM asks the user for a second input when it finds the request unclear. The code looks like this:

    def exec(self,preq_res):
        natural_query = preq_res
        prompt = f"""You are an sql expert, your task is to help users improve their sql query questions. You should consider aspects such as the ambiguity of the question's meaning, the incompleteness of the question, and the ambiguity of the question, and summarize the information that users need to supplement.
Please respond in dictionary form, no additional content needs to be added:
If you think the question is clear enough, respond with {{"T":""}}
If you think the question is not clear enough, respond with {{"F": "Information the user needs to supplement:"}}
This is the question input by the user: {natural_query}.
"""
        llm_response = call_llm(prompt)
        import json
        response = json.loads(llm_response)
        if "F" in response:
            # The LLM wants more detail: block on terminal input and return the answer.
            return input(response["F"] + "\n")
        # "T" means the question is clear enough, so there is nothing to add.
        return ""

I got this working in the terminal with Python's built-in input(), but doing the same in Gradio doesn't seem as easy. I'd appreciate it if anyone knows how to do this.

breakices avatar May 13 '25 02:05 breakices

I'm not familiar with Gradio, but I assume it's similar to Streamlit, where you usually want to break the flow into multiple subflows. You keep a flag that tells the UI to show the user-input box, and after the user submits, you trigger the next subflow.

https://claude.ai/share/2183d6a1-d8f5-4d95-89b0-3ac2e2d724b5
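
As a rough illustration of that split, here is a sketch that is not tied to any Gradio or Streamlit API. The names `check_question`, `answer_question`, `ui_state`, and both handlers are hypothetical: the two functions stand in for two PocketFlow subflows, and the dict stands in for whatever session state the UI framework provides.

```python
# Sketch only: check_question / answer_question stand in for two subflows,
# and ui_state stands in for the UI framework's per-session state.

def check_question(shared: dict) -> None:
    """Subflow 1: decide whether the question needs more detail."""
    # Placeholder for the LLM call; here a very short question counts as unclear.
    if len(shared["question"].split()) < 4:
        shared["clarification_prompt"] = "Which table and time range do you mean?"
    else:
        shared["clarification_prompt"] = None


def answer_question(shared: dict) -> None:
    """Subflow 2: produce the result from the question plus any supplement."""
    full = shared["question"]
    if shared.get("supplement"):
        full += " (" + shared["supplement"] + ")"
    shared["result"] = f"SQL for: {full}"


def on_submit_question(ui_state: dict, question: str) -> None:
    """Handler for the first submit button."""
    shared = {"question": question}
    check_question(shared)
    ui_state["shared"] = shared
    # The flag tells the UI whether to reveal the extra input box.
    ui_state["needs_input"] = shared["clarification_prompt"] is not None
    if not ui_state["needs_input"]:
        answer_question(shared)


def on_submit_supplement(ui_state: dict, supplement: str) -> None:
    """Handler for the second submit button, shown only while needs_input is set."""
    shared = ui_state["shared"]
    shared["supplement"] = supplement
    ui_state["needs_input"] = False
    answer_question(shared)
```

The point is that no subflow ever blocks waiting for the user: the first one ends by setting a flag, and the second one starts from a separate UI event.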

Maybe I will add a Streamlit cookbook later.

zachary62 avatar May 13 '25 03:05 zachary62

Thank you for your rapid reply! This is useful, since I currently have a similar idea of splitting the workflow into two parts. Wish you all the best!

breakices avatar May 13 '25 03:05 breakices

Thank you for the pointer to Streamlit. Using https://github.com/yigit353/LangGraph-FastAPI-Streamlit as a reference project, I used FastAPI to control the process and execute the workflow step by step as three sub-workflows.
I have finally implemented a human-in-the-loop text2sql workflow that can receive additional information and feedback from users. Thank you for open-sourcing this! Best wishes.

breakices avatar May 20 '25 03:05 breakices

In my Pocket Flow fork, I'm doing the following:

  1. Implemented UserInputNode and RespondToUserNode, which request user input and send output to the user, respectively, through shared["io"], which is injected as a dependency when running the flow.
  2. Implemented CLIIO and APIIO, which implement a common IOInterface, so the same workflow can be used regardless of the interface.
  3. The agent runner receives an instance of CLIIO when testing the flow in the terminal and an instance of APIIO when running it as part of the web API.

Simplified versions of these classes are as follows:

from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import time
from threading import Lock
from typing import Callable
# APIIO is defined further down; in the full project it is imported from a separate io_backend module.


class Session:
    def __init__(self):
        self.input_q = Queue()
        self.output_q = Queue()
        self.last_active = time.time()


class SessionManager:
    def __init__(
        self,
        flow_factory: Callable,
        max_workers: int = 16,
    ):
        """
        flow_factory: () -> Flow
        """
        self.flow_factory = flow_factory
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.sessions: dict[str, Session] = {}
        self.lock = Lock()  # to protect sessions dict

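    def ensure_session(self, session_id: str) -> Session:
        with self.lock:
            session = self.sessions.get(session_id)
            if session is None:
                # First request for this ID: create the session and start its flow once.
                # Assumes the flow keeps looping back to wait for the next user input.
                session = Session()
                self.sessions[session_id] = session
                self.executor.submit(self._run_flow, session_id, session)
            session.last_active = time.time()
        return session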

    def _run_flow(self, session_id: str, session: Session):
        flow = self.flow_factory()
        io = APIIO(session, session_id)  # matches APIIO.__init__ below
        flow.run(shared={"io": io})

    def ask(self, session_id: str, user_input: str, timeout=60) -> str:
        session = self.ensure_session(session_id)
        session.input_q.put(user_input)
        return session.output_q.get(timeout=timeout)

    def clear(self, session_id):
        with self.lock:
            if session_id in self.sessions:
                del self.sessions[session_id]

    def shutdown(self):
        print("Shutting down session manager...")
        self.executor.shutdown(wait=False)
        print("Session manager shutdown complete")


class IOInterface(ABC):
    @abstractmethod
    def get_user_input(self, prompt: str) -> str:
        pass

    @abstractmethod
    def respond_to_user(self, message: str):
        pass


class CLIIO(IOInterface):
    def get_user_input(self, prompt: str | None = None) -> str:
        if prompt is not None:
            print(f"🧠 {prompt}")
        return input("> ")

    def respond_to_user(self, message: str):
        print(f"🗣️ {message}")


class APIIO(IOInterface):
    def __init__(self, session, session_id: str):
        self.session = session
        self.session_id = session_id

    def get_user_input(self, prompt: str) -> str:
        # The prompt is not forwarded to the client; block until ask() enqueues the next message.
        return self.session.input_q.get()

    def respond_to_user(self, message: str):
        self.session.output_q.put(message)
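
The two nodes themselves are not shown above. A minimal sketch of how they might look follows, under some loudly stated assumptions: the `Node` class here is a stand-in that only mirrors Pocket Flow's prep/exec/post lifecycle so the example runs on its own, and the shared keys `"user_input"` and `"response"` are hypothetical names, not taken from the fork.

```python
from abc import ABC, abstractmethod


class IOInterface(ABC):
    """Same shape as the interface in the listing above."""

    @abstractmethod
    def get_user_input(self, prompt: str) -> str: ...

    @abstractmethod
    def respond_to_user(self, message: str) -> None: ...


class Node:
    """Stand-in for a Pocket Flow node (prep -> exec -> post), not the real class."""

    def prep(self, shared):
        return None

    def exec(self, prep_res):
        return None

    def post(self, shared, prep_res, exec_res):
        return "default"

    def run(self, shared):
        prep_res = self.prep(shared)
        exec_res = self.exec(prep_res)
        return self.post(shared, prep_res, exec_res)


class UserInputNode(Node):
    def __init__(self, prompt: str = "How can I help?"):
        super().__init__()
        self.prompt = prompt

    def prep(self, shared):
        # The IO backend is injected by whoever runs the flow.
        return shared["io"]

    def exec(self, io):
        return io.get_user_input(self.prompt)

    def post(self, shared, prep_res, exec_res):
        shared["user_input"] = exec_res  # hypothetical key
        return "default"


class RespondToUserNode(Node):
    def prep(self, shared):
        return shared["io"], shared["response"]  # "response" is a hypothetical key

    def exec(self, prep_res):
        io, message = prep_res
        io.respond_to_user(message)

    def post(self, shared, prep_res, exec_res):
        return "default"
```

Neither node knows whether it is talking to a terminal or to a queue behind a web API; it only calls the two IOInterface methods.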

The benefit of this architecture is that I can implement different IO interfaces for different platforms as needed without touching the flow graph.
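
To illustrate that point, here is a sketch of a third backend that replays canned answers, which could be handy in unit tests. `ScriptedIO` and `run_echo_flow` are hypothetical names; `run_echo_flow` merely stands in for a call to flow.run(shared).

```python
from abc import ABC, abstractmethod
from collections import deque


class IOInterface(ABC):
    """Same shape as the interface in the listing above."""

    @abstractmethod
    def get_user_input(self, prompt: str) -> str: ...

    @abstractmethod
    def respond_to_user(self, message: str) -> None: ...


class ScriptedIO(IOInterface):
    """Hypothetical backend that replays canned answers and records all traffic."""

    def __init__(self, answers):
        self._answers = deque(answers)
        self.prompts = []
        self.messages = []

    def get_user_input(self, prompt: str) -> str:
        self.prompts.append(prompt)
        return self._answers.popleft()

    def respond_to_user(self, message: str) -> None:
        self.messages.append(message)


def run_echo_flow(shared: dict) -> None:
    """Stand-in for flow.run(shared): it only ever talks to shared["io"]."""
    io = shared["io"]
    text = io.get_user_input("What should I echo?")
    io.respond_to_user(f"You said: {text}")
```

Swapping ScriptedIO for CLIIO or APIIO changes only the object placed in shared["io"]; the flow code stays the same.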

I'm planning to open-source my fork soon, but I'm still experimenting with some other ideas, so the code isn't clean yet.

monatis avatar May 21 '25 12:05 monatis