channels_redis

Already-awaited tasks dropped from queue if an exception occurs in a running task

Open gaelworkstaff opened this issue 7 months ago • 3 comments

Using: channels_redis=4.1.0 redis=5.2.1

We use channels_redis to run async tasks in our project (with Channels workers). We seem to be dropping already-awaited tasks when an exception occurs in a running task. We use SyncConsumers for our task definitions.

To reproduce:

Have a simple task that logs the argument it was passed:

def test(self, event):
    # Handler on the SyncConsumer: sleep briefly, fail on message 50,
    # otherwise log the payload
    import time
    time.sleep(0.1)

    data = event["data"]
    if data == 50:
        raise ValueError()
    logger.info(data)

Set up a channel layer with a high capacity (I tested with 2k; a configuration sketch is shown after the loop below). Without any worker running, fill the channel to capacity with tasks:

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
max_messages = 2000  # the configured capacity

for i in range(max_messages):
    async_to_sync(channel_layer.send)(
        "mychannel",
        {
            "type": "test",
            "data": i,
        },
    )
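
For reference, the high-capacity channel layer mentioned above can be configured roughly as follows (a sketch; the Redis address is illustrative and the capacity of 2000 matches my test):

# settings.py (sketch)
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
            # Raise the per-channel capacity (default 100) so 2000 messages can queue up
            "capacity": 2000,
        },
    },
}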

I can see the ZCOUNT on Redis climb to 2k:

 watch -n 1 redis-cli ZCOUNT asgimychannel -inf +inf

Now when I start my only worker, I can see the count dropping far faster than the tasks are run (based on the logs posted). As I understand it (not clearly enough, though), the Redis queue is emptied and an internal queue is filled with the awaited tasks. By the time we reach the 50th task, the ZCOUNT is 0. Once task 50 is reached, we get the exception in the worker logs, but no more logs are posted.

My understanding is that the tasks 51 to 1999 are lost.
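
If it helps, here is a toy illustration of the failure mode I suspect (plain asyncio, not channels_redis internals, and only my assumption about the mechanism): messages pre-fetched from the durable store into an in-memory buffer are lost when the processing loop dies.

import asyncio


async def demo():
    durable = list(range(100))  # stands in for the messages sitting in Redis
    buffer: asyncio.Queue = asyncio.Queue()

    # Reader: drains the durable queue into a process-local buffer
    while durable:
        await buffer.put(durable.pop(0))

    # Worker: processes buffered messages and dies on message 50
    try:
        while not buffer.empty():
            item = buffer.get_nowait()
            if item == 50:
                raise ValueError("boom")
            print("processed", item)
    except ValueError:
        print("exception in handler")

    # Messages 51..99 are still in the in-memory buffer and are never run
    print("lost in buffer:", buffer.qsize())


asyncio.run(demo())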

Is my understanding of what is happening correct (I would love to understand the de-queuing a bit better, especially when more than one worker is available)? Also, is that normal?

I fixed this issue by using the following consumer:

import logging

from channels.consumer import SyncConsumer, get_handler_name
from channels.db import database_sync_to_async

logger = logging.getLogger(__name__)


class SafeSyncConsumer(SyncConsumer):
    @database_sync_to_async
    def dispatch(self, message):
        """
        Dispatches incoming messages to type-based handlers asynchronously.
        Catches any exception raised in the handler and logs it, to avoid
        having the worker drop already-awaited tasks.
        """
        # Look up the handler for this message type and execute it
        handler = getattr(self, get_handler_name(message), None)
        if handler:
            try:
                handler(message)
            except Exception:
                logger.exception(
                    "Exception in consumer method (%s.%s)",
                    self.__class__.__name__,
                    handler.__name__,
                )
        else:
            logger.error("No handler for message type %s", message["type"])
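
For completeness, a consumer like this is wired to a worker roughly as follows (a sketch; the channel name matches the example above and the module path is illustrative), with the worker started via python manage.py runworker mychannel:

# asgi.py (sketch)
from channels.routing import ChannelNameRouter, ProtocolTypeRouter

from myapp.consumers import SafeSyncConsumer  # illustrative module path

application = ProtocolTypeRouter(
    {
        # Background messages consumed by `manage.py runworker mychannel`
        "channel": ChannelNameRouter(
            {
                "mychannel": SafeSyncConsumer.as_asgi(),
            }
        ),
    }
)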

gaelworkstaff avatar Apr 17 '25 19:04 gaelworkstaff

I fixed this issue by using the following consumer:

I'm confused, how did that fix the problem?

bigfootjon avatar Apr 25 '25 05:04 bigfootjon

I fixed this issue by using the following consumer:

I'm confused, how did that fix the problem?

Sorry if my post is confusing. It did solve the problem, as all the queued tasks were run, even those after the 50th one that raises the exception.

I have a feeling that the exception raised in the handler(message) call does not play nicely with the database_sync_to_async decorator.

I tried to subclass SyncConsumer, override the dispatch method (without the decorator) and call super().dispatch(message) inside a try: ... except: ..., but then when testing I saw the same missing logs after the 50th. A rough reconstruction of that attempt is shown below.
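
Roughly, that failed attempt looked like this (reconstructed from memory, so details may differ):

import logging

from channels.consumer import SyncConsumer

logger = logging.getLogger(__name__)


class PlainSafeSyncConsumer(SyncConsumer):  # illustrative name
    def dispatch(self, message):
        # Override without the database_sync_to_async decorator and just
        # wrap the parent dispatch; this still lost the queued tasks.
        try:
            super().dispatch(message)
        except Exception:
            logger.exception("Exception while dispatching %s", message.get("type"))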

gaelworkstaff avatar Apr 25 '25 12:04 gaelworkstaff

Sorry, I'm still confused: how does SafeSyncConsumer fix this? It appears to just catch an exception and log it. What am I missing?

bigfootjon avatar Apr 29 '25 01:04 bigfootjon