haystack
haystack copied to clipboard
Pipeline loops fail when BranchJoiner receives multiple inputs
Pipeline loops in Haystack currently fail when the prompt_concatenator_after_observation
component (see attached pipeline graph) loops back ChatMessage
list to the main_input
BranchJoiner. The BranchJoiner fails with the following error message:
File "/Users/vblagoje/workspace/haystack/haystack/core/pipeline/pipeline.py", line 76, in _run_component
res: Dict[str, Any] = instance.run(**inputs)
File "/Users/vblagoje/workspace/haystack/haystack/components/joiners/branch.py", line 140, in run
raise ValueError(f"BranchJoiner expects only one input, but {inputs_count} were received.")
ValueError: BranchJoiner expects only one input, but 2 were received.
This issue seem to originate in the BranchJoiner receiving both the initial input and the looped back input simultaneously, violating its pre-condition of a single input.
Steps to reproduce:
import os
from typing import List, Optional, Dict, Any
import re
from haystack.dataclasses import ChatMessage
from haystack import Document, component
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.converters import OutputAdapter
from haystack.components.routers import ConditionalRouter
from haystack.components.joiners import BranchJoiner
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.websearch import SerperDevWebSearch
from haystack import Pipeline
from haystack.utils import Secret
os.environ["OPENAI_API_KEY"] = "some-fake-key-replace-with-real-if-you-need-to-use-it"
def find_last_action(chat_messages: List[ChatMessage]):
prompt: str = chat_messages[-1].content
lines = prompt.strip().split('\n')
for line in reversed(lines):
pattern = r'Action:\s*(\w+)\[(.*?)\]'
match = re.search(pattern, line)
if match:
action_name = match.group(1)
parameter = match.group(2)
return [action_name, parameter]
return [None, None]
def concat_prompt(last_message: ChatMessage, current_prompt: List[ChatMessage], append: str):
return [ChatMessage.from_user(current_prompt[-1].content + last_message.content + append)]
search_message_template = """
Given these web search results:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Be as brief as possible, max one sentence.
Answer the question: {{search_query}}
"""
react_message_template = """
Solve a question answering task with interleaving Thought, Action, Observation steps.
Thought reasons about the current situation
Action can be:
google_search - Searches Google for the exact concept/entity (given in square brackets) and returns the results for you to use
finish - Returns the final answer (given in square brackets) and finishes the task
Observation sumarizes the Action outcome and helps in formulating the next
Thought in Thought, Action, Observation interleaving triplet of steps.
After each Observation, provide the next Thought and next Action.
Don't execute multiple steps even though you know the answer.
Only generate Thought and Action, never Observation, you'll get Observation from Action.
Follow the pattern in the example below.
Example:
###########################
Question: Which magazine was started first Arthur’s Magazine or First for Women?
Thought: I need to search Arthur’s Magazine and First for Women, and find which was started
first.
Action: google_search[When was 'Arthur’s Magazine' started?]
Observation: Arthur’s Magazine was an American literary periodical ˘
published in Philadelphia and founded in 1844. Edited by Timothy Shay Arthur, it featured work by
Edgar A. Poe, J.H. Ingraham, Sarah Josepha Hale, Thomas G. Spear, and others. In May 1846
it was merged into Godey’s Lady’s Book.
Thought: Arthur’s Magazine was started in 1844. I need to search First for Women founding date next
Action: google_search[When was 'First for Women' magazine started?]
Observation: First for Women is a woman’s magazine published by Bauer Media Group in the
USA. The magazine was started in 1989. It is based in Englewood Cliffs, New Jersey. In 2011
the circulation of the magazine was 1,310,696 copies.
Thought: First for Women was started in 1989. 1844 (Arthur’s Magazine) ¡ 1989 (First for
Women), so Arthur’s Magazine was started first.
Action: finish[Arthur’s Magazine]
############################
Let's start, the question is: {{query}}
Thought:
"""
routes = [
{
"condition": "{{'search' in tool_id_and_param[0]}}",
"output": "{{tool_id_and_param[1]}}",
"output_name": "search",
"output_type": str,
},
{
"condition": "{{'finish' in tool_id_and_param[0]}}",
"output": "{{tool_id_and_param[1]}}",
"output_name": "finish",
"output_type": str,
}
]
@component
class NoOp:
@component.output_types(output=str)
def run(self, query: str):
return {"output": query}
class FakeThoughtActionOpenAIChatGenerator(OpenAIChatGenerator):
@component.output_types(replies=List[ChatMessage])
def run(self, messages: List[ChatMessage], generation_kwargs: Optional[Dict[str, Any]] = None):
return {"replies": [ChatMessage.from_assistant("Thought: thinking\n Action: google_search[not important]\n")]}
class FakeConclusionOpenAIChatGenerator(OpenAIChatGenerator):
@component.output_types(replies=List[ChatMessage])
def run(self, messages: List[ChatMessage], generation_kwargs: Optional[Dict[str, Any]] = None):
return {"replies": [ChatMessage.from_assistant("Tower of Pisa is 55 meters tall\n")]}
class FakeSerperDevWebSearch(SerperDevWebSearch):
@component.output_types(documents=List[Document])
def run(self, query: str):
return {"documents": [Document(content="Eiffel Tower is 300 meters tall"),
Document(content="Tower of Pisa is 55 meters tall")]}
# main part
pipeline = Pipeline()
pipeline.add_component("main_input", BranchJoiner(List[ChatMessage]))
pipeline.add_component("prompt_builder", DynamicChatPromptBuilder(runtime_variables=["query"]))
pipeline.add_component("llm", FakeThoughtActionOpenAIChatGenerator(generation_kwargs={"stop": "Observation:"}))
pipeline.add_component("noop", NoOp())
# tools
pipeline.add_component("tool_extractor", OutputAdapter("{{messages | find_action}}",
output_type=List[str],
custom_filters={"find_action": find_last_action}))
pipeline.add_component("prompt_concatenator_after_action",
OutputAdapter("{{replies[-1] | concat_prompt(current_prompt,'')}}",
output_type=List[ChatMessage],
custom_filters={"concat_prompt": concat_prompt}))
pipeline.add_component("router", ConditionalRouter(routes))
pipeline.add_component("router_search",
FakeSerperDevWebSearch(api_key=Secret.from_token("some_fake_api_key")))
pipeline.add_component("search_prompt_builder",
DynamicChatPromptBuilder(runtime_variables=["documents", "search_query"]))
pipeline.add_component("search_llm", FakeConclusionOpenAIChatGenerator())
pipeline.add_component("router_finish", OutputAdapter("{{final_answer | format_final_answer}}",
output_type=str,
custom_filters={"format_final_answer": lambda x: x}))
pipeline.add_component("search_output_adapter", OutputAdapter("{{search_replies | format_observation}}",
output_type=List[ChatMessage],
custom_filters={"format_observation": lambda x: [
ChatMessage.from_assistant(
"Observation: " + x[-1].content + "\n")]}))
pipeline.add_component("prompt_concatenator_after_observation",
OutputAdapter("{{replies[-1] | concat_prompt(current_prompt, '\nThought:')}}",
output_type=List[ChatMessage],
custom_filters={"concat_prompt": concat_prompt}))
# main
pipeline.connect("main_input", "prompt_builder.prompt_source")
pipeline.connect("noop", "prompt_builder.query")
pipeline.connect("prompt_builder.prompt", "llm.messages")
pipeline.connect("llm.replies", "prompt_concatenator_after_action.replies")
# tools
pipeline.connect("prompt_builder.prompt", "prompt_concatenator_after_action.current_prompt")
pipeline.connect("prompt_concatenator_after_action", "tool_extractor.messages")
pipeline.connect("tool_extractor", "router")
pipeline.connect("router.search", "router_search.query")
pipeline.connect("router_search.documents", "search_prompt_builder.documents")
pipeline.connect("router.search", "search_prompt_builder.search_query")
pipeline.connect("search_prompt_builder.prompt", "search_llm.messages")
pipeline.connect("router.finish", "router_finish")
pipeline.connect("search_llm.replies", "search_output_adapter.search_replies")
pipeline.connect("search_output_adapter", "prompt_concatenator_after_observation.replies")
pipeline.connect("prompt_concatenator_after_action", "prompt_concatenator_after_observation.current_prompt")
pipeline.connect("prompt_concatenator_after_observation", "main_input")
search_message = [ChatMessage.from_user(search_message_template)]
messages = [ChatMessage.from_user(react_message_template)]
question = "which tower is taller: eiffel tower or tower of pisa?"
res = pipeline.run(data={"main_input": {"value": messages},
"noop": {"query": question},
"search_prompt_builder": {"prompt_source": search_message}})
print(res)
Expected behavior: The pipeline should handle loops correctly, allowing the BranchJoiner to process looped inputs sequentially rather than simultaneously.
Actual behavior: The pipeline fails when the loop feeds back to the BranchJoiner, causing it to receive multiple inputs at once raising the above mentioned exception