
Dramatiq retries more times than the configured maximum of 3

Status: Open · bvidovic1 opened this issue 5 months ago · 0 comments

What OS are you using?

Running in Docker with the python:3.8-slim image.

What version of Dramatiq are you using?

v1.15.0

What did you do?

I have Dramatiq set up to take a message from the queue and send it to the relevant endpoint, which performs some ML processing. I start Dramatiq with 1 process and 4 threads.

The Dramatiq actor is set up like this:

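```python
from typing import Any, Mapping

import dramatiq

# `config`, TIMEOUT, MIN_BACKOFF and MAX_BACKOFF come from the service's own settings.
# Dramatiq expects time_limit, min_backoff and max_backoff in milliseconds, so the
# settings (presumably in seconds) are multiplied by 1000.
@dramatiq.actor(
    **config.actors.handle_bla.params,
    time_limit=1000 * TIMEOUT,
    min_backoff=1000 * MIN_BACKOFF,
    max_backoff=1000 * MAX_BACKOFF,
    retry_when=should_retry_bla_handler,
)
def handle_bla(message: Mapping[str, Any]):
    # Code that does some processing, sends a POST request to a specific endpoint
    # and gets the result back.
    ...
```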

Implementation of retry handler:

```python
# `config`, `logger` and the exception classes are defined elsewhere in the service.
def should_retry_bla_handler(retries_so_far: int, exception: Exception) -> bool:
    """
    Determines whether to retry processing a job based on the number of retries so far and the type of exception encountered.

    Args:
        retries_so_far (int): The number of retries attempted so far.
        exception (Exception): The exception that was raised.

    Returns:
        bool: True if processing should be retried, False otherwise.
    """
    max_retries = config.actors.handle_bla.get("max_retries", 3)

    logger.info(f"Attempted retries so far: {retries_so_far}")
    if retries_so_far >= max_retries:
        logger.error(f"Max number of job retries exceeded ({max_retries})")
        return False

    # When an invalid message is passed, we do not want to retry processing
    if isinstance(
        exception,
        (
            InvalidMessagePayloadError,
            InvalidProcessingRequestError,
        ),
    ):
        logger.error(f"Invalid job processing arguments provided: {exception}")
        return False
    # When there is a document mismatch between request and response, we do not want to retry processing
    elif isinstance(
        exception,
        DocumentsMismatchInResponse,
    ):
        logger.error(f"Documents in response vs documents sent mismatch: {exception}")
        return False
    elif isinstance(
        exception,
        ConfigurationError,
    ):
        # No `return False` here, so a ConfigurationError falls through and is still retried
        logger.error(f"Missing ML endpoint in configuration: {exception}")

    if isinstance(exception, ServiceRequestHandlingError):
        logger.warning("Encountered issue while handling processing request - retrying...")

    return True
```

What did you expect would happen?

I expect the job to be retried at most 3 times and, if it still fails, to give up.
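To make the expected count concrete, here is a minimal, self-contained sketch that runs a simplified copy of the predicate against a message that fails on every attempt. The exception classes are stubs, MAX_RETRIES stands in for the config lookup, and count_attempts is a hypothetical simulation that assumes the retry counter starts at 0 and is incremented once per failed attempt (as the retries_so_far argument suggests). Per the Dramatiq docs, retry_when takes precedence over max_retries when supplied, so the cap comes only from the predicate's own check. Under these assumptions, "max 3 retries" means at most 4 executions in total.

```python
# Stub exception classes standing in for the service's real ones (hypothetical).
class InvalidMessagePayloadError(Exception):
    pass


class InvalidProcessingRequestError(Exception):
    pass


class DocumentsMismatchInResponse(Exception):
    pass


MAX_RETRIES = 3  # assumption: replaces config.actors.handle_bla.get("max_retries", 3)

NON_RETRYABLE = (
    InvalidMessagePayloadError,
    InvalidProcessingRequestError,
    DocumentsMismatchInResponse,
)


def should_retry(retries_so_far: int, exception: Exception) -> bool:
    """Simplified copy of should_retry_bla_handler without logging or config."""
    if retries_so_far >= MAX_RETRIES:
        return False
    if isinstance(exception, NON_RETRYABLE):
        return False
    return True


def count_attempts(predicate, exception: Exception) -> int:
    """Simulate a message that raises `exception` on every run; return total runs."""
    retries = 0
    attempts = 0
    while True:
        attempts += 1  # the actor runs and raises `exception`
        if not predicate(retries, exception):
            return attempts  # message is given up on
        retries += 1  # assumption: counter is bumped once before each re-enqueue


print(count_attempts(should_retry, TimeoutError()))  # 4: the original attempt + 3 retries
print(count_attempts(should_retry, InvalidMessagePayloadError()))  # 1: never retried
```

If the endpoint receives more than 4 requests per message, the extra runs are not coming from this predicate's decision alone.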

What happened?

First, I receive a message saying that 2 retries will be attempted in a certain number of milliseconds, with the delay based on the min_backoff and max_backoff values. Why 2 right away? Why not one at a time, sequentially?
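For reference, exponential backoff between the two bounds can be sketched as below. This is a hypothetical approximation, not Dramatiq's own code: it assumes the delay doubles with each retry starting from min_backoff, is capped at max_backoff, and may be jittered. In this model each failed attempt schedules exactly one delayed retry, so two retries announced at once for the same message would suggest the message is being executed twice rather than a quirk of the backoff itself.

```python
import random


def backoff_ms(retries: int, min_backoff: int, max_backoff: int, jitter: bool = True) -> int:
    """Exponential backoff: min_backoff * 2**retries, capped at max_backoff.

    With jitter, the delay is drawn from [delay / 2, delay] so that many messages
    failing at the same moment do not all retry at exactly the same time.
    """
    delay = min(min_backoff * 2 ** retries, max_backoff)
    if jitter:
        half = delay / 2
        delay = half + random.uniform(0, half)
    return int(delay)


# Hypothetical settings: MIN_BACKOFF = 1 s and MAX_BACKOFF = 30 s, converted to ms.
for retries in range(4):
    print(retries, backoff_ms(retries, 1000 * 1, 1000 * 30, jitter=False))
# 0 1000
# 1 2000
# 2 4000
# 3 8000
```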

Second, the request is retried more times (6, 7 or 8 times) even though the maximum is 3. This is a problem for my service: it sends multiple identical requests to the endpoint and clogs the workers there with the same request. Is this happening because I use threads? Should I add some kind of thread-locking mechanism?
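If a lock is what you want to try, a minimal per-process sketch could look like the following. InFlightGuard and send_to_endpoint are hypothetical names, and the key is assumed to be something that identifies the job (for example an ID taken from the message payload). The lock only stops two threads of the same worker process from sending the same request concurrently; it cannot stop a redelivered message from being processed again after the first attempt has finished, and it does not coordinate across processes.

```python
import threading
from contextlib import contextmanager
from typing import Iterator, Set


class InFlightGuard:
    """Tracks job keys currently being processed by threads in this process."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight: Set[str] = set()

    @contextmanager
    def claim(self, key: str) -> Iterator[bool]:
        """Yield True if this thread claimed `key`, False if another holder has it."""
        with self._lock:
            claimed = key not in self._in_flight
            if claimed:
                self._in_flight.add(key)
        try:
            yield claimed
        finally:
            # Only the thread that claimed the key releases it.
            if claimed:
                with self._lock:
                    self._in_flight.discard(key)


guard = InFlightGuard()


def send_to_endpoint(job_id: str) -> str:
    """Hypothetical wrapper around the POST request to the ML endpoint."""
    with guard.claim(job_id) as claimed:
        if not claimed:
            return "skipped: already in flight"
        # ... the POST request to the ML endpoint would go here ...
        return "sent"
```

Called from inside handle_bla, a thread that finds the key already claimed would skip the POST instead of sending a duplicate.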

Thanks in advance for any guidance.

bvidovic1, Jan 25 '24 10:01