channels_redis
Already-awaited tasks are dropped from the queue if an exception occurs in a running task
Using: channels_redis=4.1.0 redis=5.2.1
We use this to run async tasks in our project (with channels workers). We seem to be dropping already awaited tasks when an exception occurs in a running task. We use SyncConsumers for task definitions.
To reproduce:
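Have a simple task that logs the argument it was passed:

```python
import logging
import time

from channels.consumer import SyncConsumer

logger = logging.getLogger(__name__)

class TaskConsumer(SyncConsumer):  # hypothetical name; only the handler was shown
    def test(self, event):
        time.sleep(0.1)  # simulate some work
        data = event["data"]
        if data == 50:
            raise ValueError()  # fail on one specific task
        logger.info(data)
```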
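Set up a channel layer with a high capacity (I tested with 2k). Without any worker running, fill the channel to capacity with tasks:

```python
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
capacity = 2000  # same value as the layer's configured capacity

for i in range(capacity):
    async_to_sync(channel_layer.send)(
        "mychannel",
        {
            "type": "test",
            "data": i,
        },
    )
```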
I can watch the ZCOUNT in Redis climb to 2k:

```shell
watch -n 1 redis-cli ZCOUNT asgimychannel -inf +inf
```
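For context, a high-capacity layer like the one used in this reproduction could be configured in Django settings roughly as below. `capacity` is a `RedisChannelLayer` option in channels_redis; the host and port are assumptions for a local Redis.

```python
# settings.py (sketch): host/port assumed to be a local Redis instance
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
            # Max messages held per channel before send() raises ChannelFull
            "capacity": 2000,
        },
    },
}
```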
Now when I start my only worker, I can see the count dropping much faster than the tasks are run (judging by the logged output). As I understand it (though not clearly enough), the Redis queue is emptied and an internal queue is filled with the awaited tasks. By the time we reach the 50th task, the ZCOUNT is 0. When task 50 runs, the exception appears in the worker logs, but nothing else is logged after it.
My understanding is that tasks 51 to 1999 are lost.
Is my understanding of what is happening correct? I would love to understand the dequeuing a bit better, especially when more than one worker is available. Also, is this expected behavior?
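As a rough illustration of the hypothesis above (a worker that pulls messages out of Redis into an in-memory queue faster than it processes them), here is a toy asyncio model. It is a hypothetical sketch, not channels or channels_redis code: `run_toy_worker`, the sentinel and the timings are invented, and it assumes an unhandled exception ends the coroutine that drains the in-memory queue. With `catch_errors=True` it mimics what the reporter's `SafeSyncConsumer` does.

```python
import asyncio

SENTINEL = object()  # hypothetical end-of-backlog marker for the toy model


async def run_toy_worker(n_messages, fail_on, catch_errors):
    """Toy model only (not channels/channels_redis code).

    A listener moves messages from a fake backlog into an in-memory queue
    as fast as it can; a consumer processes them slowly. Returns
    (processed, stranded): messages handled, and messages left in the
    in-memory queue with nothing left to consume them.
    """
    buffer = asyncio.Queue()
    processed = []

    async def listener():
        # Like a receive() loop: drain the backlog without waiting for work.
        for msg in range(n_messages):
            await buffer.put(msg)
            await asyncio.sleep(0)
        await buffer.put(SENTINEL)

    async def handle(msg):
        await asyncio.sleep(0.001)  # stands in for time.sleep(0.1)
        if msg == fail_on:
            raise ValueError(msg)
        processed.append(msg)

    async def consumer():
        while (msg := await buffer.get()) is not SENTINEL:
            if catch_errors:
                try:
                    await handle(msg)
                except ValueError:
                    pass  # like SafeSyncConsumer: swallow (log) and continue
            else:
                await handle(msg)  # an exception ends this consumer

    listen = asyncio.create_task(listener())
    try:
        await consumer()
    except ValueError:
        pass  # consumer is gone; whatever is still buffered is stranded
    await listen

    stranded = []
    while not buffer.empty():
        msg = buffer.get_nowait()
        if msg is not SENTINEL:
            stranded.append(msg)
    return processed, stranded


lost = asyncio.run(run_toy_worker(200, fail_on=50, catch_errors=False))
safe = asyncio.run(run_toy_worker(200, fail_on=50, catch_errors=True))
print(len(lost[0]), len(lost[1]))  # 50 149
print(len(safe[0]), len(safe[1]))  # 199 0
```

In this model the stranded messages have already left the backlog, which would be consistent with ZCOUNT reaching 0 before the failing task runs.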
I fixed this issue by using the following consumer:
```python
import logging

from channels.consumer import SyncConsumer, get_handler_name
from channels.db import database_sync_to_async

logger = logging.getLogger(__name__)


class SafeSyncConsumer(SyncConsumer):
    @database_sync_to_async
    def dispatch(self, message):
        """
        Dispatches incoming messages to type-based handlers asynchronously.
        Catches any exception raised in the handler and logs it to avoid having
        the worker drop already awaited tasks.
        """
        # Get and execute the handler
        handler = getattr(self, get_handler_name(message), None)
        if handler:
            try:
                handler(message)
            except Exception:
                logger.exception(
                    "Exception in consumer method (%s.%s)",
                    self.__class__.__name__,
                    handler.__name__,
                )
        else:
            # Outside an except block, so error() rather than exception()
            logger.error("No handler for message type %s", message["type"])
```
> I fixed this issue by using the following consumer:

I'm confused, how did that fix the problem?
Sorry if my post is confusing. It did solve the problem: all the queued tasks were run, including those after the 50th task that raises the exception.
I have a feeling that the exception raised in the handler(message) call does not play nicely with the database_sync_to_async decorator.
I tried subclassing SyncConsumer, overriding the dispatch method (without the decorator) and calling super().dispatch(message) inside a try: ... except: ..., but when testing I saw the same missing logs after the 50th task.
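A plausible reason that second attempt failed is where the exception surfaces. In channels, `SyncConsumer.dispatch` is itself decorated with `database_sync_to_async`, so `super().dispatch(message)` only returns an awaitable; the handler runs, and raises, later when that awaitable is awaited, which is outside the `try`. The stdlib-only sketch below uses a hypothetical `to_async` decorator built on `asyncio.to_thread` (Python 3.9+) as a stand-in for `database_sync_to_async`; the class names are also made up.

```python
import asyncio
import functools


def to_async(func):
    """Hypothetical stand-in for database_sync_to_async: calling the wrapped
    function returns an awaitable that runs func in a worker thread."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return asyncio.to_thread(func, *args, **kwargs)
    return wrapper


class Base:
    @to_async
    def dispatch(self, message):
        if message == 50:
            raise ValueError(message)
        return message


class TryAroundCall(Base):
    def dispatch(self, message):
        # Only builds the awaitable; the body has not run, so nothing is caught.
        try:
            return super().dispatch(message)
        except ValueError:
            return "caught"


class TryAroundAwait(Base):
    async def dispatch(self, message):
        # The body runs (and raises) during the await, inside the try.
        try:
            return await super().dispatch(message)
        except ValueError:
            return "caught"


async def main():
    try:
        await TryAroundCall().dispatch(50)
        escaped = False
    except ValueError:
        escaped = True  # the error surfaced at the caller's await
    return escaped, await TryAroundAwait().dispatch(50)


print(asyncio.run(main()))  # (True, 'caught')
```

Under that assumption, the channels-side equivalent would be an `async def dispatch(self, message)` override that wraps `await super().dispatch(message)` in the `try`.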
Sorry, I'm still confused: how does SafeSyncConsumer fix this? It appears to just catch an exception and log it. What am I missing?