taskiq-pipelines

Feat: Allow multiple arguments to pass to mapper

Open fubuloubu opened this issue 1 year ago • 0 comments

I was trying to build a pipeline where the first sequential task returns a list of tuples that feeds into a mapper. The mapper should take each tuple and unpack it into multiple arguments for the mapped task.

Proposed API, maybe something like this?

@broker.task
async def get_list() -> list[tuple[int, int]]:
    # some sort of process to obtain the args for the next function
    return [(1, 2), (3, 4), (5, 6), ...]

@broker.task
async def handle_args(a: int, b: int) -> int:
    return a * b  # imagine this is more computationally expensive

pipe = Pipeline(broker, get_list).map(handle_args, multiple_args=True)  # if set, would be similar to *args
task = await pipe.kiq()
result = await task.wait_result()
print(sum(result.return_value))
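
To make the intended semantics concrete, here is a minimal sketch in plain asyncio of what a star-unpacking map would do. It does not use taskiq at all: `star_map` is a hypothetical helper written only for illustration, and `multiple_args` itself remains a proposal rather than an existing option. The behaviour is analogous to `itertools.starmap`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def star_map(
    func: Callable[..., Awaitable[Any]],
    items: Iterable[tuple],
) -> list[Any]:
    """Hypothetical helper: await func(*item) for every tuple concurrently.

    Results are returned in the same order as the input items.
    """
    return list(await asyncio.gather(*(func(*item) for item in items)))


async def handle_args(a: int, b: int) -> int:
    # Stand-in for the mapped task from the proposal above.
    return a * b


async def main() -> int:
    # Stand-in for the output of the first task in the pipeline.
    pairs = [(1, 2), (3, 4), (5, 6)]
    results = await star_map(handle_args, pairs)
    return sum(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With `multiple_args=True`, each element of the list would be spread across the mapped task's positional parameters in this way, instead of being passed as a single argument.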

fubuloubu, Oct 25 '24 05:10