
Parallel map-style actions

Open elijahbenizzy opened this issue 1 year ago • 10 comments

Overview

We should be able to launch a whole bunch of actions in parallel. Walking the state machine in parallel is tricky, so this proposes the following:

  1. A MultiAction that performs a map operation over some state field
  2. It then spawns and delegates to subactions
  3. Each subaction has access to a portion of that state field, plus everything else it needs from state
  4. The subactions then write back to state
  5. The user specifies how to reduce the results
  6. Execution goes through a configurable executor -- asyncio, threading, etc. -- defaulting to multithreading for sync actions and asyncio for async ones
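The proposed map/spawn/reduce flow can be sketched with plain `concurrent.futures`. All names here (`multi_action`, `sub_action`, the dict-based state) are illustrative stand-ins, not Burr APIs:

```python
from concurrent.futures import ThreadPoolExecutor

def sub_action(item: str) -> str:
    # Each subaction sees one element of the mapped-over field.
    return item.upper()

def multi_action(state: dict) -> dict:
    # Map over a state field, run subactions via an executor,
    # then reduce the results back into state.
    items = state["items"]
    with ThreadPoolExecutor() as executor:  # the proposed sync default
        results = list(executor.map(sub_action, items))
    return {**state, "results": results}
```

The same shape works with asyncio (`asyncio.gather` over coroutines) for async actions.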

Use-cases

  1. PR-bot -- want to determine how to respond for each comment, also for each file
  2. Firing off requests to multiple LLMs at once to determine the best/quickest
  3. Optimizing over multiple ML algorithms at once.
  4. Web scraper -- send a bunch of requests out

Requirements:

  1. Recursive, all the way down -- the same executor should be used for subactions, sub-subactions, etc.
  2. Each individual update (from an action completing) is treated as a state update
  3. Idempotency is encouraged, if not built in (for instance, tasks could be derived from the input fields minus the output fields)
  4. Configurable parallelism (max rate, etc.)
  5. Hooks and everything else respected as expected
  6. Quick-exit mode (take the first result, then cancel the others) and join-all mode (wait for all of them)
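The two completion modes in requirement 6 correspond directly to `concurrent.futures.wait` with `FIRST_COMPLETED` versus waiting on everything. A minimal sketch (illustrative only, not Burr code):

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def task(n: int) -> int:
    return n * n

# Join-all mode: wait for every subtask and collect all results.
with ThreadPoolExecutor() as pool:
    all_results = sorted(pool.map(task, range(4)))

# Quick-exit mode: take the first finished result, cancel the rest.
with ThreadPoolExecutor(max_workers=1) as pool:
    futures = [pool.submit(task, n) for n in range(4)]
    done, pending = wait(futures, return_when=FIRST_COMPLETED)
    first = next(iter(done)).result()
    for f in pending:
        f.cancel()  # queued (not yet started) tasks are dropped
```

Note that `Future.cancel()` is best-effort: tasks already running continue, which is one reason a framework-level quick-exit mode needs a defined cancellation policy.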

Design Qs:

  1. Should all actions launched from a given action be the same, or should it be possible to launch different ones?
  2. How do we handle action failures? Should we allow some? Do retries? Or do the actions themselves handle failure cases, so we don't deal with them at all?
  3. API -- class-based? Function-based?

elijahbenizzy avatar Mar 02 '24 18:03 elijahbenizzy

This is very likely dependent on #33 -- we'll need to layer/apply updates to state in a nice way. We can probably get around that but it is worth considering.

elijahbenizzy avatar Mar 02 '24 18:03 elijahbenizzy

What I would do today is just do it internal to the action:

@action(...)
def my_parallel_action(state: State, ...) -> Tuple[dict, State]:
    # do the parallelization myself with threading/asyncio/etc.
    futures = [...]
    # wait for first, or wait for all
    result = wait_for_futures(futures)
    # do state update here / handle any failures etc.
    state = update_state(state, result)
    return result, state

So the question, if we provide framework support for something like this, is "what are we removing/simplifying/enhancing?"

skrawcz avatar Mar 03 '24 04:03 skrawcz

> What I would do today is just do it internal to the action: [code above]
>
> So the question if we provide framework support for something like this is "what are we removing/simplifying/enhancing?"

Not a requirement (yet). But the enhancement is visibility/telemetry/checkpointing for long-running jobs.

Furthermore, when we combine this with the ability to build actions recursively then it really simplifies things.

Say you're building a PR bot such as Ellipsis. You'll want to respond to each comment in parallel; otherwise it'll take too long. Each comment could be a multi-step process. This plus composable actions makes that feasible. Again, not necessary (there are multiple ways to model it).

elijahbenizzy avatar Mar 03 '24 05:03 elijahbenizzy

Plan of action:

Use-case (sample, for scrapegraph):

  • [ ] List a bunch of URLs
  • [ ] Run a single step that spawns multiple graphs
  • [ ] Join the results together

(1) Tracking/pointer storage

One subgraph points to the other. This is a spawner/subgraph relationship (not to be confused with a parent/child relationship).

  • [ ] Pass a common parameter (pointer/container) into the app so we can store it with the tracker
  • [ ] Don’t have Burr automatically know about it beforehand (yet)
  • [ ] MVP — view pointer links in the UI — click back and forth, maybe show substeps in graph

(2) Store statically — we can store the subgraph relationships statically so we can display easily:

  • [ ] Have a node be a subgraph type and register it
  • [ ] Have it still responsible for running itself (or running many in parallel)
  • [ ] We can view these as clusters/subgraphs in the UI, although it'll be a little tricky to display the current one

(3) Burr handles execution — we can wrap the prior one in this so we automatically launch a ton of them

  • [ ] More complex — state is either multiple at once, queue-based, or something clever
  • [ ] Scrapegraph doesn’t need it as it can handle parallelism

elijahbenizzy avatar May 28 '24 18:05 elijahbenizzy

Design for phase (1):

@action(...)
def spawning_action(..., __context: AppContext):
    app = (
        ApplicationBuilder()...
        .with_spawning_parent(
            __context.app_id,
            __context.sequence_id,
        )
        .with_tracker(__context.tracker)
        .build()
    )
    ... = app.run(...)
    return ...

elijahbenizzy avatar May 28 '24 18:05 elijahbenizzy

Looking at the second phase here -- how do we make this ergonomic? We'll need an action to specify:

  1. How to map -- e.g., generate subgraphs/actions from state + inputs
  2. What the subgraphs are
  3. Where to enter the subgraphs
  4. Which executor to use (asyncio for async; otherwise it should be pluggable)
  5. Where to exit the subgraphs (what the halt_after conditions are)
  6. What to do in case of error
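To see how these six pieces might fit together, here is a hypothetical task-spec interface sketched in plain Python. Every name here (`TaskSpec`, `run_parallel`, `on_error`) is invented for illustration and is not part of Burr:

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class TaskSpec:
    run: Callable[[dict], dict]  # (2)+(3) the subgraph to run, entered at its entrypoint
    state: dict                  # (1) the mapped-out slice of state it starts from
    halt_after: list = field(default_factory=list)  # (5) where the subgraph exits

def run_parallel(
    state: dict,
    mapper: Callable[[dict], list],          # (1) map state + inputs into task specs
    executor_cls: Any = ThreadPoolExecutor,  # (4) pluggable executor
    on_error: str = "raise",                 # (6) error policy: "raise" or "skip"
) -> list:
    specs = mapper(state)
    with executor_cls() as pool:
        futures = [pool.submit(spec.run, spec.state) for spec in specs]
        results = []
        for future in futures:
            try:
                results.append(future.result())
            except Exception:
                if on_error == "raise":
                    raise  # "skip" silently drops the failed task's result
        return results
```

For example, mapping `{"xs": [1, 2, 3]}` into one task per element that multiplies by 10 returns `[{"y": 10}, {"y": 20}, {"y": 30}]`, in submission order.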

elijahbenizzy avatar Sep 14 '24 15:09 elijahbenizzy

Some raw notes from a chat with @skrawcz yesterday:

with_actions(
	parallel_example=MapReduceAction(
		lambda state: [
			TaskSpec(...) for item in state["item_to_map_over"]
		]
	)
)
# this is self-contained
@action(reads=[], writes=[])
def my_action() -> State:
	pass




graph1 = graphBuilder().build()
graph2 = graphBuilder().build()

graphAction1 = graphAction(graph1, entry_point, halt_after)  # atomic here, without state mapping & reducing;
# it could have state mapping & reducing
graphAction2 = graphAction(graph2, entry_point, halt_after, map=..., reduce=..., ...)

graphAction3 = graphAction1.bind(map_state=..., reduce_state=...)

graphAction4 = graphAction(map_actions=[graph1], map_state=..., reduce_state=...)

@mapper(reads=[])
def state_mapper(state: State) -> Generator[State]:
	for x in state["foo"]:
		yield x

@reducer(reads=[], writes=[])
def state_reducer(base_accumulator: State, states: Generator[State]) -> State:
	results = []
	for s in states:
		results.append(s)
	return base_accumulator.update(some_key=results)

graphAction4 = graphAction(map_actions=[my_action], map_state=..., reduce_state=...)

class MyGraphAction():
	# so maybe class based action
	...
    

"""
Class based might be simpler here.
We need to know what is read / written to state from the out level
The class then handles the functions to translate between things.
This seems simpler than trying to be very function forward.
"""

class BaseRecursiveAction(Action):
	def create_task_specs(self, state: State) -> Generator[TaskSpec]:
		...

	def reduce(self, initial_state: State, states: Generator[State]) -> State:
		...

	def run(self, state: State, __executor: Executor, __context: ApplicationContext) -> dict:
		# treat it as a generator
		tasks = self.create_task_specs(state)
		executor = __executor  # or pull from the task spec if you want to override
		for task in tasks:
			# how to make sure we have the same persistence config with different app IDs?
			# TODO -- generate the app ID as a stable hash of the inputs in some way
			# App ID -- force it to be a unique, stable name for each task?
			app = ApplicationBuilder()....with_tracker(...).with_persister()...build()
		results = executor.run_all(tasks)  # returns a generator
		return {"results": results}

	def update(self, result: dict, state: State) -> State:
		return self.reduce(state, result["results"])  # consumes the generator

	def inputs(self):
		...

class MapReduceAction(BaseRecursiveAction):
	def __init__(
		self,
		persist: Union["cascade", Persister, None] = "cascade",
		tracker: Union["cascade", Tracker, None] = "cascade",
	):
		# TODO -- decide on persistence (how do they use it? Should you pass it in?)
		...

	def create_task_specs(self, state: State) -> Generator[TaskSpec]:
		# each mapped state is either a State or a tuple of (state, id)
		for i, sub_state in enumerate(self.map_state(state)):
			for action in self.map_actions():
				if isinstance(action, GraphAction):
					...  # get the graph + info from the graph action
				if isinstance(action, Action):
					...  # create a graph of a single node
				app_id = f"{action.name}_{i}" if isinstance(sub_state, State) else f"{sub_state[1]}"
				task_spec = TaskSpec(graph=..., app_id=app_id, halt_after=..., entry_point=..., inputs=...)
				yield task_spec  # yield state, runnable graph?

	def reads(self) -> list[str]:
		...

	def writes(self) -> list[str]:
		...

	def map_state(self, state: State) -> Generator[State]:  # Tuple of state with app_id? Appended with action?
		...

	def map_actions(self) -> Generator[Action | RunnableGraph]:  # each is runnable
		...

	def reduce_state(self, initial_state: State, states: Generator[State]) -> State:
		...


with_actions(foo=graph1, bar=graph2, baz=my_action)
with_transitions()


# mapped subgraph
with_actions(
	parallel_example=MapReduceAction(
		actions=lambda state: [
			SubGraph(
				seed_state=state.update(...),
				inputs=...,
			) for item in state["item_to_map_over"]
		],
		entrypoint=...,
		halt_after=...,
		reduce=lambda states: ...
	)
)

# single subgraph
with_actions(
	subgraph=GraphAction(
		state_in
	) 
)
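A stripped-down, runnable distillation of the class-based sketch above, using plain dicts in place of Burr's `State` (all names illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Generator

class MapReduceAction:
    """Runnable distillation of the class-based sketch; not Burr's API."""

    def map_state(self, state: dict) -> Generator[dict, None, None]:
        # Split the mapped-over field into one sub-state per item.
        for item in state["items"]:
            yield {"item": item}

    def run_one(self, sub_state: dict) -> dict:
        # Stand-in for running a subgraph to its halt_after point.
        return {"result": sub_state["item"] * 2}

    def reduce_state(self, initial: dict, states) -> dict:
        # Fold the sub-results back into the parent state.
        return {**initial, "results": [s["result"] for s in states]}

    def run(self, state: dict) -> dict:
        with ThreadPoolExecutor() as pool:
            sub_results = list(pool.map(self.run_one, self.map_state(state)))
        return self.reduce_state(state, sub_results)

out = MapReduceAction().run({"items": [1, 2, 3]})
```

The class-based shape makes the reads/writes contract explicit at the outer level while the methods handle the translation, which is the simplification the notes argue for.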

elijahbenizzy avatar Sep 20 '24 22:09 elijahbenizzy

Notes:

Simplifying assumption - independence:

  1. You read from X separate places in state
  2. No map needs to occur
  3. You write to X separate places in state
  4. No reduce needs to happen
  5. No extra "annotations" needed

In case of failure -- this is clear I think.
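Concretely: if each branch writes to its own key, "reducing" is just merging disjoint updates, with no user-supplied reduce step (illustrative dicts, not Burr state objects):

```python
from concurrent.futures import ThreadPoolExecutor

def branch_a(state: dict) -> dict:
    return {"a_result": state["x"] + 1}

def branch_b(state: dict) -> dict:
    return {"b_result": state["x"] * 2}

state = {"x": 10}
with ThreadPoolExecutor() as pool:
    # Each branch reads shared state but writes to a distinct key.
    updates = list(pool.map(lambda branch: branch(state), [branch_a, branch_b]))

merged = dict(state)
for update in updates:
    merged.update(update)  # disjoint keys: completion order doesn't matter
```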

skrawcz avatar Sep 20 '24 22:09 skrawcz

Coupling options:

  1. at action definition time (e.g. in the decorator)
  2. at with_actions time (e.g. wrap an action/graph)
  3. at edge definition time.
  4. at state definition time
  5. some combination of the above

skrawcz avatar Sep 20 '24 23:09 skrawcz

See design here: https://burr.dagworks.io/pull/370/concepts/parallelism/

elijahbenizzy avatar Sep 26 '24 17:09 elijahbenizzy

This is complete https://burr.dagworks.io/concepts/parallelism/

elijahbenizzy avatar Apr 20 '25 16:04 elijahbenizzy