guidance icon indicating copy to clipboard operation
guidance copied to clipboard

[Draft] Async-based parallelism

Open hudson-ai opened this issue 7 months ago • 10 comments
trafficstars

Behavior changes:

  • lazy execution (lm += foo() always returns immediately)
  • execution triggered by stateful access, e.g. str(lm), lm[key], etc.

Introduces:

  • async stateful accessors, e.g. await lm.get_async(key) (API is still a WIP)
  • async guidance functions (i.e. @guidance decorator on async def functions
    • allows usage of async accessors inside of guidance functions
    • as well as other async apis, semaphores, etc.

Note: async accessors are fully compatible with non-async guidance functions (even stateful ones). I.e. you don't have to rewrite your existing guidance functions as async to get the concurrency benefits of async accessors farther up the stack.

Here's an example usage:

  1. The main logic is encapsulated in a normal (non-async) @guidance function extract_image_data -- it does not need to be aware that its callers may be async!
  2. An async @guidance function get_and_describe_image that uses external async functions, namely the get method of an httpx.AsyncClient.
    • Note that while async accessors are perfectly valid, non-async accessors on the Model object (lm) are disallowed inside of async @guidance functions and will raise an exception. We could probably "fix" this, but it's honestly kind of a nice safeguard against shooting ourselves in the foot.
  3. An async main function that gathers some number of coroutines returned by an async accessor on each of our unevaluated guidance programs.
import httpx
import asyncio
from guidance import *

@guidance
def extract_image_data(lm, image_bytes):
    with user():
        lm += "What is in this image?"
        lm += image(image_bytes)
    with assistant():
        lm += json(
            schema = {
                "type": "object",
                "properties": {
                    "description": {"type": "string"},
                    "colors": {"type": "array", "items": {"type": "string"}},
                    "objects": {"type": "array", "items": {"type": "string"}},
                },
                "required": ["description", "colors", "objects"],
                "additionalProperties": False
            },
            name = "data",
        )
    return lm

@guidance
async def get_and_describe_image(lm, client):
    resp = await client.get("https://picsum.photos/200")
    resp.raise_for_status()
    image_bytes = resp.content
    lm += extract_image_data(image_bytes)
    return lm

async def main():
    lm = models.OpenAI("gpt-4o-mini", echo=False)
    async with httpx.AsyncClient(follow_redirects=True) as client:
        lms = [
            lm + get_and_describe_image(client)
            for _ in range(10)
        ]
        datas = await asyncio.gather(*[lm.get_async("data") for lm in lms])
    return datas

@guidance functions can also be naively parallelized (regardless of whether or not they are async) via the batched entrypoints:

lms = lm.run_batched([func_1(), ... func_n()])
lms = await lm.async_run_batched([func_1(), ... func_n()])

Note that these entrypoints actually run the functions and are not lazy like += is.

TODOs:

  • [x] fix stateful capture blocks
  • [x] put token_count on state
  • [x] how to trigger streams?
  • [x] add example usage to this PR
  • [x] fix and un-comment calls to vis/renderer
  • [ ] stabilize async accessor api
  • [ ] make a decision about the "ambiguous forking" problem
  • [ ] do some profiling experiments to ensure we're not introducing unnecessary overhead (e.g. compare to manual thread-based parallelism)
  • [ ] documentation

hudson-ai avatar Apr 22 '25 16:04 hudson-ai

Lazy execution has some benefits:

  1. Enables async guidance usage, as lm += ... is non-blocking, and we can choose to cooperatively yield control to the event loop when blocking via async accessors like await lm.get_async(...)
  2. Allows multiple lm += ... to be buffered, reducing the number of round-trip calls (potentially saving a lot of tokens...) to the relevant model api

That being said, it has some pretty big drawbacks:

  1. Not intuitively clear that str(lm) or lm[key] and their async counterparts are responsible for execution. In the above example, the lm.get_async("data") coroutines must be gathered and awaited inside the with ... as client: block, or else the client will be closed before any network I/O is attempted
    async with httpx.AsyncClient(follow_redirects=True) as client:
        lms = [
            lm + get_and_describe_image(client)
            for _ in range(10)
        ]
        datas = await asyncio.gather(*[lm.get_async("data") for lm in lms])
  1. Model objects have been (until now) treated as immutable (lm += ... is not in-place). The accessors (str(lm) or lm[key] and their async counterparts) now mutate the lm by running all "pending" guidance functions/AST nodes in-place. This is somewhat confusing.
  2. Lazy eval leads to ambiguity when implicitly "forking" with code like temp_lm = lm + ... -- what if lm had some pending state on it?
    • There may be a simple solution here, e.g. make += actually mutate in-place and have + be semantically different by blocking to let pending state "catch up" and then returning a copy with new pending state on it. (This may be a "partial" implementation of the "async dispatch" idea below).

There might be a solution here that replaces lazy eval with non-blocking eager eval, akin to jax's async dispatch, where we could introduce a block_until_ready method (or just its async counterpart). But I am hesitant, mainly due to lazy execution's benefit no. 2 above.

@Harsha-Nori @nking-1 @JC1DA @nopdive @paulbkoch @riedgar-ms would love any and all of your inputs here.

  • If we can collectively live with the downsides of lazy eval, let's come to a consensus on the right API for the async accessors.
  • If we can't, I can try to pivot to the async dispatch approach?

hudson-ai avatar Apr 22 '25 23:04 hudson-ai

Wouldn't the line

lm = models.OpenAI("gpt-4o-mini", echo=False)

need something to indicate that underneath it needs to create an AsyncOpenAI class, and not an OpenAI one? Or are we assuming that the whole task runs its queued evaluations synchronously? I think we want to former, but that's going to mean parallel sets of Model and Interpreter classes, I think?

riedgar-ms avatar Apr 23 '25 17:04 riedgar-ms

Wouldn't the line

lm = models.OpenAI("gpt-4o-mini", echo=False)

need something to indicate that underneath it needs to create an AsyncOpenAI class, and not an OpenAI one? Or are we assuming that the whole task runs its queued evaluations synchronously? I think we want to former, but that's going to mean parallel sets of Model and Interpreter classes, I think?

In the interest of avoiding having parallel sets of these classes (or rather, parallel implementations of almost the entire stack...), Paul and I were thinking to just have the ONE async implementation, with all of the sync accessor methods just wrapping the "one true async implementation".

This leads to some complications around re-entrancy (e.g. what happens if you add a non-async guidance function to your lm while inside of an async guidance function?), but I feel pretty satisfied that I came to an ergonomic solution that lets these things be arbitrarily mixed and matched.

hudson-ai avatar Apr 23 '25 18:04 hudson-ai

So the synchronous versions just do a Task.run() (or whatever it is)? Presumably that spins up a short-lived event loop.... I'm guessing we're not concerned about performance implications on that?

riedgar-ms avatar Apr 23 '25 18:04 riedgar-ms

So the synchronous versions just do a Task.run() (or whatever it is)? Presumably that spins up a short-lived event loop.... I'm guessing we're not concerned about performance implications on that?

We're maintaining a single long-lived event loop in a daemon thread (which has its own implications I suppose), so we just submit the coroutine and block the main thread until it's ready.

The nice thing is that this is only happening at the very top-level entry point, so we don't need multiple threads or anything like that to support recursive calls. Getting that working without deadlocks was an an interesting exercise -- more than happy to look at that code together!

hudson-ai avatar Apr 23 '25 18:04 hudson-ai

:warning: Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

Attention: Patch coverage is 68.28087% with 131 lines in your changes missing coverage. Please review.

Project coverage is 55.73%. Comparing base (3918b36) to head (9202113). Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
guidance/_ast.py 68.37% 37 Missing :warning:
guidance/models/_base/_model.py 77.86% 27 Missing :warning:
guidance/models/experimental/_vllm.py 28.57% 20 Missing :warning:
guidance/_reentrant_async.py 68.29% 13 Missing :warning:
guidance/models/_openai_base.py 58.33% 10 Missing :warning:
guidance/models/_azureai.py 0.00% 9 Missing :warning:
guidance/models/_base/_interpreter.py 88.88% 4 Missing :warning:
guidance/models/_base/_state.py 50.00% 4 Missing :warning:
guidance/_guidance.py 50.00% 3 Missing :warning:
guidance/library/_gen.py 0.00% 2 Missing :warning:
... and 1 more

:exclamation: Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@             Coverage Diff             @@
##             main    #1183       +/-   ##
===========================================
+ Coverage   40.63%   55.73%   +15.10%     
===========================================
  Files          62       63        +1     
  Lines        4782     4972      +190     
===========================================
+ Hits         1943     2771      +828     
+ Misses       2839     2201      -638     

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

:rocket: New features to boost your workflow:
  • :snowflake: Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • :package: JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

codecov-commenter avatar Apr 24 '25 19:04 codecov-commenter

Thanks for the push here @hudson-ai -- fantastic work, really.

I've always been a huge fan of Jax's async dispatch model, and want to better understand why benefit 2 will no longer apply in an async dispatch world. Can't we keep e.g. a debounce buffer that batches objects as much as we can, thereby getting most of the benefit anyway?

There might be a solution here that replaces lazy eval with non-blocking eager eval, akin to jax's async dispatch, where we could introduce a block_until_ready method (or just its async counterpart). But I am hesitant, mainly due to lazy execution's benefit no. 2 above.

Harsha-Nori avatar Apr 25 '25 16:04 Harsha-Nori

Thanks for the push here @hudson-ai -- fantastic work, really.

I've always been a huge fan of Jax's async dispatch model, and want to better understand why benefit 2 will no longer apply in an async dispatch world. Can't we keep e.g. a debounce buffer that batches objects as much as we can, thereby getting most of the benefit anyway?

There might be a solution here that replaces lazy eval with non-blocking eager eval, akin to jax's async dispatch, where we could introduce a block_until_ready method (or just its async counterpart). But I am hesitant, mainly due to lazy execution's benefit no. 2 above.

Thanks @Harsha-Nori! And I appreciate the input / question. I don't honestly know the answer -- maybe some kind of buffering would work. Just going to think out loud a bit...

Let's say we have a chain of lm objects:

lm_1 = lm + foo(name="foo")
lm_2 = lm_1 + bar(name="bar") 
lm_3 = lm_2 + baz(name="baz")

With lazy execution as it's implemented in this PR, nothing gets executed until we do something like lm_3["bar"], at which point, we run the chain foo(...) + bar(...) + baz(...). If we try to access an earlier one, e.g. lm_2["bar"], we have to run the chain foo(... ) + bar(...), and we may get a different answer.

I'm imagining that if we did async dispatch + eager execution (no buffering), each of lm_1, lm_2, and lm_3 would essentially have a Future under the hood, with the bar part of lm_2 being unable to execute until the foo part of lm_1 does, etc.

With debounce-style buffering, we could track parent-child relationships, and noting that lm_1 and lm_2 both have children, we wouldn't run anything for them at all, only computing lm_3's foo(... ) + bar(...) + baz(...). But we'd then have to somehow back-fill lm_1 and lm_2, e.g. in case someone tries to access lm_1["foo"].

This doesn't seem too bad, but I think the story gets far more complicated once we start having branching calls / arbitrary DAGs.

E.g.

for _ in range(100):
   lm += qux()

lm_1 = lm
for _ in range(100):
   lm_1 += foo()

lm2 = lm
for _ in range(100):
  lm_2 += bar()

lm_1 and lm_2 share a common ancestor, namely lm with its 100 quxes. What if both of them start trying to compute their chains (qux() + ... + qux() + foo() + ... + foo() and qux() + ... + qux() + bar() + ... + bar(), respectively? Do they have to compete to acquire a lock on lm to make sure only one value gets computed for qux() + ... + qux()? If so, that means we can't parallelize the foos and the bars. For non-trivial DAGs, this means we probably miss a ton of speedup opportunities for things that should be embarassingly parallel.

If we can figure out the right way to do this "back-filling", I kind of like the idea. But it's also a bit spooky... Thoughts?

hudson-ai avatar Apr 25 '25 23:04 hudson-ai

Some kind of lm.run() is a lot less magic and in a lot of ways, a lot more cumbersome (e.g. having to call run before every getitem, lest an exception...). But it's another approach to remove ambiguities and keep everything immutable.

@nopdive I know you're a fan of async dispatch. Any thoughs on your end?

hudson-ai avatar Apr 25 '25 23:04 hudson-ai

Notes / status update for anyone watching this --

  • Everything works, but the sticky points are still surrounding API
  • I'm currently working on "backfilling" discussed above in order to get rid of the ambiguity that comes with "forking". @nopdive and I outlined a version of that together that I think has acceptable ergonomics.
  • I'm leaning towards eliminating the async_get function and its siblings in favor of something that feels more like async dispatch, i.e. await lm.block_until_ready() or something of the sort. But deciding this can wait until the backfilling stuff is done

hudson-ai avatar May 07 '25 01:05 hudson-ai