dramatiq
Unhandled error during post_process_message
What OS are you using?
Ubuntu 22.04
What version of Dramatiq are you using?
1.15.0
What did you do?
Processing parallel async messages using Dramatiq
What did you expect would happen?
It should work properly: messages should be received and processed. Instead, when two actors process the same message simultaneously, Dramatiq raises an "Unhandled error during post_process_message ..." error and then hangs.
What happened?
We process async messages in parallel. This worked fine until two actors processed the same message simultaneously. In our case, it reproduces every time an actor takes longer to process a message than the queue's message visibility timeout.
For example, the message visibility timeout is 10 minutes.
- Actor "A_1" starts processing message "M" and works on it for, e.g., 15 minutes.
- After 10 minutes, message "M" becomes visible in the queue again and another actor, "A_2", starts processing it. At this stage two actors are processing the same message "M".
- One of the actors finishes processing message "M" and removes it from the queue.
- The other actor also finishes processing message "M" and issues the command to delete it. This is where the error occurs:
Unhandled error during post_process_message(DramatiqTask(module_name='apps.tasks.tasks', func_name='async_test_dramatiq', args=[], kwargs={'_task_id': '018caace-07e8-7d4d-3995-59d3df607ff3'})). You've found a bug in Dramatiq. Please report it!
- After that, Dramatiq stops processing messages: it hangs and does nothing.
We hit this issue on both Azure ASQ (using the dramatiq-azure library) and Amazon SQS (using the dramatiq-sqs library), so it appears to be an issue in Dramatiq core.
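The sequence described above can be illustrated without any cloud service. The following is a minimal sketch, not Dramatiq or broker code: `FakeQueue` and `MessageGone` are hypothetical names, time is a simulated counter in minutes, and the queue model is deliberately simplified (a real service tracks receipt handles and may reject the stale delete in a different way). It only shows why the second delete fails once processing time exceeds the visibility timeout.

```python
class MessageGone(Exception):
    """Raised when deleting a message that is no longer in the queue."""


class FakeQueue:
    """Hypothetical in-memory queue with a visibility timeout (simplified model)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.now = 0  # simulated clock, in minutes
        self._visible_at = {}  # message id -> time at which it becomes visible

    def send(self, message_id):
        self._visible_at[message_id] = self.now

    def receive(self):
        # Hand out the first visible message and hide it for the timeout period.
        for message_id, visible_at in self._visible_at.items():
            if visible_at <= self.now:
                self._visible_at[message_id] = self.now + self.visibility_timeout
                return message_id
        return None

    def delete(self, message_id):
        if message_id not in self._visible_at:
            raise MessageGone(message_id)
        del self._visible_at[message_id]


def run_scenario():
    queue = FakeQueue(visibility_timeout=10)
    queue.send("M")
    events = []

    a1 = queue.receive()  # t=0: A_1 starts processing M
    events.append(("A_1 received", a1))

    queue.now = 10  # visibility timeout expires while A_1 is still working
    a2 = queue.receive()  # A_2 receives the same message
    events.append(("A_2 received", a2))

    queue.now = 15  # A_1 finishes and deletes M
    queue.delete(a1)
    events.append(("A_1 deleted", a1))

    queue.now = 25  # A_2 finishes and tries to delete M again
    try:
        queue.delete(a2)
        events.append(("A_2 deleted", a2))
    except MessageGone:
        events.append(("A_2 delete failed", a2))
    return events


if __name__ == "__main__":
    for event in run_scenario():
        print(event)
```

In this model the last step is the counterpart of the failing `ack` call shown in the tracebacks below.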
Traceback in case of Amazon SQS
ERROR 2023-12-27 10:26:38,493 worker Unhandled error during post_process_message(DramatiqTask(module_name='apps.tasks.tasks', func_name='async_test_dramatiq', args=[], kwargs={'_task_id': '018caace-07e8-7d4d-3995-59d3df607ff3'})). You've found a bug in Dramatiq. Please report it!
2023-12-27T10:26:38.510641460Z Although your message has been processed, it will be processed again once this worker is restarted.
2023-12-27T10:26:38.510644793Z Traceback (most recent call last):
2023-12-27T10:26:38.510646501Z File "/usr/local/lib/python3.11/dist-packages/dramatiq/worker.py", line 352, in post_process_message
2023-12-27T10:26:38.510648335Z self.consumer.ack(message)
2023-12-27T10:26:38.510649918Z File "/usr/local/lib/python3.11/dist-packages/dramatiq_sqs/broker.py", line 192, in ack
2023-12-27T10:26:38.510651585Z message._sqs_message.delete()
2023-12-27T10:26:38.510653168Z File "/usr/local/lib/python3.11/dist-packages/boto3/resources/factory.py", line 580, in do_action
2023-12-27T10:26:38.510654876Z response = action(self, *args, **kwargs)
2023-12-27T10:26:38.510656960Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-12-27T10:26:38.510658501Z File "/usr/local/lib/python3.11/dist-packages/boto3/resources/action.py", line 88, in __call__
2023-12-27T10:26:38.510660168Z response = getattr(parent.meta.client, operation_name)(*args, **params)
2023-12-27T10:26:38.510661793Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-12-27T10:26:38.510663376Z File "/usr/local/lib/python3.11/dist-packages/botocore/client.py", line 535, in _api_call
2023-12-27T10:26:38.510759751Z return self._make_api_call(operation_name, kwargs)
2023-12-27T10:26:38.510763418Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-12-27T10:26:38.510768585Z File "/usr/local/lib/python3.11/dist-packages/botocore/client.py", line 983, in _make_api_call
2023-12-27T10:26:38.510770376Z raise error_class(parsed_response, operation_name)
2023-12-27T10:26:38.510772210Z botocore.errorfactory.ReceiptHandleIsInvalid: An error occurred (ReceiptHandleIsInvalid) when calling the DeleteMessage operation: The receipt handle "7c501a40-bf89-4175-a97c-fec361c19276#61eef7e8-8efc-4905-a988-d48179c61154" is not valid.
Traceback in case of Azure ASQ
Unhandled error during post_process_message(<dramatiq_azure.asq._ASQMessage object at 0x7f97aad96e10>). You've found a bug in Dramatiq. Please report it!
Although your message has been processed, it will be processed again once this worker is restarted.
ResourceNotFoundError: The specified message does not exist.
RequestId:414fe41a-6003-003a-0863-388d1e000000
Time:2023-12-27T01:21:59.8804458Z
ErrorCode:MessageNotFound
File "dramatiq/worker.py", line 352, in post_process_message
self.consumer.ack(message)
File "dramatiq_azure/asq.py", line 134, in ack
self.__remove_from_queue(message)
File "dramatiq_azure/asq.py", line 129, in __remove_from_queue
self.q_client.delete_message(message._asq_message)
File "/usr/local/lib/python3.11/dist-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/usr/local/lib/python3.11/dist-packages/azure/storage/queue/_queue_client.py", line 956, in delete_message
process_storage_error(error)
File "/usr/local/lib/python3.11/dist-packages/azure/storage/queue/_shared/response_handlers.py", line 189, in process_storage_error
exec("raise error from None") # pylint: disable=exec-used # nosec
File "<string>", line 1, in <module>
# -------------------------------------------------------------------------
File "/usr/local/lib/python3.11/dist-packages/azure/storage/queue/_queue_client.py", line 949, in delete_message
self._client.message_id.delete(
File "/usr/local/lib/python3.11/dist-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/usr/local/lib/python3.11/dist-packages/azure/storage/queue/_generated/operations/_message_id_operations.py", line 279, in delete
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "/usr/local/lib/python3.11/dist-packages/azure/core/exceptions.py", line 165, in map_error
raise error
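Both tracebacks end in the broker's delete call raising because the message is already gone. One possible workaround, sketched here under assumptions, is to treat that specific error as a successful acknowledgement. The helper `ack_ignoring_missing` and the `AlreadyDeleted` exception are hypothetical and are not part of Dramatiq, dramatiq-sqs, or dramatiq-azure; in a real broker the exception tuple would presumably hold the errors seen above (`ReceiptHandleIsInvalid` for SQS, `ResourceNotFoundError` for Azure).

```python
class AlreadyDeleted(Exception):
    """Hypothetical stand-in for a broker's 'message not found' error."""


def ack_ignoring_missing(delete_message, message, missing_errors):
    """Delete `message`, treating 'already deleted' errors as success.

    Returns True if this call deleted the message and False if the message
    was already gone. Any other exception still propagates.
    """
    try:
        delete_message(message)
    except missing_errors:
        return False
    return True


if __name__ == "__main__":
    stored = {"M"}  # messages currently in the (fake) queue

    def fake_delete(message):
        if message not in stored:
            raise AlreadyDeleted(message)
        stored.remove(message)

    print(ack_ignoring_missing(fake_delete, "M", (AlreadyDeleted,)))  # first ack
    print(ack_ignoring_missing(fake_delete, "M", (AlreadyDeleted,)))  # duplicate ack
```

Whether swallowing the error is acceptable depends on the task being safe to run twice, since the message has by then been processed by two actors.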
Minimal example
@async_task
def async_test_dramatiq():
    import time
    print('Simulating work ...')
    time.sleep(120)
    print('Done')
>>> # set visibility timeout to 1 minute
>>> async_test_dramatiq.send()