Unhandled error during post_process_message

Open dvazar opened this issue 6 months ago • 0 comments

What OS are you using?

Ubuntu 22.04

What version of Dramatiq are you using?

1.15.0

What did you do?

Processed async messages in parallel using Dramatiq.

What did you expect would happen?

Dramatiq should keep receiving and processing messages. Instead, when two actors process the same message simultaneously, it raises an "Unhandled error during post_process_message ..." error and then hangs.

What happened?

We process async messages in parallel. Everything worked until two actors processed the same message simultaneously. In our case, this reproduces every time an actor takes longer to process a message than the queue's message visibility timeout.

For example, the message visibility timeout is 10 minutes.

  1. Actor "A_1" starts processing message "M" and takes, e.g., 15 minutes to finish.
  2. After 10 minutes, message "M" becomes visible in the queue again and another actor "A_2" starts processing it. At this stage we have two actors processing the same message "M".
  3. Now one of the actors finishes processing message "M" and removes this message from the queue.
  4. The other actor also finishes processing message "M" and tries to delete it. This is where the error occurs: Unhandled error during post_process_message(DramatiqTask(module_name='apps.tasks.tasks', func_name='async_test_dramatiq', args=[], kwargs={'_task_id': '018caace-07e8-7d4d-3995-59d3df607ff3'})). You've found a bug in Dramatiq. Please report it!
  5. After that, Dramatiq stops processing messages: it hangs and does nothing.
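The sequence above can be reproduced with a toy, single-process model of a queue that has a visibility timeout. This is a hypothetical sketch, not SQS, Azure Queue Storage, or Dramatiq code: `ToyVisibilityQueue` and `MessageGoneError` are made-up names, time is passed in explicitly (in minutes) instead of read from a clock, and any receipt handle issued for a message is allowed to delete it (real services have their own rules about stale handles).

```python
import itertools


class MessageGoneError(Exception):
    """Hypothetical stand-in for "receipt handle is not valid" / "message does not exist"."""


class ToyVisibilityQueue:
    """Hypothetical in-memory queue with a visibility timeout (time in minutes)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self._messages = {}  # message_id -> time at which it becomes visible again
        self._handles = {}   # receipt handle -> message_id
        self._counter = itertools.count(1)

    def send(self, message_id, now=0):
        self._messages[message_id] = now

    def receive(self, now):
        """Return (message_id, receipt_handle) for the first visible message, or None."""
        for message_id, visible_at in self._messages.items():
            if visible_at <= now:
                # Hide the message for the visibility timeout and issue a fresh receipt handle.
                self._messages[message_id] = now + self.visibility_timeout
                handle = f"{message_id}#{next(self._counter)}"
                self._handles[handle] = message_id
                return message_id, handle
        return None

    def delete(self, handle):
        """Delete the message behind `handle`; fail if it was already deleted."""
        message_id = self._handles.get(handle)
        if message_id not in self._messages:
            raise MessageGoneError(f"receipt handle {handle!r} is not valid")
        del self._messages[message_id]


queue = ToyVisibilityQueue(visibility_timeout=10)
queue.send("M")

_, handle_a1 = queue.receive(now=0)   # A_1 starts processing M
assert queue.receive(now=5) is None   # M is still hidden
_, handle_a2 = queue.receive(now=10)  # timeout expired: A_2 receives M as well

queue.delete(handle_a1)               # A_1 finishes at t=15 and deletes M
try:
    queue.delete(handle_a2)           # A_2 finishes at t=25; its delete now fails
except MessageGoneError as exc:
    print("second ack failed:", exc)  # second ack failed: receipt handle 'M#2' is not valid
```

Nothing in the model stops A_2 from receiving M while A_1 is still working; the only visible symptom is the failing second delete, which is the same point where both tracebacks below fail.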

We hit this issue on Azure ASQ (using the dramatiq-azure library) and on Amazon SQS (using the dramatiq-sqs library), so it appears to be a core Dramatiq issue.

Traceback in case of Amazon SQS

ERROR 2023-12-27 10:26:38,493 worker  Unhandled error during post_process_message(DramatiqTask(module_name='apps.tasks.tasks', func_name='async_test_dramatiq', args=[], kwargs={'_task_id': '018caace-07e8-7d4d-3995-59d3df607ff3'})).  You've found a bug in Dramatiq.  Please report it!
Although your message has been processed, it will be processed again once this worker is restarted.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/dramatiq/worker.py", line 352, in post_process_message
    self.consumer.ack(message)
  File "/usr/local/lib/python3.11/dist-packages/dramatiq_sqs/broker.py", line 192, in ack
    message._sqs_message.delete()
  File "/usr/local/lib/python3.11/dist-packages/boto3/resources/factory.py", line 580, in do_action
    response = action(self, *args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/boto3/resources/action.py", line 88, in __call__
    response = getattr(parent.meta.client, operation_name)(*args, **params)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/botocore/client.py", line 535, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/botocore/client.py", line 983, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ReceiptHandleIsInvalid: An error occurred (ReceiptHandleIsInvalid) when calling the DeleteMessage operation: The receipt handle "7c501a40-bf89-4175-a97c-fec361c19276#61eef7e8-8efc-4905-a988-d48179c61154" is not valid.

Traceback in case of Azure ASQ

Unhandled error during post_process_message(<dramatiq_azure.asq._ASQMessage object at 0x7f97aad96e10>).  You've found a bug in Dramatiq.  Please report it!
Although your message has been processed, it will be processed again once this worker is restarted.
ResourceNotFoundError: The specified message does not exist.
RequestId:414fe41a-6003-003a-0863-388d1e000000
Time:2023-12-27T01:21:59.8804458Z
ErrorCode:MessageNotFound
  File "dramatiq/worker.py", line 352, in post_process_message
    self.consumer.ack(message)
  File "dramatiq_azure/asq.py", line 134, in ack
    self.__remove_from_queue(message)
  File "dramatiq_azure/asq.py", line 129, in __remove_from_queue
    self.q_client.delete_message(message._asq_message)
  File "/usr/local/lib/python3.11/dist-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.11/dist-packages/azure/storage/queue/_queue_client.py", line 956, in delete_message
    process_storage_error(error)
  File "/usr/local/lib/python3.11/dist-packages/azure/storage/queue/_shared/response_handlers.py", line 189, in process_storage_error
    exec("raise error from None")   # pylint: disable=exec-used # nosec
  File "<string>", line 1, in <module>
    # -------------------------------------------------------------------------
  File "/usr/local/lib/python3.11/dist-packages/azure/storage/queue/_queue_client.py", line 949, in delete_message
    self._client.message_id.delete(
  File "/usr/local/lib/python3.11/dist-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.11/dist-packages/azure/storage/queue/_generated/operations/_message_id_operations.py", line 279, in delete
    map_error(status_code=response.status_code, response=response, error_map=error_map)
  File "/usr/local/lib/python3.11/dist-packages/azure/core/exceptions.py", line 165, in map_error
    raise error
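Both tracebacks fail at the same place: `self.consumer.ack(message)` in Dramatiq's `post_process_message` calls the backend's delete, and the backend reports that the message no longer exists. As a hedged sketch of one possible mitigation (not an API of Dramatiq, dramatiq-sqs, or dramatiq-azure), an ack can treat that specific error as "already acknowledged" instead of letting it propagate. `ack_tolerating_gone` and `MessageGoneError` are hypothetical names; with the real backends you would pass the library's own exception classes instead, for example `azure.core.exceptions.ResourceNotFoundError` or the `ReceiptHandleIsInvalid` class exposed on a boto3 SQS client's `exceptions` attribute. Whether this avoids the hang is untested.

```python
import logging

logger = logging.getLogger(__name__)


class MessageGoneError(Exception):
    """Hypothetical stand-in for a backend's "message already deleted" error."""


def ack_tolerating_gone(ack, message, gone_errors=(MessageGoneError,)):
    """Call ack(message), treating "message already gone" errors as a completed ack.

    Returns True if the ack succeeded, False if the message had already been
    removed by someone else. Any other exception still propagates.
    """
    try:
        ack(message)
    except gone_errors as exc:
        logger.warning("Message %r was already removed from the queue: %s", message, exc)
        return False
    return True


# Usage with a fake ack that fails the second time, like the second worker's delete.
deleted = set()


def fake_ack(message):
    if message in deleted:
        raise MessageGoneError("The specified message does not exist.")
    deleted.add(message)


assert ack_tolerating_gone(fake_ack, "M") is True   # first worker deletes M
assert ack_tolerating_gone(fake_ack, "M") is False  # second worker's ack is logged, not raised
```

This only hides the failing delete; the message has still been processed twice, so the actor itself would need to be idempotent.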

Minimal example

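import time


@async_task  # not part of Dramatiq's core API; its definition/import is not shown in this report
def async_test_dramatiq():
    print('Simulating work ...')
    time.sleep(120)  # 2 minutes, i.e. longer than the 1-minute visibility timeout below
    print('Done')


# With the queue's visibility timeout set to 1 minute, enqueue one message:
async_test_dramatiq.send()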

dvazar, Dec 27 '23 17:12