workflows-py

[Feature Request] Improve the map-reduce (fan-out/gather) syntax

Open • logan-markewich opened this issue 4 months ago • 0 comments

Currently, implementing a map-reduce pattern takes a decent amount of boilerplate:

import random

from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class TaskRequested(Event):
    pass


class TaskReady(Event):
    pass

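
class FanOutFanIn(Workflow):
    @step
    async def fan_out(self, ctx: Context, ev: StartEvent) -> TaskRequested | None:
        # Pick a random fan-out width and remember it for the gather step
        times = random.randint(1, 10)
        await ctx.store.set("times", times)
        # Emit one TaskRequested per unit of work; the step itself returns None
        for _ in range(times):
            ctx.send_event(TaskRequested())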

    @step
    async def process_task(self, ev: TaskRequested) -> TaskReady:
        # do work
        return TaskReady()

    @step
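    async def fan_in(self, ctx: Context, ev: TaskReady) -> StopEvent | None:
        # Read back how many TaskReady events the fan-out step promised
        num_to_collect = await ctx.store.get("times")
        # Buffers incoming events and returns None until all of them have arrived
        events = ctx.collect_events(ev, [TaskReady] * num_to_collect)
        if events is not None:
            return StopEvent(result=events)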
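To see why the count has to be known up front, here is a toy model of count-based collection. It is a hypothetical, simplified stand-in (the ToyCollector class and its collect method are invented for illustration) that only mimics the behaviour the fan_in step relies on: return None until every expected event has arrived, then return them all. It is not how workflows actually implements ctx.collect_events.

```python
from __future__ import annotations

from collections import defaultdict


class ToyCollector:
    """Hypothetical, simplified model of count-based event collection.

    This is NOT the workflows implementation of ctx.collect_events; it only
    models the behaviour the fan_in step depends on.
    """

    def __init__(self) -> None:
        # Events seen so far, bucketed by their type
        self._buffer: dict[type, list[object]] = defaultdict(list)

    def collect(self, ev: object, expected: list[type]) -> list[object] | None:
        self._buffer[type(ev)].append(ev)
        # How many events of each type the caller is waiting for
        needed: dict[type, int] = defaultdict(int)
        for t in expected:
            needed[t] += 1
        if any(len(self._buffer[t]) < n for t, n in needed.items()):
            return None  # still waiting; the caller has to try again later
        # Everything arrived: hand back one event per expected slot, in order
        return [self._buffer[t].pop(0) for t in expected]


class TaskReady:
    """Plain stand-in for the TaskReady event above."""


collector = ToyCollector()
outs = [collector.collect(TaskReady(), [TaskReady] * 3) for _ in range(3)]
print([o is None for o in outs])  # [True, True, False]
```

If the expected count were one too high, every call would return None and the toy collector would never produce a result; in the real workflow the gather step would presumably never fire either.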

Notice several problems:

  • It is extremely verbose, with lots of places to make a mistake.
  • The step annotations are technically incorrect just to keep the workflow validator happy (the fan-out step never actually returns a TaskRequested; it emits them with ctx.send_event and returns None).
  • You need to know ahead of time how many events to collect.
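For comparison only, plain asyncio (not the workflows API) expresses the same fan-out/fan-in shape without a separately stored count, because asyncio.gather waits on exactly the awaitables it is handed. The process_task coroutine here is a hypothetical stand-in for real work; the sketch just illustrates the kind of ergonomics the request is after.

```python
import asyncio
import random


async def process_task(i: int) -> str:
    # Hypothetical stand-in for the real per-item work
    await asyncio.sleep(0)
    return f"task-{i}-done"


async def fan_out_fan_in() -> list[str]:
    times = random.randint(1, 10)
    # Fan out: one coroutine per unit of work
    pending = [process_task(i) for i in range(times)]
    # Fan in: gather waits for all of them; the count is implicit in `pending`
    return await asyncio.gather(*pending)


results = asyncio.run(fan_out_fan_in())
print(len(results))
```

Because gather returns results in the order the awaitables were passed, no bookkeeping is needed to match results back to tasks.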

I think this pattern really starts to break down as workflows get more complex. What if I have several levels of mapping?
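Following the pattern above, each level of mapping would presumably need its own event types, its own stored count, and its own collect_events call. For contrast, here is a two-level map-reduce in plain asyncio (again, not the workflows API), where each level is a single gather. The process_leaf, process_group, and map_reduce names are hypothetical.

```python
import asyncio


async def process_leaf(x: int) -> int:
    # Innermost unit of work (hypothetical): square the number
    await asyncio.sleep(0)
    return x * x


async def process_group(group: list[int]) -> int:
    # Second level: fan out over the items in one group, then reduce
    squares = await asyncio.gather(*(process_leaf(x) for x in group))
    return sum(squares)


async def map_reduce(groups: list[list[int]]) -> int:
    # First level: fan out over the groups, then reduce the partial sums
    partials = await asyncio.gather(*(process_group(g) for g in groups))
    return sum(partials)


total = asyncio.run(map_reduce([[1, 2], [3], [4, 5, 6]]))
print(total)  # 91
```

Nesting composes naturally here because each level's "how many to wait for" comes from the data it is mapping over rather than from shared state.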

Ideally, there would be a better way to do this, or at the very least some nicer syntactic sugar around it.

logan-markewich, Jul 29 '25 22:07