[Feature] Thread for Async - Description of our production workflows
What feature would you like to see?
Hi @chenmoneygithub,
I am creating a feature request as you requested in Discord - in the DSPy async thread.
I haven't deeply explored the current dspy.asyncify implementation, so I can't request specific features, but I think I can provide you with our production workflows and how we use DSPy. All of these should be as performant as possible and with async support and stable.
Inference workflow
This is the simple, default use case. This represents about 70% of our production use cases, where we just "make an API call to our litellm server".
dspy.settings.configure(lm=lm, async_max_workers=8)
extractor = dspy.Predict(ExtractorSignature)
extractor = dspy.asyncify(extractor)
r = await extractor(passage="...")
"More advanced" inference workflow
This is a use case where we combine multiple Signatures in one module, which is also common in our production settings. I'm intentionally not using a multi-module approach (e.g., loading a trained module within a module). I'm trying to combine module calls in separate async function and I am asyncifying all modules. If it's possible to have one main module where I could load multiple modules from .json and use their forward functions, that would be awesome.
class Extractor(dspy.Module):
def __init__(self) -> None:
self.extract = dspy.Predict(ExtractorSignature)
self.find_items = dspy.ChainOfThought(FindSignature)
def forward(self, passage: str, find : bool = True) -> str:
response = self.extract(passage=passage)
if find:
response = self.find_items(response=response)
return response
dspy.settings.configure(lm=lm, async_max_workers=8)
extractor = Extractor()
extractor = dspy.asyncify(extractor)
r = await extractor(passage="...")
Batch workflow
The last use case we deal with is batch inference via Evaluate. Performance is okay, but a dedicated interface would be nice. We typically process about 5k - 10k dspy.Examples per batch. We mainly use programs from the "inference workflows" above, and we chain them to maintain at least some checkpoints. This is a approach I use also a lot in running experiments for my research mostly.
evaluate = dspy.Evaluate(
devset=batch, # list of dspy.Examples
metric=lambda x, y: True,
num_threads=20,
display_progress=False,
return_outputs=True,
)
_, outputs = evaluate(program)
answers = [dict(output[1]) for output in outputs]
df = pd.DataFrame(answers)
df
PS: This function is my #1 hated thing in DSPy that I can't live without. It's really unfortunate when Evaluate randomly freezes and stays in a loop for minutes without running a single example. This happens mainly at the last example of a batch. I'm not sure if this is related to your rewrite.
@chenmoneygithub I don't know if this is what you're looking for, but I would say this is a brief introduction to our production usage of DSPy. I know it's quite basic, but if you'd like me to answer anything else, I'm here to help.
Would you like to contribute?
- [x] Yes, I'd like to help implement this.
- [ ] No, I just want to request it.
Additional Context
No response
Hey @mikeedjones we're trying to collect requests for how to improve async support in DSPy, especially dealbreakers or important stuff. I think you're among our most sophisticated users. Do you happen to have any thoughts?
native async via the litellm acompletion would make the most sense[1]. How realistic is it to refactor dspy to use all native function?
Alternatively the refactor could maybe start as a parallel lib à la redis-py [2] -- then merge?
Whatever gets decided, I strongly suggest we avoid a huge monolitic update, and more something we can move to in small steps.
In any case, since I presume most of us use DSPy with remote LMs, async should really be the de-facto way of using the library.
[1] https://docs.litellm.ai/docs/completion/stream#async-completion [2] https://redis-py.readthedocs.io/en/stable/examples/asyncio_examples.html
In any case, since I presume most of us use DSPy with remote LMs, async should really be the de-facto way of using the library.
Async matters when users want a high level of concurrency, e.g., the deployed DSPy program's endpoint is handling >1000 requests simultaneously. For exploring, prototyping and even decent-size deployment, multithreading + sync calls is sufficient. The biggest downside of making everything async is whenever users want to invoke DSPy code, they need to create their own event loop, which is error prone and creates a big overhead. So here is what we will do:
- Sync by default.
- Async will be supported via something like
acall(), which will exist indspy.Module,dspy.Predictanddspy.LM.
It's a relatively big change, and we need multiple stages towards it, please stay tuned!
From Discord: no-dice was asking:
If I asyncify a custom DSPy moduel which contains 3 ChainOfThought modules in its forward function - will those sub-CoT modules also be asyncified? It looks like currently the answer is no but am not certain
But it turned out that he needed .batch(). Just adding this here since we need good docs for parallelization.