How to test sync WebsocketConsumer that queries the database

Open jlariza opened this issue 1 year ago • 5 comments

Good day,

I have a WebsocketConsumer that queries the database based on a parameter. The code works as expected; however, the tests keep failing with this message: django.db.utils.OperationalError: the connection is closed.

This is the consumer implementation, based on the tutorial:

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer


class MyModelConsumer(WebsocketConsumer):
    def connect(self):
        self.object_id = self.scope["url_route"]["kwargs"]["object_id"]
        self.room_group_name = f"object_{self.object_id}"

        # Join room group
        async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
        if MyModel.objects.filter(id=self.object_id).exists():
            self.accept()
            # Use a separate name so the MyModel class is not shadowed
            instance = MyModel.objects.get(id=self.object_id)
            serializer = MyModelSerializer(instance)
            # Send message to room group
            async_to_sync(self.channel_layer.group_send)(
                self.room_group_name, {"type": "MyModel", "MyModel": serializer.data}
            )
        else:
            self.close()

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(self.room_group_name, self.channel_name)

And this is the test I'm writing:

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
class TestMyModelConsumer(TestCase):
    """
    Test class that holds the test cases for the MyModelConsumer class
    """
    @database_sync_to_async
    def create_instance(self):
        return MyModel.objects.create()

    # based on https://channels.readthedocs.io/en/latest/topics/testing.html#websocketcommunicator
    async def test_my_consumer(self):
        instance = await self.create_instance()
        application = URLRouter([
            path("test/<object_id>/", MyModelConsumer.as_asgi()),
        ])
        communicator = WebsocketCommunicator(application, f"test/{instance.id}/")
        connected, subprotocol = await communicator.connect()
        self.assertTrue(connected)

However, it always raises django.db.utils.OperationalError: the connection is closed at the line MyModel.objects.create(). The test tries to access the database but cannot, because the database connection is closed.

I also tried creating the model instance synchronously in the setUp method. That works, but then the OperationalError is raised in the consumer, at the line if MyModel.objects.filter(id=self.object_id).exists():

Any ideas of what I'm missing or doing incorrectly?

Thank you,

jlariza, Aug 20 '23 18:08