Streaming Callback within a pipeline
Hi team, i believe i am not the only one whe needs streaming_callback from multiple components within a pipeline. Today there ist the streaming_generator function wich loops all streaming components but only register the last one. I have added a streaming_generator_all function wich registers each streaming compoenten.
i have attached a test-pipline where you can see the process. (e.g. with OpenWeb-UI)
Maybe this could be a new feature for the next release?
Regards Holger
new function in /hayhooks/server/pipelines/utils.py
def streaming_generator_all(pipeline: Pipeline, pipeline_run_args: Dict) -> Generator: queue: Queue[str] = Queue()
def streaming_callback(chunk):
queue.put(chunk.content)
pipeline_run_args = pipeline_run_args.copy()
for name, component in pipeline.walk():
if hasattr(component, "streaming_callback"):
if name not in pipeline_run_args:
pipeline_run_args[name] = {}
pipeline_run_args[name]["streaming_callback"] = streaming_callback
def run_pipeline():
try:
pipeline.run(pipeline_run_args)
finally:
queue.put(None)
thread = threading.Thread(target=run_pipeline)
thread.start()
while True:
chunk = queue.get()
if chunk is None:
break
yield chunk
thread.join()
Test-Pipeline
`from pathlib import Path
import logging
import time
from typing import Type, Dict, Any, Optional, List,Union, Generator, Callable
from haystack import Pipeline, component
from hayhooks import BasePipelineWrapper, get_last_user_message
from hayhooks import streaming_generator_all as streaming_generator #Modifikation von HS
from haystack.core.serialization import DeserializationCallbacks
from haystack.dataclasses.document import Document
from haystack.dataclasses.chat_message import ChatMessage
from haystack.dataclasses import StreamingChunk
@component
class EchoA:
def __init__(self):
self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None
@component.output_types(output=str)
def run(self, text:str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
for i in range(3):
time.sleep(0.5)
print(f"++++++ Streamin-Callback von EchoA",streaming_callback)
if streaming_callback:
print(f"************* Schritt (1) {i+1}: {text.lower()}\n")
streaming_callback(StreamingChunk(content=f"đź§ Schritt (1) {i+1}: {text.lower()}\n"))
final_answer = f"âś… Antwort von Echo1 auf '{text}': {text.upper()}"
if streaming_callback:
streaming_callback(StreamingChunk(content=final_answer + "\n"))
meta = [{
"index": 0,
"model": "test_callback",
"finish_reason": "stop"
# "finish_reason": "content_filter"
}]
output= {
"query": text,
"replies": [final_answer],
"meta": meta,
}
print("echo",output)
return {"output": text}
@component
class EchoB:
def __init__(self):
self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None
@component.output_types(output=dict)
def run(self, text:str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
print("IN echo2",text)
for i in range(3):
time.sleep(0.5)
print(f"++++++ Streamin-Callback von EchoB",streaming_callback)
if streaming_callback:
print(f"************* Schritt (2) {i+1}: {text.lower()}")
streaming_callback(StreamingChunk(content=f"đź§ Schritt (2) {i+1}: {text.lower()}\n"))
final_answer = f"âś… Antwort ECHO2 auf '{text}': {text.lower()}"
if streaming_callback:
streaming_callback(StreamingChunk(content=final_answer + "\n"))
meta = [{
"index": 0,
"model": "test_callback",
"finish_reason": "stop"
}]
output= {
"query": text,
"replies": [final_answer],
"meta": meta,
}
print("echo2",output)
return output
# đź§© PipelineWrapper fĂĽr Hayhooks/OpenWeb-UI
class PipelineWrapper(BasePipelineWrapper):
def setup(self) -> None:
pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
callbacks = DeserializationCallbacks(component_pre_init=PipelineWrapper.component_pre_init_callback)
self.pipeline = Pipeline.loads(pipeline_yaml, callbacks=callbacks)
@staticmethod
def component_pre_init_callback(component_name: str, component_cls: Type, init_params: Dict[str, Any]):
custom_components = {
"echo1": EchoA,
"echo2": EchoB,
}
print ("SSSSSSSSSSSSSSSSSSSSSSSSSSSS component_pre_init_callback",EchoA)
if component_cls.__name__ in custom_components:
return custom_components[component_cls.__name__](**init_params)
return component_cls(**init_params)
def run_api(self, query: str) -> dict:
result = self.pipeline.run({"text": query})
return result["output"]
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
question = get_last_user_message(messages)
stream = body.get("stream", True)
return streaming_generator(
pipeline=self.pipeline,
pipeline_run_args={
"echo1": {"text": question} ,
# "echo2": {"text": question}
}
)
#yml-file
components:
echo1:
type: pipeline_wrapper.EchoA
echo2:
type: pipeline_wrapper.EchoB
connections:
- sender: echo1.output
receiver: echo2
outputs:
- receiver: echo1.output
- receiver: echo2.output
`
Hi @hoschmieder! I think your use case makes sense, we are internally taking a look at it, so stay tuned!
cc @sjrl @tstadel
Hi Team, do you have any news regarding this request?
Hello all, for everyone who also needs the streaming from each componenten in the pipeline. Here is my patch for 9.1. Than you need to load in the wrapper:
from hayhooks import BasePipelineWrapper, get_last_user_message, streaming_generator_all as streaming_generator
--- __init__.py 2025-06-26 04:59:02.823443591 +0000
+++ __init__.py 2025-06-26 06:12:43.559461475 +0000
@@ -5,6 +5,7 @@
is_user_message,
get_last_user_message,
streaming_generator,
+ streaming_generator_all,
async_streaming_generator
)
@@ -14,6 +15,7 @@
"is_user_message",
"get_last_user_message",
"streaming_generator",
+ "streaming_generator_all",
"async_streaming_generator",
"create_app",
]
--- server/pipelines/utils.py 2025-06-26 04:59:02.830443476 +0000
+++ server/pipelines/utils.py 2025-06-26 06:12:17.736121433 +0000
@@ -175,3 +175,39 @@
except Exception as e:
log.warning(f"Error during pipeline task cleanup: {e}")
raise e
+
+
+
+# by HS: register streaming for all components
+def streaming_generator_all(pipeline: Pipeline, pipeline_run_args: Dict) -> Generator:
+ queue: Queue[str] = Queue()
+
+ def streaming_callback(chunk):
+ queue.put(chunk.content)
+
+ pipeline_run_args = pipeline_run_args.copy()
+
+ # NEU: Alle Komponenten mit streaming_callback erfassen
+ for name, component in pipeline.walk():
+ if hasattr(component, "streaming_callback"):
+ if name not in pipeline_run_args:
+ pipeline_run_args[name] = {}
+ pipeline_run_args[name]["streaming_callback"] = streaming_callback
+
+ def run_pipeline():
+ try:
+ pipeline.run(pipeline_run_args)
+ finally:
+ queue.put(None)
+
+ thread = threading.Thread(target=run_pipeline)
+ thread.start()
+
+ while True:
+ chunk = queue.get()
+ if chunk is None:
+ break
+ yield chunk
+
+ thread.join()
+
@hoschmieder Hi! Would you like to open a pull request? We will be happy to review it!
@mpangrazzi can this be closed now with your recent PR https://github.com/deepset-ai/hayhooks/pull/178 ?
@sjrl yep, we can close this! Thanks!
i have teset it, it works! Thank's a lot!
Mit freundlichen GrĂĽĂźen aus Pliezhausen
[cid:schmieder_indasys_logo_2dfc8044-cf7a-411a-ac66-31181b6f96b1.png]
Holger
Schmieder [cid:b65cc36d-2ae7-4898-becc-97c4ade45488.png]
Carl-Zeiss-Straße 5, 72124 Pliezhausen Geschäftsführer Geschäftsleitung
Telefon: +49 7127 988 110tel:+49%207127%20988%20110 E-Mail: @.@.> [cid:teams_chat_23d22538-8b61-4a8b-991a-8d90f5501e39.png] @.***>
www.schmieder.dehttps://www.schmieder.de/ | Schmieder Newsroomhttps://www.schmieder.de/schmieder-newsroom/ | Funkspruch Newsletterhttps://www.schmieder.de/funkspruch/ | Ticket @.***> | Schmieder TeamViewerhttps://www.schmieder.de/tv
Geschäftsführer: Holger Schmieder, Victor Ferreira ? Handelsregister: HRB 353858 Amtsgericht Stuttgart ? USt-IdNr. DE194533175 Diese Email enthält vertrauliche und/oder rechtlich geschützte Informationen. Sollten Sie nicht der richtige Adressat sein oder diese E-Mail irrtümlich erhalten haben, informieren Sie bitte sofort den Absender (07127 988-0 oder @.***) und vernichten diese Email. Die unbefugte Weiterleitung, die Nutzung der Inhalte und das unerlaubte Kopieren dieser E-Mail sind untersagt. Die Kommunikation per E-Mail ist nicht gegen den Zugriff durch Dritte geschützt. Daher ist nur die von uns unterzeichnete, schriftliche Fassung von Aussagen verbindlich. Wir haften ausdrücklich nicht für den Inhalt und die Vollständigkeit von E-Mails und den gegebenenfalls daraus entstehenden Schäden. Sollte trotz unserer Virenschutzprogramme durch diese E-Mail ein Virus in Ihr System gelangen, so haften wir, soweit gesetzlich zulässig, nicht für die hieraus entstehenden Schäden.
FĂĽr Ihre Sicherheit und die Nachvollziehbarkeit des Mailversands: diese E-Mail wurde mit der ID @.>" vom Postfach @." zum Zeitpunkt "2025/10/24 13:27" versandt an @.***"
Von: Michele Pangrazzi @.> Gesendet: Freitag, 24. Oktober 2025 10:40 An: deepset-ai/hayhooks @.> Cc: Holger Schmieder @.>; Mention @.> Betreff: Re: [deepset-ai/hayhooks] Streaming Callback within a pipeline (Issue #113)
[https://avatars.githubusercontent.com/u/1433753?s=20&v=4]mpangrazzi left a comment (deepset-ai/hayhooks#113)https://github.com/deepset-ai/hayhooks/issues/113#issuecomment-3441853911
@sjrlhttps://github.com/sjrl yep, we can close this! Thanks!
— Reply to this email directly, view it on GitHubhttps://github.com/deepset-ai/hayhooks/issues/113#issuecomment-3441853911, or unsubscribehttps://github.com/notifications/unsubscribe-auth/ADH3FNTYYG64TNJHNOGOZYL3ZHQXNAVCNFSM6AAAAAB4P4NORWVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZTINBRHA2TGOJRGE. You are receiving this because you were mentioned.Message ID: @.***>