Pipeline breakpoints
With the new Pipeline run logic, we discussed that introducing breakpoints to pipelines becomes possible. Users could then set breakpoints, execute a pipeline from a breakpoint, execute up until a breakpoint, save a checkpoint/snapshot, or set a callback on a breakpoint. Breakpoints would be beneficial for pipeline debugging. They can also be useful for pipeline evaluation, in particular for isolated component evaluation to find bottlenecks, as described in the Haystack 1.x docs: https://docs.haystack.deepset.ai/v1.24/docs/evaluation#integrated-and-isolated-node-evaluation
In integrated evaluation, a node receives the predictions from the preceding node as input. It shows the performance users can expect when running the pipeline, and it's the default mode when calling pipeline.eval(). In isolated evaluation, a node is isolated from the predictions of the preceding node. Instead, it receives ground-truth labels as input. Isolated evaluation shows the maximum performance of a node if it receives perfect input from the preceding node. You can activate it by running pipeline.eval(add_isolated_node_eval=True). For example, in an ExtractiveQAPipeline composed of a Retriever and a Reader, isolated evaluation would measure the upper bound of the Reader's performance, that is, the performance of the Reader assuming that the Retriever passes on all relevant documents. If the isolated evaluation result of a node differs significantly from its integrated evaluation result, you may need to improve the preceding node to get the best results from your pipeline. If the difference is small, it means that you should improve the node that you evaluated to improve the pipeline's overall result quality.
Is there already a draft, or are there any previously discussed ideas, proposals, or suggestions on how this would be implemented?
No, nothing like that exists yet.
I've tried to define some more formal requirements and draft a first proposal on how to implement them.
https://www.notion.so/deepsetai/Implementing-Breakpoints-in-Haystack-Pipelines-1b3e210b37c4805c8ab1c03bfcdf6c3f
Here are the main use cases we want to cover:
Debugging
Simple Example (Debug RAG pipeline):
- user invokes pipeline with breakpoint at Retriever of RAG pipeline
- pipeline runs till breakpoint and emits debug_state
- user investigates and manipulates debug_state
- user invokes pipeline with debug_state without breakpoints
- pipeline runs from after the Retriever and finishes emitting normal pipeline output
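For illustration, the flow above could be mocked in plain Python. Everything here is a hypothetical sketch, not the actual Haystack API: the component names, the `run()` signature, and the `debug_state` layout are all stand-ins.

```python
# Toy mock of the debug flow: a "pipeline" as an ordered list of components,
# a serializable debug_state, and a run() that stops after a named component.
PIPELINE = [
    ("retriever", lambda s: {**s, "documents": ["doc-1", "doc-2"]}),
    ("prompt_builder", lambda s: {**s, "prompt": "Answer using " + ", ".join(s["documents"])}),
    ("generator", lambda s: {**s, "reply": "Reply to: " + s["prompt"]}),
]

def run(state=None, break_after=None):
    state = dict(state or {"query": "What is Haystack?"})
    done = set(state.get("_done", []))
    for name, fn in PIPELINE:
        if name in done:
            continue  # already executed before the snapshot was taken
        state = fn(state)
        done.add(name)
        if name == break_after:
            return {"breakpoint": name, "state": {**state, "_done": sorted(done)}}
    return {"breakpoint": None, "state": {**state, "_done": sorted(done)}}

# 1) run until the breakpoint at the Retriever
debug = run(break_after="retriever")
# 2) investigate and manipulate the emitted debug_state
debug["state"]["documents"] = ["a better document"]
# 3) resume without a breakpoint: the run continues after the Retriever
result = run(state=debug["state"])
print(result["state"]["reply"])
```

The `_done` bookkeeping is what lets the resumed run skip components that already executed, which is the "pipeline runs from after the Retriever" step.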
Sophisticated Example (Debug Agentic Pipeline):
- user invokes pipeline with breakpoint at Generator of Agentic pipeline
- pipeline runs until the breakpoint is hit for the first time and emits debug_state
- user investigates and manipulates debug_state
- user invokes pipeline with debug_state and same breakpoint as before
- pipeline runs next iteration until Generator and emits debug_state
- user wants to step to next component and runs pipeline with breakpoint at ConditionalRouter with debug_state
- pipeline runs ConditionalRouter and emits debug_state
- user wants to step to next component and runs pipeline with breakpoint at ToolInvoker with debug_state
- pipeline runs ToolInvoker and emits debug_state
- user investigates ToolInvoker output and is not happy with it, manipulates the debug_state
- user runs pipeline with manipulated debug_state till end (without breakpoints)
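The stepping behaviour in the agentic example could be mocked the same way: re-invoke with the emitted state and move the breakpoint to the next component. Again, everything below (component names, state layout, the `run()` signature) is invented for illustration.

```python
# Toy agentic loop: Generator -> ConditionalRouter -> ToolInvoker, repeated
# until the router decides to stop. "Stepping" = re-running with the emitted
# state and the breakpoint placed at the next component.
COMPONENTS = ["generator", "conditional_router", "tool_invoker"]

def execute(name, state):
    state = dict(state)
    state.setdefault("trace", []).append(name)
    if name == "conditional_router":
        # stop the loop once the tool has produced a result
        state["route"] = "done" if "tool_result" in state else "tools"
    if name == "tool_invoker":
        state["tool_result"] = "42"
    return state

def run(state, break_after=None):
    i = state.get("_next", 0)
    while True:
        name = COMPONENTS[i % len(COMPONENTS)]
        state = execute(name, state)
        if name == "conditional_router" and state["route"] == "done":
            return {"breakpoint": None, "state": state}
        i += 1
        state = {**state, "_next": i}
        if name == break_after:
            return {"breakpoint": name, "state": state}

# hit the breakpoint at the generator, then step component by component
debug = run({}, break_after="generator")
debug = run(debug["state"], break_after="conditional_router")
debug = run(debug["state"], break_after="tool_invoker")
# unhappy with the tool output: manipulate it, then run to the end
debug["state"]["tool_result"] = "43"
final = run(debug["state"])
print(final["state"]["trace"])
```

The `_next` field makes the loop position part of the serializable state, so each step is just another stateless invocation.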
Evaluation
Simple Example (Evaluate Retrieval of RAG pipeline):
- user invokes pipeline with breakpoint at Retriever
- pipeline runs till breakpoint and emits debug_state
- user compares debug_state with gold standard
Sophisticated Example (Isolated Evaluation of RAG pipeline Generator):
- user builds artificial debug_state simulating a perfect retriever
- user invokes pipeline with artificial debug_state
- pipeline runs till end
- user compares results with expected values (generator gold standard)
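The isolated-evaluation idea can be sketched in the same toy style: hand-craft a state that simulates a perfect retriever and resume the rest of the pipeline from it. All names and the state layout below are made up for illustration; this is not the real API.

```python
# Toy isolated evaluation of the generator: build an artificial debug_state
# that pretends the retriever returned exactly the ground-truth documents,
# resume from it, and compare the final output to a gold answer.
def prompt_builder(state):
    return {**state, "prompt": "Answer using: " + ", ".join(state["documents"])}

def generator(state):
    # stand-in for an LLM: echoes the first retrieved document
    return {**state, "reply": state["documents"][0]}

REMAINING = [prompt_builder, generator]  # everything after the retriever

def resume(debug_state):
    state = dict(debug_state["state"])
    for fn in REMAINING:
        state = fn(state)
    return state

# artificial debug_state simulating the "perfect retriever"
artificial = {
    "breakpoint": "retriever",
    "state": {
        "query": "What is the capital of France?",
        "documents": ["Paris is the capital of France."],
    },
}

gold = "Paris is the capital of France."
result = resume(artificial)
print(result["reply"] == gold)  # upper bound of the generator's quality
```

Because the retriever's output is replaced by ground truth, any remaining error is attributable to the generator alone, which is exactly the isolated-evaluation measurement from the 1.x docs.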
Features:
- defining breakpoints (components at which the pipeline.run execution should stop)
- emitting a complete pipeline.run state (called debug_state in the following) which allows users to
- investigate current inputs and outputs
- investigate pipeline run queue
- manipulating the debug_state
- sending the debug_state over the wire (serializability)
- resuming pipeline.run from debug_state
- possibly a flag to control whether breakpoints are hit before or after component execution
Assumptions:
- breakpoints and debug_state are passed with pipeline.run
- pipeline.run ends at breakpoints
- debugging works in a RESTful way: debug_state must be fully serializable and contain all state information of the pipeline. There is no hidden state anywhere else (except for components we cannot control; that's fine)
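A minimal check of that last assumption: every debug_state would need to survive a JSON round trip unchanged. The field names below are invented for illustration; the point is only that all values are plain JSON types with no hidden state outside the object.

```python
import json

# Hypothetical debug_state containing the pieces listed above: current
# inputs/outputs plus the pipeline run queue.
debug_state = {
    "breakpoint": "retriever",
    "inputs": {"retriever": {"query": "What is Haystack?"}},
    "outputs": {"retriever": {"documents": ["doc-1", "doc-2"]}},
    "run_queue": ["prompt_builder", "generator"],
}

wire = json.dumps(debug_state)   # send over the wire...
restored = json.loads(wire)      # ...and restore on the other side
assert restored == debug_state   # lossless round trip, no hidden state
```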
@davidsbatista @Amnah199 I've done a small braindump above that should illustrate the main use cases we want to cover. Happy to answer further questions.
Here is an older issue that might be related: https://github.com/deepset-ai/haystack/issues/8073
@tstadel thanks a lot for this!
@tstadel thanks a lot!
First PR is open in haystack-experimental https://github.com/deepset-ai/haystack-experimental/pull/269
Hey! Sorry to slide into this issue, but I have a question regarding its use case. Currently it's mainly intended for development/debugging, but what about in a production environment? I was thinking of using it to snapshot pipelines before components with long execution times, i.e. if there is an error, reload the pipeline to the state before the execution; but with its current setup (it only writes a JSON file) it's not applicable for prod. Also, since they are hardcoded and predefined, it's not meant for fault-tolerance implementations. Am I wrong to think that this could be used for that, or is it solely intended for debugging?
Hey @aymbot! Thanks for your feedback :) We are eager to get any feedback and use cases for the breakpoints!
I was thinking of using it to snapshot pipelines before components with long execution times, i.e. if there is an error, reload the pipeline to the state before the execution; but with its current setup (it only writes a JSON file) it's not applicable for prod.
Can you elaborate a bit more on this? Why is it not applicable for production, and in your scenario, how would it work?
Also, since they are hardcoded and predefined, it's not meant for fault-tolerance implementations.
To what does 'they' refer?
No worries, with pleasure @davidsbatista. It's not applicable in production because the output should also be serializable (i.e. passed as a ByteStream). In our scenario we would like to use it to recover agentic pipelines: if one tool call/process is long and fails, we'd recover the pipeline state to what it was before. We just wish to use it beyond a debug state, and more for fault tolerance. The 'they' refers to the breakpoints. Hope it's a bit clearer! It could also be that I am just expecting it to behave in a certain way that it wasn't intended for!
It's not applicable in production because the output should also be serializable (i.e. passed as ByteStream).
The breakpoints produce serializable pipeline states, as JSON objects, which you can encode into bytes, e.g.:
import json

json_object = {
    "name": "Test",
    "city": "Berlin"
}

json_string = json.dumps(json_object)
byte_stream = json_string.encode('utf-8')
print(byte_stream)
Those states can then be injected into a pipeline so that it resumes from that state:
decoded_string = byte_stream.decode('utf-8')
decoded_json = json.loads(decoded_string)
result = pipeline.run(data={}, resume_state=decoded_json)
This is how you could save/load states using bytestreams. As for the second part, detecting a failure after some time interval and then triggering a pipeline resume, I think you would need some watchdog/scheduler tool, e.g. Airflow, but I lack more context.
Oh! Thanks for the heads-up with the bytestreams, sorry for overlooking that! For the second part, it's not per se about detecting the failure, but about being able to recover the pipeline from its prior state when it fails (i.e. a tool call gets a timeout). But in theory, if I add breakpoints at every component, I could resume it from the last successful one.
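That recovery idea could look roughly like this as a toy mock: write a checkpoint after every component, and on failure retry from the last successful one. All names and the checkpoint layout are illustrative, not the current implementation.

```python
import json

calls = {"n": 0}  # counts tool invocations, to simulate one transient failure

def flaky_tool(state):
    calls["n"] += 1
    if calls["n"] == 1:
        raise TimeoutError("tool call timed out")  # first attempt fails
    return {**state, "tool_result": "ok"}

PIPELINE = [
    ("generator", lambda s: {**s, "plan": "call the tool"}),
    ("tool_invoker", flaky_tool),
    ("responder", lambda s: {**s, "reply": s["tool_result"]}),
]

# the last successful snapshot lives outside the run, e.g. in a store or file
last_checkpoint = {"data": json.dumps({"state": {"query": "q"}, "done": []})}

def run_from(checkpoint):
    snap = json.loads(checkpoint["data"])
    state, done = snap["state"], list(snap["done"])
    for name, fn in PIPELINE:
        if name in done:
            continue  # already completed before the last snapshot
        state = fn(state)
        done.append(name)
        checkpoint["data"] = json.dumps({"state": state, "done": done})
    return state

try:
    result = run_from(last_checkpoint)
except TimeoutError:
    # retry: resumes after the generator, so only the failed tool call re-runs
    result = run_from(last_checkpoint)

print(result["reply"])  # → ok
```

The key property is that the checkpoint is updated only after a component completes, so a retry repeats exactly the failed component and nothing before it.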