taskiq-pipelines icon indicating copy to clipboard operation
taskiq-pipelines copied to clipboard

Bug: Retrying task produces the error as the pipeline result

Open kai-nashi opened this issue 4 months ago • 0 comments

Description

  1. Schedule a pipeline and wait for its result.
  2. The first task in the pipeline raises an exception and goes to retry.
  3. The awaited pipeline result is returned as an error.
  4. The pipeline then successfully completes with the correct result.

Expected

Awaiting the pipeline’s result should keep waiting while a task is retrying and return only the final outcome.

Actual

The first exception is returned as the result regardless of retries.

Code

import asyncio
import math

from taskiq import Context
from taskiq import SimpleRetryMiddleware
from taskiq import TaskiqDepends
from taskiq import InMemoryBroker
from taskiq_pipelines import Pipeline
from taskiq_pipelines import PipelineMiddleware
from taskiq_redis import RedisAsyncResultBackend
from taskiq_redis import RedisStreamBroker

# Result backend pointed at a Redis instance on localhost.
result_backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
)

# Redis stream broker on the same Redis URL, with the pipeline middleware and
# a retry middleware (default_retry_count=3) attached, storing results in the
# backend defined above.
broker = (
    RedisStreamBroker(
        url="redis://localhost:6379",
    )
    .with_middlewares(
        PipelineMiddleware(),
        SimpleRetryMiddleware(default_retry_count=3),
    )
    .with_result_backend(result_backend)
)

# Base interval in seconds: passed as check_interval to wait_result() in
# main() and, multiplied by 10, used as the sleep time inside the power task.
check_interval = 0.2


@broker.task("power", retry_on_error=True)
async def power(x: int, y: int = 2, context: "Context" = TaskiqDepends()) -> int:
    """Raise ``x`` to the power ``y``, deliberately failing on the first attempt.

    If the message's ``_retries`` label is absent or equal to 0, a
    ``ValueError`` is raised. Otherwise the task sleeps for ten check
    intervals, prints the computation and returns the result.
    """
    retries_so_far = context.message.labels.get("_retries", 0)
    if retries_so_far == 0:
        raise ValueError()

    # Keep the retried attempt busy for ten polling intervals before it
    # produces a value.
    await asyncio.sleep(check_interval * 10)
    powered = x**y
    print(f"{x} ** {y} = {powered}")
    return powered


@broker.task("sqrt", retry_on_error=True)
async def sqrt(value: int) -> float:
    """Print and return the square root of ``value``."""
    root = math.sqrt(value)
    print(f"sqrt({value}) = {root}")
    return root


# Two-step pipeline on the broker: `power` is chained first, then `sqrt`.
# Presumably each step receives the previous step's result — confirm against
# the taskiq-pipelines documentation.
pipeline = Pipeline(broker).call_next(power).call_next(sqrt)


async def main():
    """Kick off the pipeline with the argument 2, await its result and print it."""
    pending = await pipeline.kiq(2)
    outcome = await pending.wait_result(check_interval=check_interval)
    print("result:", outcome)


if __name__ == "__main__":
    asyncio.run(main())

kai-nashi avatar Aug 12 '25 17:08 kai-nashi