crewAI icon indicating copy to clipboard operation
crewAI copied to clipboard

[QUESTION] How to design asynchronous human-in-the-loop Crews running on the backend?

Open olokshyn opened this issue 10 months ago • 8 comments

Feature Area

Core functionality

Is your feature request related to a an existing bug? Please link it here.

Human in the loop with CrewAI: https://github.com/crewAIInc/crewAI/issues/258 (closed) Related forum

Describe the solution you'd like

I'd appreciate your help with designing a production human-in-the-loop system. I don't think it's covered by the existing documentation.

My use-case is pretty generic:

  1. a crew is doing a mission-critical work, so it must take input from a human while executing the task.
  2. If a human says that's no good, the crew must re-do the current and potentially previous tasks, considering the human feedback.
  3. This iterative process shall repeat until the human approves the output of the task.

I know people suggested implementing the "ask human tool" and a dedicated agent that can use this tool to get the human input.

However, this is not sufficient once we consider how the crew is deployed: the crew is running on a backend in a background task, with the frontend connected through a web socket. It can also be running within the HTTP request processing context, makes little difference. Once the crew decides to ask for the human input, it must:

  1. save the current state/context to a persistent storage like a database, so it can continue in case the crew dies before the user can provide the input
  2. yield control to the calling process so it can send the user prompt over the web socket
  3. wait for the user to provide input without timing out or dying
  4. restore the crew context if needed
  5. continue the crew based on the human input.

So this setup raises some questions:

  1. How do I make sure the "ask human" tool is called every time the task produces a result?
  2. How do I make sure the tasks are re-run if the "ask human" tool requests changes, as many times as needed?
  3. How do I make the "ask human" tool to pause the crew and yield control to the calling process?
  4. How do I save and restore the context of a crew?

I see that the human_input=True flag of a Task can solve the first two questions, but looks like it's limited to the stdin inputs. Alternatively, I could abuse(?) the task guardrail mechanism to as user confirmation after the task is finished and return validation error if the user requests changes. However, I will need to do an additional LLM call to know what the user said.

Memory looks promising for saving state, but won't do the trick. The agents have two types of memory that save interactions:

  1. Short-term memory. Saves agent's outputs in ChromaDB vector store, which is separate for each agent.
  2. Long-term memory. Saves the evaluation (0-10) of the agent's output (not the output itself) to a SQLite DB, one DB for all agents. Neither type of memory seems to save actual messages that led to the agent's output, that's a bit confusing to me since the agent may lose some useful details provided by the user earlier that are not detected as entities.

In conclusion, neither short-term nor long-term memory can help save the full context of all agents when the crew is interrupted by the human input. There is a bigger problem with data isolation: memories from interactions with different users will be shared across all users through the common databases used by the default storage classes. I think this is solved by using some other storage for memories.

Describe alternatives you've considered

Currently, the way I see it can be implemented is:

  1. Let the "ask human" tool just block the whole crew until the user provides input. Not ideal since now I need to run crew in a separate process, so when the "ask human" tool blocks, I can still use the socket. Async execution should help here.
  2. Provide the "ask human" with two queues, so it can send the user prompt in one and wait for the user's answer in another.
  3. In the web socket process, read/write on these queues and hope the crew process doesn't die meanwhile, losing the context.
  4. Probably use a Flow with the first crew talking to the human and producing results, then Python code verifying the human input was taken into account, then another crew acting on the verified results of the first crew.

The quick&dirty solution is just to patch the CrewAgentExecutorMixin._ask_human_input on the CrewAgentExecutor class in the agent.py module:

from functools import partial

import crewai
from crewai.agents.crew_agent_executor import CrewAgentExecutor


def replace_class(new_class=None, *, class_module, class_name):
    if new_class is None:
        return partial(replace_class, class_module=class_module, class_name=class_name)
    original_class = class_module.__dict__[class_name]
    class_module.__dict__[class_name] = new_class
    assert original_class in new_class.__bases__
    new_class.replaces_class = original_class
    return new_class


@replace_class(class_module=crewai.agent, class_name="CrewAgentExecutor")
class CustomCrewAgentExecutor(CrewAgentExecutor):
    is_custom = True

    def _ask_human_input(self, final_answer: str) -> str:
        # send final_answer to the frontend somehow and wait for the user feedback
        user_feedback = ...
        return user_feedback

Obviously, it ignores saving/restoring the state for all agents.

Additional context

No response

Willingness to Contribute

Yes, I'd be happy to submit a pull request

olokshyn avatar Feb 06 '25 23:02 olokshyn

Following

Vidit-Ostwal avatar Feb 20 '25 14:02 Vidit-Ostwal

I ended up solving it using the guardrail mechanism. It works like a charm. @joaomdmoura do you think I should create a PR to add this into crewAI?

from typing import Any, Callable, Tuple
import logging

from crewai import Agent
from crewai.crew import CrewOutput
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from pydantic import BaseModel


# Inspired by crewai.agents.crew_agent_executor:_handle_human_feedback
class HumanVerificationGuardrail[T: BaseModel]:
    def __init__(
        self,
        agent: Agent,
        ask_human_fn: Callable[[T], tuple[str, bool]],
        guardrails: list[Callable[[CrewOutput], Tuple[bool, Any]]] = [],
        max_llm_retry: int = 3,
    ):
        self.agent = agent
        self.ask_human_fn = ask_human_fn
        self.guardrails = guardrails
        self.max_llm_retry = max_llm_retry

    def _convert_output(self, output: CrewOutput) -> T:
        # override this to enrich the crew's output before presenting it to the user
        return output.pydantic

    def __call__(self, output: CrewOutput) -> Tuple[bool, Any]:
        for guardrail in self.guardrails:
            validated, error_msg = guardrail(output)
            if not validated:
                return False, error_msg

        agent_executor: CrewAgentExecutor | None = self.agent.agent_executor
        if agent_executor is None:
            raise RuntimeError("Agent executor is not set. Is the task running?")
        if not isinstance(agent_executor, CrewAgentExecutor):
            raise RuntimeError("Agent executor is not a CrewAgentExecutor.")

        result = self._convert_output(output)

        human_feedback, is_approved = self.ask_human_fn(result)
        if is_approved:
            return (
                True,
                {},
            )  # Do not return str or TaskOutput - it overwrites the task output
        if not human_feedback:
            raise RuntimeError("Human feedback was not provided.")

        feedback_cls_prompt = agent_executor._i18n.slice(
            "human_feedback_classification"
        ).format(feedback=human_feedback)

        retry_count = 0
        additional_changes_response = None

        while retry_count < self.max_llm_retry and additional_changes_response is None:
            try:
                additional_changes_response = (
                    agent_executor.llm.call(
                        [
                            agent_executor._format_msg(
                                feedback_cls_prompt, role="system"
                            )
                        ],
                        callbacks=agent_executor.callbacks,
                    )
                    .strip()
                    .lower()
                )
            except Exception as e:
                retry_count += 1
                logging.error(
                    f"Error during LLM call to classify human feedback: {e}. "
                    f"Retrying... ({retry_count}/{self.max_llm_retry})"
                )

        if additional_changes_response is None:
            raise RuntimeError("Error processing feedback after multiple attempts.")

        if additional_changes_response == "false":
            return (
                True,
                {},
            )  # Do not return str or TaskOutput - it overwrites the task output
        elif additional_changes_response == "true":
            return False, f"Your highest priority is to fix this problem: {human_feedback}"

        else:
            raise RuntimeError(
                f"Unexpected response from LLM: '{additional_changes_response}'."
            )

Usage example:

class CustomData(BaseModel):
    field1: str = Field(
        description=(
            "What the field is about and how to use it"
        )
    )
    

class CustomVerificationGuardrail(
    HumanVerificationGuardrail[CustomData]
):
    pass


agent = Agent(...)


def ask_human_for_verification(data: CustomData) -> tuple[str, bool]:
    # ask human. approved == True saves one call to LLM
    feedback, approved = ...
    return feedback, approved


task = Task(
    description=(
        "some description"
    ),
    expected_output=pydantic_to_expected_output(CustomData),
    output_pydantic=CustomData,
    agent=agent,
    guardrail=CustomVerificationGuardrail(agent=agent, ask_human_fn=ask_human_for_verification),
)

olokshyn avatar Feb 25 '25 13:02 olokshyn

This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.

github-actions[bot] avatar Mar 28 '25 12:03 github-actions[bot]

Commenting to keep this active

Vidit-Ostwal avatar Mar 28 '25 13:03 Vidit-Ostwal

Commenting to keep this active

evandroguedes-belvo avatar Apr 11 '25 18:04 evandroguedes-belvo

@olokshyn lovely your issue - very detailed and clear. I would love this feature, please PR that - we can discuss there! Does your sugggestion the final code? I was missing some solution to handle memory; it would be more connected with a Task by using the same agent, agent_exector...

anyway, looking forward to review and test it! Tks!

lucasgomide avatar Apr 11 '25 20:04 lucasgomide

commenting to keep this alive

omprakashnarayan avatar Apr 15 '25 10:04 omprakashnarayan

commenting to keep this alive

wj21067855 avatar Apr 28 '25 12:04 wj21067855

Commenting to keep this alive

AdityaVernekar avatar May 21 '25 15:05 AdityaVernekar

I'm facing exactly the same issue for my Streamlit application integrating CrewAI. I'm developing an interactive chatbot where agents need to request additional information from users during their reasoning process.

Currently, I have to implement complex workarounds:

  • Creating a custom callback system to detect when an agent wants to ask a question
  • Manually interrupting the crew's execution flow
  • Storing the conversation state in Streamlit's st.session_state
  • Re-creating a new Crew with updated context when the user responds
  • Simulating continuity by passing the conversation history as context
  • These approaches are fragile and require significant custom code that bypasses CrewAI's natural execution flow.

A native solution for asynchronous human-AI interactions would be extremely valuable, as it would enable the creation of much more robust and maintainable interactive applications.

In the meantime, using guardrails as a temporary solution doesn't seem like a bad idea - it could provide a more structured way to validate if an agent has all the necessary information and trigger user interactions when needed, though it's still not as seamless as a native implementation would be.

papapon avatar May 22 '25 13:05 papapon

hey folks!

On Enterprise we supports HIL interaction as you can see here. You can signup for our Free Plan on Enterprise and try it out yourself

lucasgomide avatar Jun 04 '25 14:06 lucasgomide

@lucasgomide Is it on the roadmap to open source the API-based HIL interaction workflow? Or will it only be provided via Enterprise?

kaleko avatar Jun 05 '25 17:06 kaleko

there's no plan to bring HIL API-based workflow to OSS so far

lucasgomide avatar Jun 06 '25 14:06 lucasgomide

This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.

github-actions[bot] avatar Jul 07 '25 12:07 github-actions[bot]

This issue was closed because it has been stalled for 5 days with no activity.

github-actions[bot] avatar Jul 13 '25 12:07 github-actions[bot]

Curios to see how people are handling this in practice? If you want an agent to pause before running a sensitive tool, wait for a human response (even hours later), and then resume — what’s worked for you?

Are you using Slack, webhooks, polling, or just handling it manually or something else?
And how do you keep the agent from losing state?

3CBolt avatar Jul 16 '25 00:07 3CBolt

@3CBolt Try to use Enterprise, you can use the free account!

When a Human-In-Loop is required to resume a Crew/Flow processing a request is sent to a registered webhook; then you can reply saying whether the result is good enough to proceed or if the task needs to be reprocessed.

Check out details here

lucasgomide avatar Jul 16 '25 13:07 lucasgomide

commenting to keep this issue active.

Chhabii avatar Aug 13 '25 06:08 Chhabii

Commenting to keep this active

NirmalSankalana avatar Sep 26 '25 18:09 NirmalSankalana