aio-pika
aio-pika copied to clipboard
QueueIterator.__repr__ causes AttributeError looking for _consumer_tag
When closing an already-cancelled QueueIterator
, the following exception occurs:
[2022-04-12T10:57:40.157Z] self = <[AttributeError("'QueueIterator' object has no attribute '_consumer_tag'") raised in repr()] QueueIterator object at 0x7faf18cc9310>
[2022-04-12T10:57:40.157Z]
[2022-04-12T10:57:40.157Z] def __repr__(self) -> str:
[2022-04-12T10:57:40.157Z] return (
[2022-04-12T10:57:40.157Z] f"<{self.__class__.__name__}: "
[2022-04-12T10:57:40.157Z] f"queue={self._amqp_queue.name!r} "
[2022-04-12T10:57:40.157Z] > f"ctag={self._consumer_tag!r}>"
[2022-04-12T10:57:40.157Z] )
[2022-04-12T10:57:40.157Z] E AttributeError: 'QueueIterator' object has no attribute '_consumer_tag'
This is because QueueIterator.__repr__
unconditially access self._consumer_tag
, and is called from async def close
before (and even when) that attribute may not exist:
@task
async def close(self, *_: Any) -> Any:
log.debug("Cancelling queue iterator %r", self)
if not hasattr(self, "_consumer_tag"):
log.debug("Queue iterator %r already cancelled", self)
return
That first %r
(the failure I'm seeing here) may fail, but the second %r
definitely will fail, because we've established there's no _consumer_tag
attr.
I'm not sure how long this has been broken, as I'm doing an aio-pika upgrade from a much older version, and our codebase had turned the log-level for aio-pika up to WARNING
so the logging.debug
messages were not being evaluated before the upgrade.
For reference, the context is an asyncio.Task of the form
self.connection = await aio_pika.connect_robust(rmq_url)
async with self.connection.channel() as channel:
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(self.rmq_queue_name, durable=True)
async for message in queue:
yield message
and the cancellation is arriving while we're in the async for
, waiting for a new message, per the below traceback.
C:\Program Files\Python39\lib\asyncio\base_events.py:642: in run_until_complete
return future.result()
tests\integration\conftest.py:347: in webservice_client
yield client
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\test_utils.py:423: in __aexit__
await self.close()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\test_utils.py:398: in close
await self._server.close()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\test_utils.py:194: in close
await self.runner.cleanup()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_runner.py:294: in cleanup
await self._cleanup_server()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_runner.py:381: in _cleanup_server
await self._app.cleanup()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_app.py:432: in cleanup
await self.on_cleanup.send(self)
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiosignal\__init__.py:36: in send
await receiver(*args, **kwargs) # type: ignore
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_app.py:555: in _on_cleanup
raise errors[0]
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_app.py:546: in _on_cleanup
await it.__anext__()
src\webservice\app.py:190: in initialize_and_cleanup_status_listener
await status_listener
src\webservice\status_update_api.py:33: in message_dispatcher
async for message in app["rmq"].message_queue():
..\..\lib\shared\src\ps\shared\rabbitmq.py:74: in message_queue
async for message in queue:
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aio_pika\queue.py:483: in __anext__
await asyncio.wait_for(
C:\Program Files\Python39\lib\asyncio\tasks.py:479: in wait_for
return fut.result()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aio_pika\queue.py:411: in close
log.debug("Queue iterator %r closed", self)
C:\Program Files\Python39\lib\logging\__init__.py:1434: in debug
self._log(DEBUG, msg, args, **kwargs)
C:\Program Files\Python39\lib\logging\__init__.py:1589: in _log
self.handle(record)
C:\Program Files\Python39\lib\logging\__init__.py:1599: in handle
self.callHandlers(record)
C:\Program Files\Python39\lib\logging\__init__.py:1661: in callHandlers
hdlr.handle(record)
C:\Program Files\Python39\lib\logging\__init__.py:952: in handle
self.emit(record)
C:\Program Files\Python39\lib\logging\__init__.py:1091: in emit
self.handleError(record)
C:\Program Files\Python39\lib\logging\__init__.py:1083: in emit
msg = self.format(record)
C:\Program Files\Python39\lib\logging\__init__.py:927: in format
return fmt.format(record)
C:\Program Files\Python39\lib\logging\__init__.py:663: in format
record.message = record.getMessage()
C:\Program Files\Python39\lib\logging\__init__.py:367: in getMessage
msg = msg % self.args