[Draft] Async-based parallelism
Behavior changes:
- Lazy execution (`lm += foo()` always returns immediately)
- Execution triggered by stateful access, e.g. `str(lm)`, `lm[key]`, etc.

Introduces:
- Async stateful accessors, e.g. `await lm.get_async(key)` (API is still a WIP)
- Async guidance functions (i.e. the `@guidance` decorator on `async def` functions)
  - Allows usage of async accessors inside of guidance functions, as well as other async APIs, semaphores, etc.

Note: async accessors are fully compatible with non-async guidance functions (even stateful ones), i.e. you don't have to rewrite your existing guidance functions as async to get the concurrency benefits of async accessors farther up the stack.
Here's an example usage:
- The main logic is encapsulated in a normal (non-async) `@guidance` function, `extract_image_data` -- it does not need to be aware that its callers may be async!
- An async `@guidance` function, `get_and_describe_image`, uses external async functions, namely the `get` method of an `httpx.AsyncClient`.
  - Note that while async accessors are perfectly valid, non-async accessors on the `Model` object (`lm`) are disallowed inside of async `@guidance` functions and will raise an exception. We could probably "fix" this, but it's honestly kind of a nice safeguard against shooting ourselves in the foot.
- An async `main` function gathers some number of coroutines returned by an async accessor on each of our unevaluated guidance programs.
```python
import httpx
import asyncio
from guidance import *


@guidance
def extract_image_data(lm, image_bytes):
    # Plain (non-async) guidance function: unaware that its callers may be async.
    with user():
        lm += "What is in this image?"
        lm += image(image_bytes)
    with assistant():
        lm += json(
            schema={
                "type": "object",
                "properties": {
                    "description": {"type": "string"},
                    "colors": {"type": "array", "items": {"type": "string"}},
                    "objects": {"type": "array", "items": {"type": "string"}},
                },
                "required": ["description", "colors", "objects"],
                "additionalProperties": False,
            },
            name="data",
        )
    return lm


@guidance
async def get_and_describe_image(lm, client):
    # Async guidance function: free to await external async APIs.
    resp = await client.get("https://picsum.photos/200")
    resp.raise_for_status()
    image_bytes = resp.content
    lm += extract_image_data(image_bytes)
    return lm


async def main():
    lm = models.OpenAI("gpt-4o-mini", echo=False)
    async with httpx.AsyncClient(follow_redirects=True) as client:
        # Lazy: building these programs does not execute anything yet.
        lms = [
            lm + get_and_describe_image(client)
            for _ in range(10)
        ]
        # The async accessors trigger (concurrent) execution.
        datas = await asyncio.gather(*[lm.get_async("data") for lm in lms])
    return datas
```
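For completeness, the program above would presumably be driven in the usual way, e.g.:

```python
if __name__ == "__main__":
    print(asyncio.run(main()))
```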
`@guidance` functions can also be naively parallelized (regardless of whether or not they are async) via the batched entrypoints:

```python
lms = lm.run_batched([func_1(), ... func_n()])
lms = await lm.async_run_batched([func_1(), ... func_n()])
```

Note that these entrypoints actually run the functions and are not lazy like `+=` is.
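For concreteness, here's a rough sketch of that difference, reusing `extract_image_data` from the example above (`all_image_bytes` is a hypothetical list of raw images):

```python
# Lazy path: adding the function returns immediately; execution only happens
# when a stateful accessor is hit.
lm2 = lm + extract_image_data(image_bytes)
data = lm2["data"]  # the pending chain actually runs here

# Batched entrypoint: runs everything up front, so the returned models are
# already evaluated and their accessors are just reads.
lms = lm.run_batched([extract_image_data(b) for b in all_image_bytes])
datas = [m["data"] for m in lms]
```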
TODOs:
- [x] fix stateful capture blocks
- [x] put `token_count` on state
- [x] how to trigger streams?
- [x] add example usage to this PR
- [x] fix and un-comment calls to vis/renderer
- [ ] stabilize async accessor api
- [ ] make a decision about the "ambiguous forking" problem
- [ ] do some profiling experiments to ensure we're not introducing unnecessary overhead (e.g. compare to manual thread-based parallelism)
- [ ] documentation
Lazy execution has some benefits:
- Enables async guidance usage, as `lm += ...` is non-blocking, and we can choose to cooperatively yield control to the event loop when blocking via async accessors like `await lm.get_async(...)`
- Allows multiple `lm += ...` to be buffered, reducing the number of round-trip calls (potentially saving a lot of tokens...) to the relevant model API (sketched below)
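As a rough illustration of the second point (a sketch, assuming the lazy semantics described above and guidance's `gen` helper):

```python
from guidance import gen

# Both additions are buffered; nothing has been sent to the model yet.
lm += "Name a color: " + gen("color", stop="\n")
lm += "Name an animal: " + gen("animal", stop="\n")

# The first stateful access triggers execution of the whole pending chain at once.
print(lm["color"], lm["animal"])
```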
That being said, it has some pretty big drawbacks:
- Not intuitively clear that `str(lm)` or `lm[key]` and their async counterparts are responsible for execution. In the above example, the `lm.get_async("data")` coroutines must be gathered and awaited inside the `with ... as client:` block, or else the client will be closed before any network I/O is attempted (a sketch of that failure mode follows the snippet):
```python
async with httpx.AsyncClient(follow_redirects=True) as client:
    lms = [
        lm + get_and_describe_image(client)
        for _ in range(10)
    ]
    datas = await asyncio.gather(*[lm.get_async("data") for lm in lms])
```
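For contrast, a sketch of the failure mode being warned about (same setup, but with the gather moved outside the block):

```python
async with httpx.AsyncClient(follow_redirects=True) as client:
    lms = [lm + get_and_describe_image(client) for _ in range(10)]

# Nothing has executed yet; the client is already closed by the time these
# accessors finally trigger the pending work, so the HTTP calls fail.
datas = await asyncio.gather(*[lm.get_async("data") for lm in lms])
```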
- Model objects have been (until now) treated as immutable (`lm += ...` is not in-place). The accessors (`str(lm)` or `lm[key]` and their async counterparts) now mutate the lm by running all "pending" guidance functions/AST nodes in-place. This is somewhat confusing.
- Lazy eval leads to ambiguity when implicitly "forking" with code like `temp_lm = lm + ...` -- what if `lm` had some pending state on it?
  - There may be a simple solution here, e.g. make `+=` actually mutate in-place and have `+` be semantically different by blocking to let pending state "catch up" and then returning a copy with new pending state on it. (This may be a "partial" implementation of the "async dispatch" idea below; a sketch of the proposed semantics follows this list.)
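A sketch of what that proposed semantics might look like (purely hypothetical; not what this PR implements):

```python
lm += foo()           # stays lazy: pending work is queued on lm itself (in-place)
temp_lm = lm + bar()  # `+` would block here to resolve lm's pending foo(),
                      # then return a copy with only bar() pending on it
```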
There might be a solution here that replaces lazy eval with non-blocking eager eval, akin to JAX's async dispatch, where we could introduce a `block_until_ready` method (or just its async counterpart). But I am hesitant, mainly due to lazy execution's benefit no. 2 above.
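A hypothetical shape for that alternative, loosely modeled on JAX's `block_until_ready` (again, not something this PR implements):

```python
lm += foo()                   # eager dispatch: work starts in the background immediately
lm += bar()                   # also dispatched; ordering preserved under the hood
await lm.block_until_ready()  # cooperatively wait for all in-flight work
print(lm["bar"])              # accessors just read already-computed state
```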
@Harsha-Nori @nking-1 @JC1DA @nopdive @paulbkoch @riedgar-ms would love any and all of your inputs here.
- If we can collectively live with the downsides of lazy eval, let's come to a consensus on the right API for the async accessors.
- If we can't, I can try to pivot to the async dispatch approach?
Wouldn't the line

```python
lm = models.OpenAI("gpt-4o-mini", echo=False)
```

need something to indicate that underneath it needs to create an `AsyncOpenAI` class, and not an `OpenAI` one? Or are we assuming that the whole task runs its queued evaluations synchronously? I think we want the former, but that's going to mean parallel sets of `Model` and `Interpreter` classes, I think?
In the interest of avoiding having parallel sets of these classes (or rather, parallel implementations of almost the entire stack...), Paul and I were thinking to just have the ONE async implementation, with all of the sync accessor methods just wrapping the "one true async implementation".
This leads to some complications around re-entrancy (e.g. what happens if you add a non-async guidance function to your lm while inside of an async guidance function?), but I feel pretty satisfied that I came to an ergonomic solution that lets these things be arbitrarily mixed and matched.
So the synchronous versions just do a `Task.run()` (or whatever it is)? Presumably that spins up a short-lived event loop... I'm guessing we're not concerned about performance implications on that?
We're maintaining a single long-lived event loop in a daemon thread (which has its own implications I suppose), so we just submit the coroutine and block the main thread until it's ready.
The nice thing is that this is only happening at the very top-level entry point, so we don't need multiple threads or anything like that to support recursive calls. Getting that working without deadlocks was an interesting exercise -- more than happy to look at that code together!
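A minimal sketch of the general pattern being described (not the actual guidance internals): one long-lived loop in a daemon thread, with synchronous entry points submitting coroutines to it and blocking on the result.

```python
import asyncio
import threading

# One long-lived event loop running in a background daemon thread.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()

def run_sync(coro):
    """Submit a coroutine to the background loop and block until it completes."""
    return asyncio.run_coroutine_threadsafe(coro, _loop).result()
```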
Codecov Report
Attention: Patch coverage is 68.28087% with 131 lines in your changes missing coverage. Please review.
Project coverage is 55.73%. Comparing base (`3918b36`) to head (`9202113`). Report is 1 commit behind head on main.
Additional details and impacted files
```diff
@@            Coverage Diff             @@
##             main    #1183       +/-   ##
===========================================
+ Coverage   40.63%   55.73%    +15.10%
===========================================
  Files          62       63         +1
  Lines        4782     4972       +190
===========================================
+ Hits         1943     2771       +828
+ Misses       2839     2201       -638
```
Thanks for the push here @hudson-ai -- fantastic work, really.
I've always been a huge fan of Jax's async dispatch model, and want to better understand why benefit 2 will no longer apply in an async dispatch world. Can't we keep e.g. a debounce buffer that batches objects as much as we can, thereby getting most of the benefit anyway?
> There might be a solution here that replaces lazy eval with non-blocking eager eval, akin to JAX's async dispatch, where we could introduce a `block_until_ready` method (or just its async counterpart). But I am hesitant, mainly due to lazy execution's benefit no. 2 above.
Thanks @Harsha-Nori! And I appreciate the input / question. I don't honestly know the answer -- maybe some kind of buffering would work. Just going to think out loud a bit...
Let's say we have a chain of lm objects:
```python
lm_1 = lm + foo(name="foo")
lm_2 = lm_1 + bar(name="bar")
lm_3 = lm_2 + baz(name="baz")
```
With lazy execution as it's implemented in this PR, nothing gets executed until we do something like `lm_3["bar"]`, at which point we run the chain `foo(...) + bar(...) + baz(...)`. If we try to access an earlier one, e.g. `lm_2["bar"]`, we have to run the chain `foo(...) + bar(...)`, and we may get a different answer.

I'm imagining that if we did async dispatch + eager execution (no buffering), each of `lm_1`, `lm_2`, and `lm_3` would essentially have a `Future` under the hood, with the `bar` part of `lm_2` being unable to execute until the `foo` part of `lm_1` does, etc.

With debounce-style buffering, we could track parent-child relationships, and noting that `lm_1` and `lm_2` both have children, we wouldn't run anything for them at all, only computing `lm_3`'s `foo(...) + bar(...) + baz(...)`. But we'd then have to somehow back-fill `lm_1` and `lm_2`, e.g. in case someone tries to access `lm_1["foo"]`.
This doesn't seem too bad, but I think the story gets far more complicated once we start having branching calls / arbitrary DAGs.
E.g.:

```python
for _ in range(100):
    lm += qux()

lm_1 = lm
for _ in range(100):
    lm_1 += foo()

lm_2 = lm
for _ in range(100):
    lm_2 += bar()
```
`lm_1` and `lm_2` share a common ancestor, namely `lm` with its 100 `qux`es. What if both of them start trying to compute their chains (`qux() + ... + qux() + foo() + ... + foo()` and `qux() + ... + qux() + bar() + ... + bar()`, respectively)? Do they have to compete to acquire a lock on `lm` to make sure only one value gets computed for `qux() + ... + qux()`? If so, that means we can't parallelize the `foo`s and the `bar`s. For non-trivial DAGs, this means we probably miss a ton of speedup opportunities for things that should be embarrassingly parallel.
If we can figure out the right way to do this "back-filling", I kind of like the idea. But it's also a bit spooky... Thoughts?
Some kind of `lm.run()` is a lot less magic and, in a lot of ways, a lot more cumbersome (e.g. having to call `run` before every `getitem`, lest an exception be raised...). But it's another approach to remove ambiguities and keep everything immutable.
@nopdive I know you're a fan of async dispatch. Any thoughts on your end?
Notes / status update for anyone watching this --
- Everything works, but the sticky points are still surrounding API
- I'm currently working on "backfilling" discussed above in order to get rid of the ambiguity that comes with "forking". @nopdive and I outlined a version of that together that I think has acceptable ergonomics.
- I'm leaning towards eliminating the `async_get` function and its siblings in favor of something that feels more like async dispatch, i.e. `await lm.block_until_ready()` or something of the sort. But deciding this can wait until the backfilling stuff is done.