dramatiq
dramatiq copied to clipboard
Dramatiq retry sends more retries instead of max 3 that are configured
What OS are you using?
Running on python docker:
python:3.8-slim
What version of Dramatiq are you using?
v1.15.0
What did you do?
I have dramatiq setup to take a message from the queue and send it to relevant endpoint that performs some ML processing. I start dramatiq with 1 process and 4 threads.
Dramatiq actor is setup like this:
@dramatiq.actor(
**config.actors.handle_bla.params,
time_limit=1000 * TIMEOUT,
min_backoff=1000 * MIN_BACKOFF,
max_backoff=1000 * MAX_BACKOFF,
retry_when=should_retry_bla_handler,
)
def handle_bla(message: Mapping[str, Any]):
# code that does some processing and sends a post request to specific endpoint and get the result back.
Implementation of retry handler:
def should_retry_bla_handler(retries_so_far: int, exception: Exception):
"""
Determines whether to retry processing a job based on the number of retries so far and the type of exception encountered.
Args:
retries_so_far (int): The number of retries attempted so far.
exception (Exception): The exception that was raised.
Returns:
bool: True if processing should be retried, False otherwise.
"""
max_retries = config.actors.handle_bla.get("max_retries", 3)
logger.info(f"Attempted retries so far: {retries_so_far}")
if retries_so_far >= max_retries:
logger.error(f"Max number of job retries exceeded ({max_retries})")
return False
# When an invalid message is passed, we do not want to retry processing
if isinstance(
exception,
(
InvalidMessagePayloadError,
InvalidProcessingRequestError,
),
):
logger.error(f"Invalid job processing arguments provided: {exception}")
return False
# When there is a document mismatch between request and response we do not want to retry processing
elif isinstance(
exception,
DocumentsMismatchInResponse,
):
logger.error(f"Documents in response vs documents sent mismatch: {exception}")
return False
elif isinstance(
exception,
ConfigurationError,
):
logger.error(f"Missing ML endpoint in configuration: {exception}")
if isinstance(exception, ServiceRequestHandlingError):
logger.warning("Encountered issue while handling processing request - retrying...")
return True
What did you expect would happen?
I expect retry to be tried 3 times and if unsuccessful it should exit.
What happened?
Firstly what happens is I receive a message that 2 retries will be tried in a specific amount of milliseconds based on the min_backoff
and max_backoff
variables. Why 2 right away? Why not 1 by 1 sequentially?
Secondly, request retry happens more times (6,7,8 times) even though max is 3. This is causing a problem for my service, because it sends multiple requests towards the endpoint and clogs the workers there with same request. Is this happening because threads are used? Should I add some kind of threading lock mechanism?
Thanks in advance for any guidance.