
Streaming Callback within a pipeline

Open · hoschmieder opened this issue 7 months ago · 1 comment

Hi team, I believe I am not the only one who needs streaming_callback support for multiple components within a pipeline. Today there is the streaming_generator function, which loops over all streaming components but only registers the last one. I have added a streaming_generator_all function, which registers every streaming component.
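To make the difference concrete, here is a hypothetical sketch of the two registration strategies described above. It does not reproduce hayhooks' actual `streaming_generator` code: the `register_last_only` and `register_all` helpers and the component names are illustrative assumptions, and a plain list of names stands in for the streaming components found by walking the pipeline.

```python
from typing import Callable, Dict, List


def register_last_only(streaming_names: List[str], run_args: Dict[str, dict], cb: Callable) -> Dict[str, dict]:
    """Attach the callback only to the last streaming component (the behaviour reported above)."""
    result = {name: dict(args) for name, args in run_args.items()}
    if streaming_names:
        last = streaming_names[-1]
        result[last] = {**result.get(last, {}), "streaming_callback": cb}
    return result


def register_all(streaming_names: List[str], run_args: Dict[str, dict], cb: Callable) -> Dict[str, dict]:
    """Attach the callback to every streaming component (what streaming_generator_all does)."""
    result = {name: dict(args) for name, args in run_args.items()}
    for name in streaming_names:
        result[name] = {**result.get(name, {}), "streaming_callback": cb}
    return result


names = ["echo1", "echo2"]
args = {"echo1": {"text": "hi"}}
print(sorted(n for n, a in register_last_only(names, args, print).items() if "streaming_callback" in a))  # ['echo2']
print(sorted(n for n, a in register_all(names, args, print).items() if "streaming_callback" in a))  # ['echo1', 'echo2']
```

With only the last component registered, chunks emitted by `echo1` never reach the client; registering all of them streams both stages.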

I have attached a test pipeline where you can see the process (e.g. with OpenWeb-UI).

Maybe this could be a new feature for the next release?

Regards, Holger

New function in /hayhooks/server/pipelines/utils.py:

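import threading
from queue import Queue
from typing import Dict, Generator, Optional

from haystack import Pipeline


def streaming_generator_all(pipeline: Pipeline, pipeline_run_args: Dict) -> Generator:
    # Queue shared between the pipeline thread (producer) and this generator (consumer);
    # None is used as the end-of-stream sentinel.
    queue: Queue[Optional[str]] = Queue()

    def streaming_callback(chunk):
        queue.put(chunk.content)

    pipeline_run_args = pipeline_run_args.copy()

    # Register the same callback on every component that exposes a streaming_callback
    # attribute (its run() method must also accept a streaming_callback argument).
    for name, component in pipeline.walk():
        if hasattr(component, "streaming_callback"):
            # Build a new dict so the caller's nested per-component dict is not mutated.
            pipeline_run_args[name] = {**pipeline_run_args.get(name, {}), "streaming_callback": streaming_callback}

    def run_pipeline():
        try:
            pipeline.run(pipeline_run_args)
        finally:
            queue.put(None)  # always signal the end, even if the pipeline raises

    thread = threading.Thread(target=run_pipeline)
    thread.start()

    # Yield chunks as they arrive until the sentinel shows up.
    while True:
        chunk = queue.get()
        if chunk is None:
            break
        yield chunk

    thread.join()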
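The function above is a producer/consumer hand-off: the shared callback pushes each chunk's text onto a `queue.Queue`, a worker thread runs the pipeline and always enqueues a `None` sentinel when it is done, and the generator yields until it sees that sentinel. Below is a minimal sketch of the same pattern that runs without Haystack, assuming hypothetical `Chunk`, `FakeStreamer` and `FakePipeline` stand-ins that mimic only the `walk()`/`run()` calls used here. It adds one thing the original does not do: an exception raised in the worker thread is stored and re-raised in the consumer. In the original, such an exception is only reported by the thread's default exception hook, and the stream ends as if the pipeline had succeeded.

```python
import threading
from dataclasses import dataclass
from queue import Queue
from typing import Callable, Dict, Generator, Iterator, List, Optional, Tuple


@dataclass
class Chunk:
    """Stand-in for Haystack's StreamingChunk; only `content` is used here."""
    content: str


class FakeStreamer:
    """Hypothetical component that emits a few chunks through its streaming callback."""

    def __init__(self, label: str, steps: int = 2, fail: bool = False):
        self.label = label
        self.steps = steps
        self.fail = fail
        # Same marker attribute that streaming_generator_all looks for via hasattr().
        self.streaming_callback: Optional[Callable[[Chunk], None]] = None

    def run(self, streaming_callback: Optional[Callable[[Chunk], None]] = None) -> None:
        for i in range(self.steps):
            if streaming_callback:
                streaming_callback(Chunk(content=f"{self.label} step {i + 1}"))
        if self.fail:
            raise RuntimeError(f"{self.label} failed")


class FakePipeline:
    """Hypothetical pipeline: runs components in insertion order with per-component kwargs."""

    def __init__(self, components: Dict[str, FakeStreamer]):
        self.components = components

    def walk(self) -> Iterator[Tuple[str, FakeStreamer]]:
        return iter(self.components.items())

    def run(self, data: Dict[str, dict]) -> None:
        for name, comp in self.components.items():
            comp.run(**data.get(name, {}))


def stream_all(pipeline: FakePipeline, run_args: Dict[str, dict]) -> Generator[str, None, None]:
    queue: "Queue[Optional[str]]" = Queue()
    errors: List[Exception] = []

    def callback(chunk: Chunk) -> None:
        queue.put(chunk.content)  # runs on the worker thread

    # Copy each per-component dict, then attach the shared callback to every streamer.
    args = {name: dict(kwargs) for name, kwargs in run_args.items()}
    for name, comp in pipeline.walk():
        if hasattr(comp, "streaming_callback"):
            args[name] = {**args.get(name, {}), "streaming_callback": callback}

    def worker() -> None:
        try:
            pipeline.run(args)
        except Exception as exc:  # keep the error so the consumer can re-raise it
            errors.append(exc)
        finally:
            queue.put(None)  # end-of-stream sentinel, sent even on failure

    thread = threading.Thread(target=worker)
    thread.start()
    while (chunk := queue.get()) is not None:
        yield chunk
    thread.join()
    if errors:
        raise errors[0]


pipe = FakePipeline({"echo1": FakeStreamer("A"), "echo2": FakeStreamer("B")})
print(list(stream_all(pipe, {})))  # ['A step 1', 'A step 2', 'B step 1', 'B step 2']
```

If the consumer stops iterating early, the worker thread still runs until the pipeline finishes; the sketch makes no attempt at cancellation.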

Test pipeline (pipeline_wrapper.py and pipeline.yml):

from pathlib import Path
import logging
import time
from typing import Type, Dict, Any, Optional, List, Union, Generator, Callable
from haystack import Pipeline, component
from hayhooks import BasePipelineWrapper, get_last_user_message
from hayhooks import streaming_generator_all as streaming_generator  # modification by HS
from haystack.core.serialization import DeserializationCallbacks
from haystack.dataclasses.document import Document
from haystack.dataclasses.chat_message import ChatMessage
from haystack.dataclasses import StreamingChunk



@component
class EchoA:    
    def __init__(self):
        self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None

    @component.output_types(output=str)
    def run(self, text:str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
        for i in range(3):
            time.sleep(0.5)
            print("++++++ Streaming callback of EchoA", streaming_callback)
            if streaming_callback:
                print(f"************* Step (1) {i+1}: {text.lower()}\n")
                streaming_callback(StreamingChunk(content=f"🧠 Step (1) {i+1}: {text.lower()}\n"))

        final_answer = f"✅ Answer from Echo1 to '{text}': {text.upper()}"
        if streaming_callback:
            streaming_callback(StreamingChunk(content=final_answer + "\n"))
        meta = [{
            "index": 0,
            "model": "test_callback",
            "finish_reason": "stop"
#            "finish_reason": "content_filter"
        }]

        output= {
                    "query": text,
                    "replies": [final_answer],
                    "meta": meta,                    
                }
        print("echo",output)
        return {"output": text}
        

@component
class EchoB:    
    def __init__(self):
        self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None

    @component.output_types(output=dict)
    def run(self, text:str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
        print("IN echo2",text)
        for i in range(3):
            time.sleep(0.5)
            print("++++++ Streaming callback of EchoB", streaming_callback)
            if streaming_callback:
                print(f"************* Step (2) {i+1}: {text.lower()}")
                streaming_callback(StreamingChunk(content=f"🧠 Step (2) {i+1}: {text.lower()}\n"))

        final_answer = f"✅ Answer from ECHO2 to '{text}': {text.lower()}"
        if streaming_callback:
            streaming_callback(StreamingChunk(content=final_answer + "\n"))
        meta = [{
            "index": 0,
            "model": "test_callback",
            "finish_reason": "stop"
        }]
        output= {
                    "query": text,
                    "replies": [final_answer],
                    "meta": meta,                    
                }
        print("echo2",output)
        return {"output": output}  # key must match the declared output type (output=dict)



# 🧩 PipelineWrapper for Hayhooks/OpenWeb-UI
class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
        callbacks = DeserializationCallbacks(component_pre_init=PipelineWrapper.component_pre_init_callback)
        self.pipeline = Pipeline.loads(pipeline_yaml, callbacks=callbacks)

    @staticmethod
    def component_pre_init_callback(component_name: str, component_cls: Type, init_params: Dict[str, Any]):
        custom_components = {
            "echo1": EchoA,
            "echo2": EchoB,
        }
        print ("SSSSSSSSSSSSSSSSSSSSSSSSSSSS component_pre_init_callback",EchoA)
        if component_cls.__name__ in custom_components:
            return custom_components[component_cls.__name__](**init_params)
        return component_cls(**init_params)

    def run_api(self, query: str) -> dict:
        result = self.pipeline.run({"text": query})
        # Pipeline.run returns a dict keyed by component name; echo2 is the final component.
        return result["echo2"]

    def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
        question = get_last_user_message(messages)
        stream = body.get("stream", True)

        return streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={
                "echo1": {"text": question},
#                "echo2": {"text": question}                
            }
        )

# pipeline.yml
components:
  echo1:
    type: pipeline_wrapper.EchoA

  echo2:
    type: pipeline_wrapper.EchoB

connections:
  - sender: echo1.output
    receiver: echo2.text

outputs:
  - receiver: echo1.output
  - receiver: echo2.output

hoschmieder, May 06 '25

Hi @hoschmieder! I think your use case makes sense; we are looking into it internally, so stay tuned!

cc @sjrl @tstadel

mpangrazzi, May 06 '25

Hi Team, do you have any news regarding this request?

hoschmieder, Jun 18 '25

Hello all, for everyone who also needs streaming from every component in the pipeline, here is my patch for 9.1. You then need to import it in the wrapper like this:

from hayhooks import BasePipelineWrapper, get_last_user_message, streaming_generator_all as streaming_generator
--- __init__.py 2025-06-26 04:59:02.823443591 +0000
+++ __init__.py 2025-06-26 06:12:43.559461475 +0000
@@ -5,6 +5,7 @@
     is_user_message,
     get_last_user_message,
     streaming_generator,
+    streaming_generator_all,
     async_streaming_generator
 )

@@ -14,6 +15,7 @@
     "is_user_message",
     "get_last_user_message",
     "streaming_generator",
+    "streaming_generator_all",
     "async_streaming_generator",
     "create_app",
 ]
--- server/pipelines/utils.py   2025-06-26 04:59:02.830443476 +0000
+++ server/pipelines/utils.py   2025-06-26 06:12:17.736121433 +0000
@@ -175,3 +175,39 @@
             except Exception as e:
                 log.warning(f"Error during pipeline task cleanup: {e}")
                 raise e
+
+
+
+# by HS: register streaming for all components
+def streaming_generator_all(pipeline: Pipeline, pipeline_run_args: Dict) -> Generator:
+    queue: Queue[str] = Queue()
+
+    def streaming_callback(chunk):
+        queue.put(chunk.content)
+
+    pipeline_run_args = pipeline_run_args.copy()
+
+    # NEW: register every component that has a streaming_callback
+    for name, component in pipeline.walk():
+        if hasattr(component, "streaming_callback"):
+            if name not in pipeline_run_args:
+                pipeline_run_args[name] = {}
+            pipeline_run_args[name]["streaming_callback"] = streaming_callback
+
+    def run_pipeline():
+        try:
+            pipeline.run(pipeline_run_args)
+        finally:
+            queue.put(None)
+
+    thread = threading.Thread(target=run_pipeline)
+    thread.start()
+
+    while True:
+        chunk = queue.get()
+        if chunk is None:
+            break
+        yield chunk
+
+    thread.join()
+
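One detail worth knowing about the patch: `pipeline_run_args.copy()` is a shallow copy, so when the caller already passes a dict for a component (for example `{"echo1": {"text": question}}`), the assignment `pipeline_run_args[name]["streaming_callback"] = streaming_callback` writes into the caller's own nested dict. The plain-Python sketch below shows the effect; `inject_shallow` and `inject_copy` are hypothetical helper names, the first mirroring the patch's logic and the second building a fresh nested dict instead.

```python
from typing import Any, Callable, Dict


def inject_shallow(run_args: Dict[str, dict], name: str, cb: Callable[..., Any]) -> Dict[str, dict]:
    """Mirrors the patch: shallow copy, then mutate the nested dict in place."""
    run_args = run_args.copy()
    if name not in run_args:
        run_args[name] = {}
    run_args[name]["streaming_callback"] = cb
    return run_args


def inject_copy(run_args: Dict[str, dict], name: str, cb: Callable[..., Any]) -> Dict[str, dict]:
    """Alternative: build a fresh nested dict so the caller's input stays untouched."""
    run_args = run_args.copy()
    run_args[name] = {**run_args.get(name, {}), "streaming_callback": cb}
    return run_args


caller_args = {"echo1": {"text": "hello"}}
inject_shallow(caller_args, "echo1", print)
print("streaming_callback" in caller_args["echo1"])  # True: the caller's dict was modified

caller_args = {"echo1": {"text": "hello"}}
inject_copy(caller_args, "echo1", print)
print("streaming_callback" in caller_args["echo1"])  # False
```

In the test wrapper, `run_chat_completion` builds a new `pipeline_run_args` literal on every call, so the mutation is harmless there; it only matters when a caller reuses the same dict across requests.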

hoschmieder, Jun 26 '25

@hoschmieder Hi! Would you like to open a pull request? We will be happy to review it!

mpangrazzi, Jun 26 '25

@mpangrazzi can this be closed now with your recent PR https://github.com/deepset-ai/hayhooks/pull/178 ?

sjrl, Oct 24 '25

@sjrl yep, we can close this! Thanks!

mpangrazzi, Oct 24 '25

I have tested it, and it works! Thanks a lot!

Kind regards from Pliezhausen

Holger

hoschmieder, Oct 24 '25