langflow icon indicating copy to clipboard operation
langflow copied to clipboard

Error: When trying to use a conditional router to branch a conversation

Open Nitewriter opened this issue 1 year ago • 8 comments

I'm attempting to implement a supervisor pattern where a language model classifies the current message given the conversation so far. I can get this to work perfectly within the UI by using a conditional router component, however, when I attempt to use the flow from the API it throws an error.

{
  "detail": "The message must be an iterator or an async iterator."
}

I suspect this is due to the need for two separate chat outputs in the flow to facilitate the branch in conversation. Here is an example flow that demonstrates the issue. I'm currently using the latest changes from main to test.

dont-talk-about-bruno.json

image
❯ curl -X POST \
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json'\
    -d '{"input_value": "message",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {
  "ChatInput-Ma4Jp": {},
  "OpenAIModel-MdB0V": {},
  "OpenAIModel-NJ8p6": {},
  "Prompt-DfMtR": {},
  "ConditionalRouter-j94T7": {},
  "Pass-N8MXP": {},
  "ChatOutput-TIge3": {},
  "ChatOutput-Z6DoM": {},
  "OpenAIModel-Naaf3": {},
  "Pass-hbQT6": {},
  "Prompt-PE0iF": {},
  "Prompt-uCIqY": {},
  "Memory-oi31T": {}
}}'

{"detail":"The message must be an iterator or an async iterator."}

Nitewriter avatar Jul 03 '24 16:07 Nitewriter

Hey there, @Nitewriter! I'm here to help you with any bugs, questions, or contributions you might have. Let's tackle this issue together!

To resolve the error "The message must be an iterator or an async iterator" when using the ConditionalRouter component in your flow via the API, you need to ensure that the INPUT_FIELD_NAME parameter is set to an iterator or an async iterator. This is necessary for the flow to handle the message correctly.

Here's how you can address this issue:

  1. Ensure INPUT_FIELD_NAME is an Iterator or Async Iterator: Make sure that the INPUT_FIELD_NAME parameter in your flow is set to an iterator or an async iterator. Here is an example of how to set this parameter correctly:

    from typing import AsyncIterator
    
    async def example_async_iterator() -> AsyncIterator[str]:
        for i in range(5):
            yield f"Message {i}"
    
    # Assuming `vertex` is an instance of the class containing the `stream` method
    vertex.params[INPUT_FIELD_NAME] = example_async_iterator()
    await vertex.stream()
    
  2. Check the Implementation of the stream Method: Ensure that the stream method in your vertex class is correctly implemented to handle the iterator or async iterator. Here is a relevant code snippet that checks this condition:

    async def stream(self):
        iterator = self.params.get(INPUT_FIELD_NAME, None)
        if not isinstance(iterator, (AsyncIterator, Iterator)):
            raise ValueError("The message must be an iterator or an async iterator.")
        ...
    
  3. Example of Handling Streaming in Langflow: In the context of Langflow, the build_vertex_stream function in src/backend/base/langflow/api/v1/chat.py demonstrates how to handle streaming for a vertex. Here is an excerpt:

    @router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
    async def build_vertex_stream(
        flow_id: uuid.UUID,
        vertex_id: str,
        session_id: Optional[str] = None,
        chat_service: "ChatService" = Depends(get_chat_service),
        session_service: "SessionService" = Depends(get_session_service),
    ):
        try:
            flow_id_str = str(flow_id)
    
            async def stream_vertex():
                try:
                    cache = await chat_service.get_cache(flow_id_str)
                    if not cache:
                        raise ValueError(f"No cache found for {flow_id_str}.")
                    else:
                        graph = cache.get("result")
    
                    vertex: "InterfaceVertex" = graph.get_vertex(vertex_id)
                    if not hasattr(vertex, "stream"):
                        raise ValueError(f"Vertex {vertex_id} does not support streaming")
                    if isinstance(vertex._built_result, str) and vertex._built_result:
                        stream_data = StreamData(
                            event="message",
                            data={"message": f"Streaming vertex {vertex_id}"},
                        )
                        yield str(stream_data)
                        stream_data = StreamData(
                            event="message",
                            data={"chunk": vertex._built_result},
                        )
                        yield str(stream_data)
                    elif not vertex.frozen or not vertex._built:
                        logger.debug(f"Streaming vertex {vertex_id}")
                        stream_data = StreamData(
                            event="message",
                            data={"message": f"Streaming vertex {vertex_id}"},
                        )
                        yield str(stream_data)
                        async for chunk in vertex.stream():
                            stream_data = StreamData(
                                event="message",
                                data={"chunk": chunk},
                            )
                            yield str(stream_data)
                    elif vertex.result is not None:
                        stream_data = StreamData(
                            event="message",
                            data={"chunk": vertex._built_result},
                        )
                        yield str(stream_data)
                    else:
                        raise ValueError(f"No result found for vertex {vertex_id}")
    
                except Exception as exc:
                    logger.exception(f"Error building Component: {exc}")
                    exc_message = parse_exception(exc)
                    if exc_message == "The message must be an iterator or an async iterator.":
                        exc_message = "This stream has already been closed."
                    yield str(StreamData(event="error", data={"error": exc_message}))
                finally:
                    logger.debug("Closing stream")
                    await chat_service.set_cache(flow_id_str, graph)
                    yield str(StreamData(event="close", data={"message": "Stream closed"}))
    
            return StreamingResponse(stream_vertex(), media_type="text/event-stream")
        except Exception as exc:
            raise HTTPException(status_code=500, detail="Error building Component") from exc
    

By ensuring that the INPUT_FIELD_NAME parameter is correctly set to an iterator or async iterator and that the stream method is properly implemented, you should be able to resolve the error and successfully run the flow via the API [1][2][3][4].

To continue talking to Dosu, mention @dosu.

dosubot[bot] avatar Jul 03 '24 16:07 dosubot[bot]

Hey @Nitewriter the error The message must be an iterator or an async iterator. normally means something went wrong in a component. I believe you will find an ERROR log line in logs that refers to the actual issue

nicoloboschi avatar Jul 04 '24 07:07 nicoloboschi

@nicoloboschi I'll see what I can find and post it here. Thank you.

Nitewriter avatar Jul 05 '24 13:07 Nitewriter

@nicoloboschi Here is the Traceback I get when calling the flow with curl:

ValueError: The message must be an iterator or an async iterator. (<class 'langflow.graph.vertex.types.ComponentVertex'>)                    
                                                                                                                                                                          
                             ╭─────────────────────────────────────────── Traceback (most recent call last) ────────────────────────────────────────────╮                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/api/v1/endpoints.py:203 in simplified_run_flow       │                 
                             │                                                                                                                          │                 
                             │   200 │   """                                                                                                            │                 
                             │   201 │   start_time = time.perf_counter()                                                                               │                 
                             │   202 │   try:                                                                                                           │                 
                             │ ❱ 203 │   │   result = await simple_run_flow(                                                                            │                 
                             │   204 │   │   │   flow=flow,                                                                                             │                 
                             │   205 │   │   │   input_request=input_request,                                                                           │                 
                             │   206 │   │   │   stream=stream,                                                                                         │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/api/v1/endpoints.py:107 in simple_run_flow           │                 
                             │                                                                                                                          │                 
                             │   104 │   │   │   │   │   and (input_request.output_type == "any" or input_request.output_type                           │                 
                             │       in vertex.id.lower())  # type: ignore                                                                              │                 
                             │   105 │   │   │   │   )                                                                                                  │                 
                             │   106 │   │   │   ]                                                                                                      │                 
                             │ ❱ 107 │   │   task_result, session_id = await run_graph_internal(                                                        │                 
                             │   108 │   │   │   graph=graph,                                                                                           │                 
                             │   109 │   │   │   flow_id=flow_id_str,                                                                                   │                 
                             │   110 │   │   │   session_id=input_request.session_id,                                                                   │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/processing/process.py:48 in run_graph_internal       │                 
                             │                                                                                                                          │                 
                             │    45 │                                                                                                                  │                 
                             │    46 │   fallback_to_env_vars = get_settings_service().settings.fallback_to_env_var                                     │                 
                             │    47 │                                                                                                                  │                 
                             │ ❱  48 │   run_outputs = await graph.arun(                                                                                │                 
                             │    49 │   │   inputs=inputs_list,                                                                                        │                 
                             │    50 │   │   inputs_components=components,                                                                              │                 
                             │    51 │   │   types=types,                                                                                               │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/graph/base.py:455 in arun                      │                 
                             │                                                                                                                          │                 
                             │    452 │   │   for _ in range(len(inputs) - len(types)):                                                                 │                 
                             │    453 │   │   │   types.append("chat")  # default to chat                                                               │                 
                             │    454 │   │   for run_inputs, components, input_type in zip(inputs, inputs_components, types):                          │                 
                             │ ❱  455 │   │   │   run_outputs = await self._run(                                                                        │                 
                             │    456 │   │   │   │   inputs=run_inputs,                                                                                │                 
                             │    457 │   │   │   │   input_components=components,                                                                      │                 
                             │    458 │   │   │   │   input_type=input_type,                                                                            │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/graph/base.py:357 in _run                      │                 
                             │                                                                                                                          │                 
                             │    354 │   │   │   │   raise ValueError(f"Vertex {vertex_id} not found")                                                 │                 
                             │    355 │   │   │                                                                                                         │                 
                             │    356 │   │   │   if not vertex.result and not stream and hasattr(vertex,                                               │                 
                             │        "consume_async_generator"):                                                                                       │                 
                             │ ❱  357 │   │   │   │   await vertex.consume_async_generator()                                                            │                 
                             │    358 │   │   │   if (not outputs and vertex.is_output) or (vertex.display_name in outputs or                           │                 
                             │        vertex.id in outputs):                                                                                            │                 
                             │    359 │   │   │   │   vertex_outputs.append(vertex.result)                                                              │                 
                             │    360                                                                                                                   │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/vertex/types.py:436 in consume_async_generator │                 
                             │                                                                                                                          │                 
                             │   433 │   │   self._built = True                                                                                         │                 
                             │   434 │                                                                                                                  │                 
                             │   435 │   async def consume_async_generator(self):                                                                       │                 
                             │ ❱ 436 │   │   async for _ in self.stream():                                                                              │                 
                             │   437 │   │   │   pass                                                                                                   │                 
                             │   438 │                                                                                                                  │                 
                             │   439 │   def _is_chat_input(self):                                                                                      │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/vertex/types.py:363 in stream                  │                 
                             │                                                                                                                          │                 
                             │   360 │   async def stream(self):                                                                                        │                 
                             │   361 │   │   iterator = self.params.get(INPUT_FIELD_NAME, None)                                                         │                 
                             │   362 │   │   if not isinstance(iterator, (AsyncIterator, Iterator)):                                                    │                 
                             │ ❱ 363 │   │   │   raise ValueError(                                                                                      │                 
                             │   364 │   │   │   │   f"The message must be an iterator or an async iterator.                                            │                 
                             │       ({type(iterator)})"                                                                                                │                 
                             │   365 │   │   │   )                                                                                                      │                 
                             │   366 │   │   is_async = isinstance(iterator, AsyncIterator)                                                             │                 
                             ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯                 
                             ValueError: The message must be an iterator or an async iterator. (<class 'langflow.graph.vertex.types.ComponentVertex'>) 

I added the type of the iterator to the ValueError message so we could see what it did receive.

Nitewriter avatar Jul 05 '24 13:07 Nitewriter

@nicoloboschi Ok, I think I may have figured something out. This seems to be caused because the Conditional Router component calls stop on the false branch of the graph. The runner still wants to resolve the Chat Output component for that stopped branch of the graph and gets this error because the OpenAI component is inactive (not built). Should conversational branching be possible using the conditional router?

Nitewriter avatar Jul 05 '24 15:07 Nitewriter

Error Occurs on the About Bruno chat output component's input field

❯ curl -X POST \                                                                                                                                                                                       
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json'\
    -d '{"input_value": "What is it like being an AI?",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {         
  "ChatInput-Ma4Jp": {},
  "Prompt-DfMtR": {},           
  "ConditionalRouter-j94T7": {},
  "Pass-N8MXP": {},      
  "ChatOutput-TIge3": {},
  "ChatOutput-Z6DoM": {},
  "Pass-hbQT6": {},  
  "Prompt-PE0iF": {},
  "Prompt-uCIqY": {},
  "Memory-oi31T": {},     
  "OpenAIModel-pHvfu": {},
  "OpenAIModel-w2hnb": {},
  "OpenAIModel-dUp0N": {}
}}'   

Error Occurs on the Not About Bruno chat output component's input field

❯ curl -X POST \                                                                                                                                                                                       
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json'\
    -d '{"input_value": "Have you heard about Bruno?",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {         
  "ChatInput-Ma4Jp": {},
  "Prompt-DfMtR": {},           
  "ConditionalRouter-j94T7": {},
  "Pass-N8MXP": {},      
  "ChatOutput-TIge3": {},
  "ChatOutput-Z6DoM": {},
  "Pass-hbQT6": {},  
  "Prompt-PE0iF": {},
  "Prompt-uCIqY": {},
  "Memory-oi31T": {},     
  "OpenAIModel-pHvfu": {},
  "OpenAIModel-w2hnb": {},
  "OpenAIModel-dUp0N": {}
}}'   

Nitewriter avatar Jul 05 '24 15:07 Nitewriter

Bailing instead of raising an error does allow the flow to work, however, it is probably not the solution you would want.

src/backend/base/langflow/graph/vertex/types.py:InterfaceVertex:stream

async def stream(self):
        iterator = self.params.get(INPUT_FIELD_NAME, None)
        if not isinstance(iterator, (AsyncIterator, Iterator)):
            return

Nitewriter avatar Jul 05 '24 15:07 Nitewriter

Tagging @ogabrielluiz . This looks like a graph builder issue, could you take a look ?

nicoloboschi avatar Jul 08 '24 08:07 nicoloboschi

@Nitewriter Hey

It's been a while since we had any interaction here.

Do you need any assistance with this case?

carlosrcoelho avatar Aug 19 '24 21:08 carlosrcoelho

Thank you for your contribution! This issue will be closed. If you have any questions or encounter another problem, please open a new issue and we will be ready to assist you.

carlosrcoelho avatar Aug 23 '24 20:08 carlosrcoelho