Adding stream to DSPy LMs
A few members of the community have been asking for support for streaming LM output in DSPy.
@sutyum and @detaos have discussed this extensively before.
One of the challenges is that it's not even clear how to stream a DSPy program (it makes many calls to the LM, in arbitrary order, decided by the user). It's not really possible to stream that in the general case.
However, we can add support per LM call, e.g. in `dspy.Predict`. The key thing is that `Predict` actually outputs multiple fields, so streaming support will need to return an object (`Prediction`, or maybe a `StreamedPrediction`?) in which each field (e.g., `rationale` and `answer`) streams separately.
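For illustration only, here is one rough shape such an object could take; the `StreamedPrediction` class, its methods, and the usage below are hypothetical, not existing DSPy API:

```python
from typing import Dict, Iterator


class StreamedPrediction:
    """Hypothetical: a Prediction-like object whose output fields stream independently."""

    def __init__(self, field_streams: Dict[str, Iterator[str]]):
        self._field_streams = field_streams

    def stream(self, field: str) -> Iterator[str]:
        """Return the token stream for one output field, e.g. 'rationale' or 'answer'."""
        return self._field_streams[field]


# Hypothetical usage with a dspy.Predict-like module:
# pred = predictor.stream(question="...")  # assumed API, does not exist today
# for token in pred.stream("rationale"):
#     print(token, end="")
# for token in pred.stream("answer"):
#     print(token, end="")
```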
I don't have any insight into what that would take but it sounds simple overall.
I'll think about this. Support per LM call would be sufficient (and what we're doing anyway).
For reference, here's an excerpt from the langchain-dspy notebook:
If we convert this into a fuller integration, all users stand to benefit. LangChain users will gain the ability to optimize any chain with any DSPy optimizer. DSPy users will gain the ability to export any DSPy program into an LCEL that supports streaming and tracing, and other rich production-targeted features in LangChain.
But that sounds harder.
This sounds super cool, similar to #249. I think the broader question is "how does DSPy fit into the productionization workflow", and it's something we can think more about to come up with an elegant approach.
Three different aspects of dspy streaming:
- For debugging / understanding: Being able to see the LLM chatter as it streams gives insight into what is actually going on. Even though the chatter may be completely incoherent when there are multiple concurrent streams, structured output, etc., it can help catch issues early without having to wait for the whole program to finish.
- For lower latency UI: Being able to subscribe to changes to the final output of the program, be it a stream of text or a partial object
- For progress visualization: Being able to subscribe to state changes in the execution of the program, so that it is possible to understand how far the execution has progressed
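As a rough illustration of the last two points, a subscription-style interface might look something like the sketch below; every name here is hypothetical and nothing in it exists in DSPy today.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamEvent:
    """Hypothetical event emitted while a DSPy program executes."""
    module: str                  # which module/predictor produced the event
    field: Optional[str] = None  # which output field, if any
    delta: str = ""              # newly generated text for that field
    done: bool = False           # whether this module has finished


def on_event(event: StreamEvent) -> None:
    """Example subscriber: stream the final answer to the user and show progress."""
    if event.field == "answer":
        print(event.delta, end="", flush=True)   # lower-latency UI
    elif event.done:
        print(f"\n[{event.module} finished]")    # progress visualization
```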
Any news on this? I really like dspy, but without streaming it's hard to use in production.
My take on this is to implement the following two parts:
- For the `LM` class, expose a streaming counterpart of the `basic_request()` abstract method. I suggest that it should be up to each LM to implement something like `basic_streaming_request()`, which should be similar to OpenAI's streaming API: it takes a request and returns a response generator (see the sketch after this list).
- Implement a streaming counterpart to `dsp.adapters.Template.extract()` so that it parses the streaming response returned by `LM.basic_streaming_request()` on the fly and returns a response generator for each output field. The user can then retrieve the streaming response for a field by reading its corresponding generator. However, I think implementing the streaming parsing would be the difficult part.
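To make the first part concrete, here is a minimal sketch of what such a method could look like for an OpenAI-backed LM. The class name `StreamingOpenAI`, the method name `basic_streaming_request()`, and its wiring into dspy's `LM` base class are assumptions for illustration; only the OpenAI streaming call itself is real API.

```python
from openai import OpenAI


class StreamingOpenAI:
    """Sketch of an LM wrapper exposing a streaming counterpart of basic_request()."""

    def __init__(self, model: str = "gpt-3.5-turbo"):
        self.client = OpenAI()
        self.model = model

    def basic_streaming_request(self, prompt: str, **kwargs):
        """Yield text deltas as they arrive, mirroring OpenAI's streaming API."""
        stream = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            **kwargs,
        )
        for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                yield delta
```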
Right now I am working on a temporary workaround as I only need streaming for the final response synthesis module. As my project is a RAG pipeline, I only need the last part that responds to the user to be streaming. Thus, I will try to export the compiled template as a string, then make the call to the LM and parse the streaming response myself.
@theta-lin if you figure out how to compile the prompt string with dspy without having to check the history (i.e., after having made a call), or without a wrapper around that (a class that sends a null response, after which you check the history), do tell.
@pedroallenrevez I do have a solution here, it is adapted from https://github.com/stanfordnlp/dspy/blob/55510eec1b83fa77f368e191a363c150df8c5b02/dspy/predict/llamaindex.py#L22
```python
import dsp
import dspy
from dspy.signatures.signature import ensure_signature, signature_to_template


def get_template(predict_module: dspy.Predict, **kwargs) -> str:
    """Get the formatted prompt template from a predict module."""
    # (I suddenly realized what these comments mean now... they are copied from `old_generate()`.)
    # Extract the three privileged keyword arguments.
    signature = ensure_signature(predict_module.signature)
    # Switch to the legacy format for dsp.generate.
    template = signature_to_template(signature)
    # The only difference from the original code, to make it work with uncompiled predictors.
    if hasattr(predict_module, "demos"):
        demos = predict_module.demos
    else:
        demos = []
    # All of the other kwargs are presumed to fit a prefix of the signature.
    # That is, they are input variables for the bottom-most generation, so
    # we place them inside the input - x - together with the demos.
    x = dsp.Example(demos=demos, **kwargs)
    return template(x)
```
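For example, assuming a simple question-answering predictor (the signature and input below are just for illustration):

```python
qa = dspy.Predict("question -> answer")
prompt = get_template(qa, question="What is the capital of France?")
print(prompt)  # the fully formatted prompt string that would be sent to the LM
```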
As it happens, I did finish a workaround; here is an example of both extracting the prompt template string and actually implementing the streaming response:
```python
class Example(dspy.Module):
    def __init__(self):
        super().__init__()
        self.synthesizer = dspy.Predict(...)
        self.llm = ...

    def forward(self, query):
        synthesizer_template = get_template(
            # Suppose that we have a predict module called `synthesizer` that takes an InputField
            # called `query` and outputs an OutputField called `response`.
            self.synthesizer,
            # Just pass in all the input fields as kwargs.
            query=query,
        )

        def parse_gen():
            """
            A generator that returns the part after "Response:" and strips whitespace.
            In other words, it's like `dsp.adapters.Template.extract()` but only for one field.
            The assumption is that you ONLY want to extract the `response` field and
            that the `response` field is the LAST field of the output. I only implemented
            this because I am using CoT for my actual code, so there is a `rationale`
            field preceding the `response` field.
            """
            from itertools import takewhile

            # Most of this is just for stripping the whitespace at the beginning and
            # the end, while preserving the whitespace in the middle.
            def rstripped(s):
                """Extract the trailing whitespace itself."""
                return "".join(reversed(tuple(takewhile(str.isspace, reversed(s)))))

            field = "Response:"
            # Suppose that you have an `llm` object whose `stream_complete()` method
            # returns a response generator.
            gen = self.llm.stream_complete(synthesizer_template)
            before_response = ""
            for r in gen:
                # r.delta is the "delta" of the LLM output, i.e., the newly generated tokens.
                before_response += r.delta
                offset = before_response.find(field)
                if offset != -1:
                    s = before_response[offset + len(field):]
                    if s.strip():
                        yield s.strip()
                        prev_whitespace = rstripped(s)
                        break
            for r in gen:
                s = r.delta
                yield prev_whitespace + s.rstrip()
                prev_whitespace = rstripped(s)

        # `response` is a generator yielding the actual streaming response.
        return dspy.Prediction(response=parse_gen())
```
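The caller can then consume the streamed `response` field like any other generator; the query string below is just an example:

```python
program = Example()
prediction = program(query="What does the acronym DSPy stand for?")
for chunk in prediction.response:
    print(chunk, end="", flush=True)
```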
@pedroallenrevez my LinkedIn DMs are open, let's chat
I agree with @pedroallenrevez. Currently my whole pipeline (~3 components) is in DSPy, except the last step (final answer generation). I don't care about streaming in between, I just want the final answer to be streamed. I currently mock the prompt and send it to the LLM via a vLLM client and stream the answer.
It would be nice if there were some streaming support for cases where there is only one output field.