Parallel map-style actions
Overview
We should be able to launch a whole bunch of actions in parallel. Walking the state machine in parallel is tricky, so this proposes the following:
- A `MultiAction` that performs a map operation over some state field, then spawns and delegates sub-actions
- These sub-actions each have access to a portion of that state field + everything else they need in state
- These then write back to state
- The user specifies how to reduce the result
- This is done via a configurable executor -- asyncio, threading, etc. -- with a default of multithreading (for sync) and asyncio (for async); see the sketch below
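To make the executor defaults concrete, here is a minimal sketch in stdlib terms -- this is not the Burr API, and the function names are hypothetical:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable, Callable, Sequence


def run_sync_tasks(tasks: Sequence[Callable[[], dict]], max_workers: int = 8) -> list:
    # default sync path: fan the tasks out over a thread pool
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(task) for task in tasks]
        return [future.result() for future in futures]


async def run_async_tasks(tasks: Sequence[Callable[[], Awaitable[dict]]]) -> list:
    # default async path: gather the coroutines on the running event loop
    return await asyncio.gather(*(task() for task in tasks))
```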
Use-cases
- PR-bot -- want to determine how to respond to each comment, and also to each file
- Firing off requests to multiple LLMs at once to determine the best/quickest
- Optimizing over multiple ML algorithms at once.
- Web scraper -- send a bunch of requests out
Requirements:
- Recursive, all the way down. Should use the same executor for sub-actions, sub-sub-actions, etc...
- Each individual update (from an action completing) is treated as a state update
- Idempotency is encouraged, if not built in (for instance, deriving the tasks from the input fields minus the output fields)
- Configurable parallelism (max rate, etc...)
- Hooks, everything respected as expected
- Quick-exit mode (take the first result, then cancel the others) and join-all mode (wait for all of them) -- see the sketch below
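The last requirement, expressed with stdlib futures (illustrative only; `run_with_mode` is a hypothetical name):

```python
from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_with_mode(tasks, quick_exit: bool = False) -> list:
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(task) for task in tasks]
        done, pending = wait(
            futures,
            return_when=FIRST_COMPLETED if quick_exit else ALL_COMPLETED,
        )
        for future in pending:
            # quick-exit mode: cancel whatever hasn't started yet
            future.cancel()
        return [future.result() for future in done]
```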
Design Qs:
- Should the actions that can be launched from an action all be the same? Or should it be possible to launch two different ones?
- How do we handle action failures? Should we allow some? Do retries? Or do we end up having the actions themselves handle failure cases and not deal with them at all?
- API -- class-based? Function-based?
This is very likely dependent on #33 -- we'll need to layer/apply updates to state in a nice way. We can probably get around that but it is worth considering.
What I would do today is just do it internal to the action:
```python
@action(...)
def my_parallel_action(state: State, ...) -> Tuple[dict, State]:
    # do the parallelization myself with threading/asyncio/etc.
    futures = [...]
    # wait for the first, or wait for all
    result = wait_for_futures(futures)
    # do the state update here / handle any failures, etc.
    state = update_state(state, result)
    return result, state
```
So the question, if we provide framework support for something like this, is: "what are we removing/simplifying/enhancing?"
Not a requirement (yet), but the enhancement is visibility/telemetry/checkpointing for long-running jobs.
Furthermore, when we combine this with the ability to build actions recursively, it really simplifies things.
Say you're building a PR bot such as Ellipsis. You'll want to respond to each comment in parallel; otherwise it'll take too long. Each comment could be a multi-step process. This plus composable actions makes that feasible. Again, not necessary (there are multiple ways to model it).
Plan of action:
Use-case (sample, for scrapegraph):
- [ ] List a bunch of URLs
- [ ] Run a single step that spawns multiple graphs
- [ ] Join the results together
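For orientation, this is what the same three steps look like today with no framework support; the Burr version would replace the executor block with spawned subgraphs, as the phases below describe (URLs are placeholders):

```python
from concurrent.futures import ThreadPoolExecutor
import urllib.request


def scrape(url: str) -> str:
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read().decode("utf-8", errors="replace")


urls = ["https://example.com", "https://example.org"]  # (1) list a bunch of URLs
with ThreadPoolExecutor(max_workers=4) as executor:    # (2) a single step spawning parallel work
    pages = list(executor.map(scrape, urls))           # (3) join the results together
```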
(1) Tracking/pointer storage
One subgraph points to the other. This is a spawner/subgraph relationship (not to be confused with the parent/child fork relationship).
- [ ] Pass a common parameter (pointer/container) into the app so we can store it with the tracker
- [ ] Don’t have Burr automatically know about it beforehand (yet)
- [ ] MVP — view pointer links in the UI — click back and forth, maybe show substeps in graph
(2) Store statically — we can store the subgraph relationships statically so we can display easily:
- [ ] Have a node be a subgraph type and register it
- [ ] Have it still responsible for running itself (or running many in parallel)
- [ ] We can view these as clusters/subgraphs in the UI, although it'll be a little tricky to display the current step
(3) Burr handles execution — we can wrap the prior one in this so we automatically launch a ton of them
- [ ] More complex — state is either multiple at once, queue-based, or something clever
- [ ] Scrapegraph doesn’t need it as it can handle parallelism
Design for phase (1):
```python
@action(...)
def spawning_action(..., __context: ApplicationContext):
    app = (
        ApplicationBuilder()
        ...  # other builder configuration elided
        .with_spawning_parent(
            __context.app_id,
            __context.sequence_id,
        )
        .with_tracker(__context.tracker)
        .build()
    )
    ... = app.run(...)
    return ...
```
Looking at the second phase here -- how do we make this ergonomic? We'll need an action to specify:
- How to map, e.g. generate the subgraphs/actions from state + inputs
- What the subgraphs are
- Where to enter the subgraphs
- Which executor to use (async for async, otherwise should be pluggable)
- Where to exit the subgraph (what the halt_after are for the subgraph)
- What to do in case of error
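One way to bundle those concerns is a single spec object. A sketch, assuming a dataclass shape -- `TaskSpec` only appears informally in the notes below, and every field name here is an assumption:

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class TaskSpec:
    """One sub-application to launch. All field names are illustrative, not final."""

    graph: Any                      # the subgraph to run
    entry_point: str                # where to enter the subgraph
    halt_after: List[str]           # where to exit (the subgraph's halt conditions)
    inputs: dict = field(default_factory=dict)
    app_id: Optional[str] = None    # stable ID (e.g. a hash of the inputs) for idempotency
    executor: Optional[Any] = None  # per-task override of the shared executor
    on_error: str = "raise"         # or "skip"/"retry" -- an open design question above
```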
Some raw notes from a chat with @skrawcz yesterday:
```python
with_actions(
    parallel_example=MapReduceAction(
        lambda state: [
            TaskSpec(...) for item in state["item_to_map_over"]
        ]
    )
)
```
```python
# this is self-contained
@action(reads=[], writes=[])
def my_action() -> State:
    pass


graph1 = GraphBuilder()....build()
graph2 = GraphBuilder()....build()
# graphAction1 is atomic here, without state mapping & reducing;
# it could have state mapping & reducing
graphAction1 = graphAction(graph1, entry_point, halt_after)
graphAction2 = graphAction(graph1, entry_point, halt_after, map..., reduce..., ...)
graphAction3 = graphAction1.bind(map_state=..., reduce_state=...)
graphAction4 = graphAction(map_actions=[graph1], map_state=..., reduce_state=...)


@mapper(reads=[])
def state_mapper(state: State) -> Generator[State, None, None]:
    for x in state["foo"]:
        yield x


@reducer(reads=[], writes=[])
def state_reducer(base_accumulator: State, states: Generator[State, None, None]) -> State:
    results = []
    for s in states:
        results.append(s)
    return base_accumulator.update(some_key=results)


graphAction4 = graphAction(map_actions=[my_action], map_state=..., reduce_state=...)


class MyGraphAction:
    # so maybe a class-based action
    ...


"""
Class-based might be simpler here.
We need to know what is read from / written to state at the outer level.
The class then handles the functions to translate between things.
This seems simpler than trying to be very function-forward.
"""
```
```python
class BaseRecursiveAction(Action):
    def create_task_specs(self, state: State) -> Generator[TaskSpec, None, None]:
        ...

    def reduce(self, initial_state: State, states: Generator[State, None, None]) -> State:
        ...

    def run(self, state: State, __executor: Executor, __context: ApplicationContext) -> dict:
        # treat it as a generator
        tasks = self.create_task_specs(state)
        executor = __executor  # or pull from the task spec if you want to override
        for task in tasks:
            # how do we make sure we have the same persistence config with different app IDs?
            # TODO -- generate the app ID as a stable hash of the inputs in some way
            # App ID -- force it to be a unique, stable name for each task?
            app = ApplicationBuilder()....with_tracker(...).with_persister(...)....build()
        # (sketch -- in practice, building the per-task app and running it would be one pass)
        results = executor.run_all(tasks)  # returns a generator
        return {"results": results}

    def update(self, result: dict, state: State) -> State:
        reduced = self.reduce(state, result["results"])  # consumes the generator
        return reduced

    def inputs(self):
        ...
```
```python
class MapReduceAction(BaseRecursiveAction):
    def __init__(
        self,
        persist: Union[Literal["cascade"], Persister, None] = "cascade",
        tracker: Union[Literal["cascade"], Persister, None] = "cascade",
    ):
        # TODO -- decide on persistence (how do they use it? Should you pass it in?)
        ...

    def create_task_specs(self, state: State) -> Generator[TaskSpec, None, None]:
        # each mapped item is either a State or a (State, id) tuple
        for i, mapped in enumerate(self.map_state(state)):
            for action in self.map_actions():
                if isinstance(action, GraphAction):
                    ...  # get the graph + info from the graph action
                if isinstance(action, Action):
                    ...  # create the graph of a single node
                app_id = f"{action.name}_{i}" if isinstance(mapped, State) else f"{mapped[1]}"
                task_spec = TaskSpec(graph=..., app_id=app_id, halt_after=..., entry_point=..., inputs=...)
                yield task_spec  # yield state, runnable graph?

    def reads(self) -> list[str]:
        ...

    def writes(self) -> list[str]:
        ...

    def map_state(self, state: State) -> Generator[State, None, None]:
        # or a tuple of (state, app_id)? Appended with action?
        ...

    def map_actions(self) -> Generator[Union[Action, RunnableGraph], None, None]:
        # each element is runnable
        ...

    def reduce_state(self, initial_state: State, states: Generator[State, None, None]) -> State:
        ...
```
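A minimal sketch of the `run_all` contract that `run()` assumes above: take the tasks, run them, and yield results as they complete. `ThreadingExecutor` is hypothetical, tasks are simplified to zero-argument callables (the real version would take `TaskSpec`s), and an asyncio variant would be analogous:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Generator, Iterable


class ThreadingExecutor:
    def __init__(self, max_workers: int = 8):
        self.max_workers = max_workers

    def run_all(self, tasks: Iterable[Callable[[], dict]]) -> Generator[dict, None, None]:
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = [pool.submit(task) for task in tasks]
            for future in as_completed(futures):
                yield future.result()  # a sub-task failure surfaces here
```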
```python
with_actions(foo=graph1, bar=graph2, baz=my_action)
with_transitions(...)

# mapped subgraph
with_actions(
    parallel_example=MapReduceAction(
        actions=lambda state: [
            SubGraph(
                seed_state=state.update(...),
                inputs=...,
            )
            for item in state["item_to_map_over"]
        ],
        entrypoint=...,
        halt_after=...,
        reduce=lambda states: ...,
    )
)

# single subgraph
with_actions(
    subgraph=GraphAction(
        state_in
    )
)
```
Notes:
Simplifying assumption -- independence:
- You read from X separate places in state -- no map needs to occur
- You write to X separate places in state -- no reduce needs to happen
- No extra "annotations" needed.
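To make the independence point concrete, a tiny sketch of why disjoint writes need no reduce -- plain dicts stand in for Burr's `State`, and the function name is hypothetical:

```python
def merge_independent_results(state: dict, results: list[tuple[str, object]]) -> dict:
    merged = dict(state)
    for key, value in results:  # each (key, value) comes from one sub-action
        assert key not in merged, f"{key} collides -- the writes were not independent"
        merged[key] = value
    return merged
```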
In case of failure -- this is clear, I think.
Coupling options:
- at action definition time (e.g. in the decorator)
- at with_actions time (e.g. wrap an action/graph)
- at edge definition time
- at state definition time
- some combination of the above
See design here: https://burr.dagworks.io/pull/370/concepts/parallelism/
This is complete: https://burr.dagworks.io/concepts/parallelism/