Allow Pipelines to be run/reused in "SuperPipelines"
Hi -- I have been using Haystack to build out some complicated RAG pipelines. They have become too complicated to build in a single Pipeline, so I would like to be able to "compose" sub-pipelines together. This would allow building complex pipelines from smaller ones, and would also allow reusing smaller pipelines in various ways.
Here is a trivial example of what I'd like to be able to do. In a real use case the sub-pipelines p1 and p2 would of course be larger and more complicated, and do something useful!
```python
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
# PipelineComponent exists only on the POC branch referenced in the notes below
from haystack.components.others.pipeline import PipelineComponent

p1 = Pipeline()
p1.add_component("adap", OutputAdapter(template="Hello {{inp}}", output_type=str))

p2 = Pipeline()
p2.add_component("adap", OutputAdapter(template="Goodbye {{inp}}", output_type=str))

p = Pipeline()
p.add_component("pipeline1", PipelineComponent(p1))
p.add_component("pipeline2", PipelineComponent(p2))
p.connect("pipeline1.adap:output", "pipeline2.adap:inp")

print(p.run(data={"pipeline1": {"adap:inp": "Paris"}}))
```
Notes:
- The `PipelineComponent` idea is from a Discord discussion with @masci -- here is his [POC branch](https://github.com/deepset-ai/haystack/commit/8e455f60d0a24a31959f12b2aadca2604ba3f2a6). The branch doesn't run anymore, for reasons discussed in the Discord.
- The naming used to address the hierarchy of data (`pipeline1.adap:output`) is just one idea, proposed by @masci, but it seems a reasonable choice.
Alternatives Considered:
- Tried just making bigger and bigger pipelines. They become unwieldy and difficult to test.
- Tried creating my own `PipelineComponent`, but it becomes hard to move the data around, because each level of Pipeline in the hierarchy adds a `{'data': {...}}` wrapper (sketched below). Soon my brain melts.
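To make the wrapping problem concrete, here is a hypothetical sketch (illustrative only, not the POC's actual behavior) of how the input dict grows with each level of nesting:

```python
# Hypothetical illustration: with a naive hand-rolled PipelineComponent,
# every level of the hierarchy wraps the payload in another layer.

# Running the child pipeline directly:
child_inputs = {"adap": {"inp": "Paris"}}

# Running it through one wrapping parent pipeline:
parent_inputs = {"pipeline1": {"data": {"adap": {"inp": "Paris"}}}}

# And through a grandparent -- two wrappers deep already:
grandparent_inputs = {
    "super": {"data": {"pipeline1": {"data": {"adap": {"inp": "Paris"}}}}}
}
```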
Additional context
Some amazing things this could enable: what if we had a `ParallelPipelineComponent` that could run multiple copies of the same Pipeline in parallel, using a `ThreadPoolExecutor` or Dask/Ray/something! It would be fairly easy to do, I think, once we had `PipelineComponent`.

@vblagoje for visibility
I did an implementation of a ParallelPipelineComponent some time ago. Maybe you can reuse something from it:
https://github.com/Redna/haystack-extensions/blob/main/src/haystack_extensions/components/concurrent_runner/runner.py
```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from haystack import Pipeline, component


@dataclass
class NamedPipeline:
    """A (name, pipeline) pair, as defined in the linked module."""
    name: str
    pipeline: Pipeline


@component
class ConcurrentPipelineRunner:
    """
    This component allows you to run multiple pipelines concurrently in a thread pool.
    """

    def __init__(self, named_pipelines: List[NamedPipeline], executor: Optional[ThreadPoolExecutor] = None):
        if not isinstance(named_pipelines, list) or any(
            not isinstance(named_pipeline, NamedPipeline) for named_pipeline in named_pipelines
        ):
            raise ValueError("named_pipelines must be a list of NamedPipeline instances")
        names = [named_pipeline.name for named_pipeline in named_pipelines]
        if len(names) != len(set(names)):
            raise ValueError("All pipelines must have unique names")
        # Each pipeline gets one input socket (keyed by its name) carrying
        # the data dict that will be passed to its run() call.
        for named_pipeline in named_pipelines:
            component.set_input_type(self, named_pipeline.name, Dict[str, Any])
        output_types = {}
        for named_pipeline in named_pipelines:
            output_types[named_pipeline.name] = Dict[str, Any]
        component.set_output_types(self, **output_types)
        self.pipelines = named_pipelines
        self.executor = executor

    def run(self, **inputs):
        if self.executor is None:
            with ThreadPoolExecutor() as executor:
                final_results = self._run_in_executor(executor, inputs)
        else:
            final_results = self._run_in_executor(self.executor, inputs)
        return {named_pipeline.name: result for named_pipeline, result in zip(self.pipelines, final_results)}

    def _run_in_executor(self, executor: ThreadPoolExecutor, inputs: Dict[str, Any]):
        results = executor.map(lambda c: c[0].pipeline.run(data=inputs[c[1]]), zip(self.pipelines, inputs.keys()))
        return list(results)
```
https://github.com/Redna/haystack-extensions/blob/main/tests/test_runner.py
```python
def test_concurrent_pipeline_runner(self):
    # SimpleComponent is a test helper defined in the linked test file;
    # it waits `wait_time` seconds, fires `callback`, and increments a number.
    component_call_stack = []

    def callback(component):
        component_call_stack.append(component)

    simple_component_1 = SimpleComponent(wait_time=0.09, callback=callback)
    pipeline1 = Pipeline()
    pipeline1.add_component("simple_component", simple_component_1)

    simple_component_2 = SimpleComponent(wait_time=0.02, callback=callback)
    pipeline2 = Pipeline()
    pipeline2.add_component("simple_component", simple_component_2)

    concurrent_pipeline_runner = ConcurrentPipelineRunner(
        [NamedPipeline("pipeline1", pipeline1), NamedPipeline("pipeline2", pipeline2)]
    )
    overall_pipeline = Pipeline()
    overall_pipeline.add_component("concurrent_pipeline_runner", concurrent_pipeline_runner)

    results = overall_pipeline.run(
        data={
            "concurrent_pipeline_runner": {
                "pipeline1": {"simple_component": {"increment": 1}},
                "pipeline2": {"simple_component": {"increment": 2, "number": 10}},
            }
        }
    )

    assert results == {
        "concurrent_pipeline_runner": {
            "pipeline1": {"simple_component": {"number": 6}},
            "pipeline2": {"simple_component": {"number": 12}},
        }
    }
    # The pipelines ran concurrently: pipeline2's component has the shorter
    # wait time, so it completes (and fires its callback) first.
    assert len(component_call_stack) == 2
    assert component_call_stack[0] == simple_component_2
    assert component_call_stack[1] == simple_component_1
```
Thanks @Redna -- it's an interesting solution. It addresses one of the problems I was struggling with: setting the input and output types. Another issue I still struggle with is that the data management gets quite complicated. The output looks like this:
```python
assert results == {
    "concurrent_pipeline_runner": {
        "pipeline1": {"simple_component": {"number": 6}},
        "pipeline2": {"simple_component": {"number": 12}},
    }
}
```
So someone trying to get results out of a composition of many pipelines needs detailed knowledge of each pipeline's internals. Do you know of any guidance on a "good" way to build reusable pipelines with Haystack? Reusable in the sense that they are flexible, but the user of the reusable pipeline doesn't have to know all the details of its internal components in order to run it.
Hey @mikebellerU and @Redna, before jumping onto some of these great ideas about pipeline executors, let's focus on making https://github.com/deepset-ai/haystack/compare/massi/pipeline-component actually work; the main remaining "issue" is (de)serialization of these super components, so they can be saved/loaded/reused/shared, perhaps not only by yourself but within the community. What do you think about that? I'm working on some other items right now but would love to contribute in the coming weeks.
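To make the (de)serialization point concrete, here is a minimal sketch of what it could look like, assuming the usual `to_dict`/`from_dict` component conventions and `Pipeline.dumps()`/`Pipeline.loads()` (the POC branch may do this differently):

```python
from typing import Any, Dict

from haystack import Pipeline, component, default_from_dict, default_to_dict


@component
class PipelineComponent:
    """Hypothetical wrapper component; the POC branch may differ."""

    def __init__(self, pipeline: Pipeline):
        self.pipeline = pipeline
        # The real POC also derives input/output sockets from the
        # wrapped pipeline; omitted here for brevity.

    def run(self, **kwargs):
        # Simplified delegation to the wrapped pipeline.
        return self.pipeline.run(data=kwargs)

    def to_dict(self) -> Dict[str, Any]:
        # Serialize the wrapped pipeline itself (as YAML, via dumps()),
        # so the "super component" can be saved/shared as one artifact.
        return default_to_dict(self, pipeline=self.pipeline.dumps())

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "PipelineComponent":
        data["init_parameters"]["pipeline"] = Pipeline.loads(data["init_parameters"]["pipeline"])
        return default_from_dict(cls, data)
```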
I tried playing around with @masci's component (I tweaked it so it would work, at least for my case), and here is what I learned: it quickly gets too hard to manage the levels of data input and output. To invoke the "parent" pipeline, you may have to understand the detailed `run` signature of the "child" pipeline.
Right now, to invoke a typical Haystack 2.0 RAG pipeline, I have to write something like `response = pipeline.run(data={'retriever': ..., 'embedder': ..., 'llm': ...})`, and then when it returns I have to pick out `response['answer_builder']['answers'][0].data` to get the result I'm interested in. Wouldn't it be better if there were a way to encapsulate the knowledge about running this pipeline into a `runner` method (or some other name), with a signature like `answer = rag_pipeline.runner(query=..., docstore=...)`? This method could live alongside `run`, potentially as a wrapper for it, but would allow for a type-and-parameter-checked reusable pipeline that abstracts away its internal details. A rough sketch of the idea is below.
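For example, a minimal sketch of the idea (the `runner` method, its parameters, and the component names are hypothetical, not an existing Haystack API):

```python
from haystack import Pipeline


class RAGPipelineRunner:
    """Hypothetical wrapper that encapsulates a pipeline's run() details."""

    def __init__(self, pipeline: Pipeline):
        self.pipeline = pipeline

    def runner(self, query: str) -> str:
        # The wrapper knows which internal components need the query...
        response = self.pipeline.run(
            data={
                "embedder": {"text": query},
                "prompt_builder": {"question": query},
            }
        )
        # ...and where the answer lives in the nested output.
        return response["answer_builder"]["answers"][0].data


# Callers no longer need to know the pipeline's internals:
# answer = RAGPipelineRunner(rag_pipeline).runner(query="What is RAG?")
```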
TLDR: Solving composability of pipelines, I think, needs some thought about how to abstract away the underlying details, and that should factor into the design of `PipelineRunner`.
We're rolling out this feature as part of the "experimental" package, you can follow this PR https://github.com/deepset-ai/haystack-experimental/pull/9
Hi @masci. Thanks, I will check it out. Is haystack-experimental intended to be a package that I install separately from haystack, so that I can access additional components?
It is a separate package, but the idea is to make it a dependency of `haystack-ai`, so users can access experimental features by just changing import paths.
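For example (the module path here is illustrative; check the haystack-experimental README for the actual layout):

```python
# Stable feature:
# from haystack import Pipeline

# Experimental counterpart -- same class name, different import path
# (assumed path, shown only to illustrate the idea):
from haystack_experimental import Pipeline

pipe = Pipeline()  # behaves like the stable Pipeline, plus experimental features
```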
After an internal sync we decided not to pursue this component further. The solution doesn't feel quite right and the use cases that would benefit the most (like agentic pipelines) still have unclear requirements.
Currently, the problem can be addressed by breaking pipelines down into smaller ones when possible, or by considering bigger components.
If anyone from the community wants to take this on, this could make a good integration.
Closing this issue for now.