Streaming
dspy.streamify can be used to convert a DSPy program to streaming mode. This is useful when you want to stream intermediate outputs (e.g., O1-style reasoning) to the client before the final prediction is ready. It uses asyncify under the hood and inherits its execution semantics. The deltas of every module in the program are streamed directly, without any processing, and once the final prediction is ready it is yielded as well.
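Before the deployment example, here is a minimal sketch of consuming the stream directly with an async for loop. The model name and question are placeholders, and it assumes an LM has been configured via dspy.settings.configure; otherwise it just follows the chunk-then-prediction behavior described above.

```python
import asyncio

import dspy
import litellm

# Placeholder LM configuration; swap in whatever model you actually use.
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

stream_program = dspy.streamify(dspy.ChainOfThought("question -> answer"))

async def main():
    async for value in stream_program(question="What is 2 + 2?"):
        if isinstance(value, litellm.ModelResponse):
            # Raw deltas from the underlying LM call, streamed as they arrive.
            print("chunk:", value)
        elif isinstance(value, dspy.Prediction):
            # The final structured prediction, yielded once streaming is complete.
            print("answer:", value.answer)

asyncio.run(main())
```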
Here's how it works for deployment:
```python
import dspy
import litellm
import ujson
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class Question(BaseModel):
    text: str

# Assumes an LM has already been configured, e.g. via dspy.settings.configure(lm=...).
streaming_dspy_program = dspy.streamify(dspy.ChainOfThought("question -> answer"))

@app.post("/predict/stream")
async def stream(question: Question):
    async def generate():
        async for value in streaming_dspy_program(question=question.text):
            if isinstance(value, dspy.Prediction):
                data = {"prediction": value.labels().toDict()}
            elif isinstance(value, litellm.ModelResponse):
                data = {"chunk": value.json()}
            yield f"data: {ujson.dumps(data)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
```python
# Since you're often going to want to stream the result of a DSPy program as
# server-sent events, we've included a helper function for that, which is
# equivalent to the code above.
from dspy.utils.streaming import streaming_response

@app.post("/predict/stream")
async def stream(question: Question):
    stream = streaming_dspy_program(question=question.text)
    return StreamingResponse(streaming_response(stream), media_type="text/event-stream")
```
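On the client side, the server-sent events can be consumed line by line. The following is an illustrative sketch, not part of DSPy: it assumes the app above is served at http://localhost:8000 and that Question has a text field, so the request body is {"text": "..."}.

```python
import json

import httpx

def consume_stream(question: str) -> None:
    # Assumed URL and body shape; adjust to match your deployment.
    with httpx.stream(
        "POST",
        "http://localhost:8000/predict/stream",
        json={"text": question},
        timeout=None,
    ) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            payload = line[len("data: "):]
            if payload == "[DONE]":
                break
            event = json.loads(payload)
            if "chunk" in event:
                print("chunk:", event["chunk"])            # intermediate LM delta
            elif "prediction" in event:
                print("prediction:", event["prediction"])  # final output

consume_stream("What is the capital of France?")
```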
Changes
- New in-memory `LMRequestLRUCache` with a default max size of 10_000_000.
Notes
- No intermediate chunks are streamed on a hit in the in-memory LRU cache, because the final result is available instantly. Streaming should still work with the in-memory cache turned off, which enables the LiteLLM cache.
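Conceptually, a cache of this kind maps a completed request to its final response, which is why a cache hit bypasses streaming entirely. The sketch below is illustrative only and is not DSPy's actual `LMRequestLRUCache` implementation.

```python
from collections import OrderedDict

class RequestLRUCache:
    """Illustrative LRU cache keyed by request parameters (not DSPy's implementation)."""

    def __init__(self, maxsize: int = 10_000_000):
        self.maxsize = maxsize
        self._data = OrderedDict()

    def get(self, key: str):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: str, value) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict the least recently used entry

# On a cache hit the full response is returned immediately, so there are no
# intermediate chunks left to stream -- matching the note above.
```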