haystack icon indicating copy to clipboard operation
haystack copied to clipboard

Allow Pipelines to be run/reused in "SuperPipelines"

Open mikebellerU opened this issue 9 months ago • 5 comments

Hi -- I have been using haystack to build out some complicated RAG pipelines. They are too complicated to build in a single Pipeline. I would like to be able to "compose" sub-pipelines together. This will allow for building complex pipelines from smaller ones, and would also allow for reuse of smaller pipelines in various ways.

Here is a trivial example of what I'd like to be able to do. In a real use case the subpipelines p1, p2 would course be larger and more complicated, and do something useful!

from haystack import Pipeline
from haystack.components.others.pipeline import PipelineComponent
from haystack.components.converters import OutputAdapter

p1 = Pipeline()
p1.add_component("adap", OutputAdapter(
    template="Hello {{inp}}", output_type=str))
p2 = Pipeline()
p2.add_component("adap", OutputAdapter(
    template="Goodbye {{inp}}", output_type=str))
p = Pipeline()
p.add_component("pipeline1", PipelineComponent(p1))
p.add_component("pipeline2", PipelineComponent(p2))
p.connect("pipeline1.adap:output", "pipeline2.adap:inp")
print(p.run(data={"pipeline1": {"adap:inp": "Paris"}}))

Notes:

  • The PipelineComponent idea is from a discord discussion with @masci -- Here is his [POC branch: ] (https://github.com/deepset-ai/haystack/commit/8e455f60d0a24a31959f12b2aadca2604ba3f2a6)
  • The branch doesn't run anymore for reasons discussed in the discord.
  • The naming used (pipeline.adap:output) to address the hierarchy of data is just one idea, proposed by @masci, but seems a reasonable choice

Alternatives Considered:

  • Tried just making bigger and bigger pipelines. They become unwieldy and difficult to test.
  • Tried creating my own PipelineComponent, but it becomes hard to move the data around, because each level of Pipelines in the hierarchy adds a {'data': {...}} wrapper. Soon my brain melts.

Additional context

Some amazing things this could enable: What if we had a ParallelPipelineComponent that can run multiple copies of the same Pipeline in parallel using a ThreadPoolExecutor or using Dask/Ray/something! It would be fairly easy to do I think once we had PipelineComponent.

mikebellerU avatar May 02 '24 22:05 mikebellerU

@vblagoje for visibility

masci avatar May 03 '24 13:05 masci

I did an implementation for the ParallelPipelineComponent some time ago. Maybe you can reuse something here.

https://github.com/Redna/haystack-extensions/blob/main/src/haystack_extensions/components/concurrent_runner/runner.py

@component
class ConcurrentPipelineRunner:
    """
    This component allows you to run multiple pipelines concurrently in a thread pool.
    """

    def __init__(self, named_pipelines: List[NamedPipeline], executor: Optional[ThreadPoolExecutor | None] = None):
        if type(named_pipelines) != list or any(
            [type(named_pipeline) != NamedPipeline for named_pipeline in named_pipelines]
        ):
            raise ValueError("named_pipelines must be a list of NamedPipeline instances")

        names = [named_pipeline.name for named_pipeline in named_pipelines]
        if len(names) != len(set(names)):
            raise ValueError("All components must have unique names")

        for named_pipeline in named_pipelines:
            component.set_input_type(self, named_pipeline.name, {named_pipeline.name: Dict[str, Any]})

        output_types = {}
        for named_pipeline in named_pipelines:
            output_types[named_pipeline.name] = Dict[str, Any]
        self.pipelines = named_pipelines
        self.executor = executor

    def run(self, **inputs):
        if self.executor is None:
            with ThreadPoolExecutor() as executor:
                final_results = self._run_in_executor(executor, inputs)
        else:
            final_results = self._run_in_executor(self.executor, inputs)

        return {named_pipeline.name: result for named_pipeline, result in zip(self.pipelines, final_results)}

    def _run_in_executor(self, executor: ThreadPoolExecutor, inputs: Dict[str, Any]):
        results = executor.map(lambda c: c[0].pipeline.run(data=inputs[c[1]]), zip(self.pipelines, inputs.keys()))
        return [result for result in results]

https://github.com/Redna/haystack-extensions/blob/main/tests/test_runner.py

def test_concurrent_pipeline_runner(self):
        component_call_stack = []

        def callback(component):
            component_call_stack.append(component)

        simple_component_1 = SimpleComponent(wait_time=0.09, callback=callback)
        pipeline1 = Pipeline()
        pipeline1.add_component("simple_component", simple_component_1)

        simple_component_2 = SimpleComponent(wait_time=0.02, callback=callback)
        pipeline2 = Pipeline()
        pipeline2.add_component("simple_component", simple_component_2)

        concurrent_pipeline_runner = ConcurrentPipelineRunner(
            [NamedPipeline("pipeline1", pipeline1), NamedPipeline("pipeline2", pipeline2)]
        )

        overall_pipeline = Pipeline()

        overall_pipeline.add_component("concurrent_pipeline_runner", concurrent_pipeline_runner)

        results = overall_pipeline.run(
            data={
                "concurrent_pipeline_runner": {
                    "pipeline1": {"simple_component": {"increment": 1}},
                    "pipeline2": {"simple_component": {"increment": 2, "number": 10}},
                }
            }
        )

        assert results == {
            'concurrent_pipeline_runner': {
                'pipeline1': {'simple_component': {'number': 6}},
                'pipeline2': {'simple_component': {'number': 12}},
            }
        }
        assert len(component_call_stack) == 2
        assert component_call_stack[0] == simple_component_2
        assert component_call_stack[1] == simple_component_1

Redna avatar May 14 '24 08:05 Redna

thanks @Redna -- it's an interesting solution. One of the problems I was struggling with is addressed here -- the setting of input and output types. Another issue though that I still struggle with -- is whether the data management part is getting quite complicated. The fact that the output looks like this:

       assert results == {
            'concurrent_pipeline_runner': {
                'pipeline1': {'simple_component': {'number': 6}},
                'pipeline2': {'simple_component': {'number': 12}},
            }
        }

Someone trying to get results from these many pipelines has to have a lot of knowledge about the internals of the pipelines. Do you know of any guidance on a "good" way to build reusable pipelines with Haystack? Reusable in the sense that they are flexible, but where the user of the reusable pipeline doesn't have to know all the details of the internal components of the pipeline in order to run it.

mikebellerU avatar May 14 '24 20:05 mikebellerU

Hey @mikebellerU and @Redna , before jumping onto some of these great ideas about pipeline executors, let's focus on making https://github.com/deepset-ai/haystack/compare/massi/pipeline-component actually work, the main "issue" remaining to be (de)serialization of these super components so they can be saved/loaded/reused/shared perhaps not only by yourself but within community. What do you think about that? I'm working on some other items right now but would love to contribute in the coming weeks.

vblagoje avatar May 15 '24 07:05 vblagoje

I tried playing around with the @masci component (I tweaked it so it would work at least for my case). And here is what I learned: Quickly it all gets too hard to manage the levels of data input and output. To invoke the "parent" pipeline, you may have to understand the detailed 'run' signature of the"child" pipeline.

Right now, to invoke a typical Haystack 2.0 RAG pipeline, I have to write something like response = pipeline.run(data={'retriever':... , 'embedder': .... , 'llm':....}), and then when it returns I have to pick out response['answer_builder']['answers'][0].data to get the result I'm interested in. Wouldn't it be better if there was a way I could encapsulate the knowledge about running this pipeline into a runner method (or some other name), with a signature like: answer = rag_pipeline.runner(query=..,docstore=...) ? This method could live alongside run potentially as a wrapper for it, but would allow for a type-and-parameter-checked reusable pipeline, that abstracts its internal details.

TLDR: Solving composability of pipelines I think needs some thought about how to abstract away the underlying details, and that should factor into the design of PipelineRunner.

mikebellerU avatar May 15 '24 16:05 mikebellerU

We're rolling out this feature as part of the "experimental" package, you can follow this PR https://github.com/deepset-ai/haystack-experimental/pull/9

masci avatar May 28 '24 17:05 masci

We're rolling out this feature as part of the "experimental" package, you can follow this PR deepset-ai/haystack-experimental#9

Hi @masci. Thanks I will check it out. -- is haystack-experimental intended to be package that I install separately from haystack so that I can access additional components?

mikebellerU avatar May 30 '24 16:05 mikebellerU

Hi @masci. Thanks I will check it out. -- is haystack-experimental intended to be package that I install separately from haystack so that I can access additional components?

It is a separated package, but the idea is to make it a dependency of haystack-ai, so users can access experimental features by just changing import paths.

masci avatar May 30 '24 17:05 masci

After an internal sync we decided not to pursue this component further. The solution doesn't feel quite right and the use cases that would benefit the most (like agentic pipelines) still have unclear requirements.

Currently the problem can be addressed by breaking down pipelines into smaller ones when possible, or considering to have bigger components.

If anyone from the community wants to take this on, this could make a good integration.

masci avatar Jun 12 '24 16:06 masci

Closing this issue for now.

shadeMe avatar Jun 25 '24 12:06 shadeMe