workflows-py
[Feature Request] Improve the map-reduce (fan-out/gather) syntax
Currently, implementing a map-reduce pattern requires a fair amount of boilerplate:
import random

from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class TaskRequested(Event):
    pass


class TaskReady(Event):
    pass


class FanOutFanIn(Workflow):
    @step
    async def fan_out(self, ctx: Context, ev: StartEvent) -> TaskRequested | None:
        # Store the task count so fan_in knows how many results to wait for.
        times = random.randint(1, 10)
        await ctx.store.set("times", times)
        for _ in range(times):
            ctx.send_event(TaskRequested())

    @step
    async def process_task(self, ev: TaskRequested) -> TaskReady:
        # do work
        return TaskReady()

    @step
    async def fan_in(self, ctx: Context, ev: TaskReady) -> StopEvent | None:
        num_to_collect = await ctx.store.get("times")
        # collect_events returns None until all expected events have arrived.
        events = ctx.collect_events(ev, [TaskReady] * num_to_collect)
        if events is not None:
            return StopEvent(result=events)
Notice several things:
- extremely verbose, lots of places to make a mistake
- technically incorrect step annotations to keep the workflow validator happy (the fan_out step doesn't actually "return" TaskRequested; it sends the events through ctx.send_event)
- needing to know ahead of time how many events to collect
I think this pattern really starts to break down in more complex workflows. What if I have several levels of mapping? Each level would need its own event types, its own stored count, and its own collect step.
Ideally there would be a better way to do this or, at the very least, nicer syntactic sugar around it.
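For comparison, here is a minimal sketch of the same fan-out/gather shape in plain asyncio. It is not workflows-py API and not a proposed design; the names process_task, fan_out_gather, and nested_fan_out are hypothetical and exist only for illustration. It shows the level of terseness that sugar could aim for, including two levels of mapping, with no counter stored by hand.

```python
import asyncio


async def process_task(item: int) -> int:
    # Hypothetical stand-in for the real per-task work.
    await asyncio.sleep(0)
    return item * 2


async def fan_out_gather(items: list[int]) -> list[int]:
    # Fan out one coroutine per item, then gather the results in input order.
    # No count is stored anywhere: gather knows how many awaitables it was given.
    return await asyncio.gather(*(process_task(i) for i in items))


async def nested_fan_out(groups: list[list[int]]) -> list[list[int]]:
    # Two levels of mapping: each group fans out over its own items.
    return await asyncio.gather(*(fan_out_gather(g) for g in groups))
```

Calling `asyncio.run(nested_fan_out([[1], [2, 3]]))` returns `[[2], [4, 6]]`. The point of the comparison is only that the number of results is implied by the number of tasks started, which is the piece the workflow version has to track manually.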