channels
channels copied to clipboard
How to test sync WebsocketConsumer that queries the database
Good day,
I have a WebsocketConsumer
that queries the databased based on a parameter. The code works as expected; however, tests keep failing with this message django.db.utils.OperationalError: the connection is closed
.
This is the consumer implementation based on the tutorial
class MyModelConsumer(WebsocketConsumer):
def connect(self):
self.object_id = self.scope["url_route"]["kwargs"]["object_id"]
self.room_group_name = f"object_{self.object_id}"
# Join room group
async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
if MyModel.objects.filter(id=self.object_id).exists():
self.accept()
MyModel = MyModel.objects.get(id=self.object_id)
serializer = MyModelSerializer(MyModel)
# Send message to room group
async_to_sync(self.channel_layer.group_send)(
self.room_group_name, {"type": "MyModel", "MyModel": serializer.data}
)
else:
self.close()
def disconnect(self, close_code):
# Leave room group
async_to_sync(self.channel_layer.group_discard)(self.room_group_name, self.channel_name)
and this is the test I'm writing:
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
class TestMyModelConsumer(TestCase):
"""
Test class that holds the test cases for the MyModelConsumer class
"""
@database_sync_to_async
def create_instance(self):
return MyModel.objects.create()
# based on https://channels.readthedocs.io/en/latest/topics/testing.html#websocketcommunicator
async def test_my_consumer(self):
instance = await self.create_instance()
application = URLRouter([
path("test/<object_id>/", MyModelConsumer.as_asgi()),
])
communicator = WebsocketCommunicator(application, f"test/{instance.id}/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
However, it always raises django.db.utils.OperationalError: the connection is closed
in this line MyModel.objects.create()
. It tries to access the database but cannot do it because the database connection is closed.
I tried creating the model instance synchronously in setUp
method; it works but then the OperationalError
is raised in this line if MyModel.objects.filter(id=self.object_id).exists():
.
Any ideas of what I'm missing or doing incorrectly?
Thank you,