taskiq
function maybe_awaitable: coroutine object IncomingMessage. Worker cannot receive the message from the broker
The worker raises an error. I guess the root cause is that the connection with RabbitMQ was terminated or lost.
No exception was found before or after this error.
Unfortunately, I don't know how to reproduce it.
My configuration:
result_expire_time_seconds = int(one_day_seconds)
retry = SimpleRetryMiddleware(default_retry_count=3)
result_backend = RedisAsyncResultBackend(
    redis_url=str(settings.redis.redis_url.with_path("/1")),
    result_ex_time=result_expire_time_seconds,
    serializer=JSONSerializer(),
)
broker = (
    AioPikaBroker(
        url=str(settings.rabbit.rabbit_url),
    )
    .with_result_backend(result_backend)
    .with_middlewares(retry)
)
Exception:
2024-11-04 12:25:26.767 | ERROR | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Task exception was never retrieved
future: <Task finished name='Task-427' coro=<Receiver.callback() done, defined at /usr/local/lib/python3.11/site-packages/taskiq/receiver/receiver.py:87> exception=ChannelInvalidStateError()>
Traceback (most recent call last):
> File "/usr/local/lib/python3.11/site-packages/taskiq/receiver/receiver.py", line 184, in callback
await maybe_awaitable(message.ack())
│ │ └ <bound method IncomingMessage.ack of IncomingMessage:{'app_id': None,
│ │ 'body_size': 274,
│ │ 'cluster_id': '',
│ │ 'consumer_tag': ...
│ └ AckableMessage(data=b'{"task_id": "15fecea81c6048bb80764c2ebddf14e0", "task_name": "task_drive_fetch_files", "labels": {"asan...
└ <function maybe_awaitable at 0x7f738b1df240>
File "/usr/local/lib/python3.11/site-packages/taskiq/utils.py", line 23, in maybe_awaitable
return await possible_coroutine
└ <coroutine object IncomingMessage.ack at 0x7f7313fcd460>
File "/usr/local/lib/python3.11/site-packages/aio_pika/message.py", line 453, in ack
await self.channel.basic_ack(
│ └ <property object at 0x7f7388f0e160>
└ IncomingMessage:{'app_id': None,
'body_size': 274,
'cluster_id': '',
'consumer_tag': 'ctag1.0580faeeeaf941c7b8e9a7148f4bff...
File "/usr/local/lib/python3.11/site-packages/aio_pika/message.py", line 391, in channel
raise ChannelInvalidStateError
└ <class 'aiormq.exceptions.ChannelInvalidStateError'>
aiormq.exceptions.ChannelInvalidStateError
Sadly, I'm not sure how to reproduce this case either.
But what happens after this exception is raised? Do the workers hang or restart? In the second case, I guess it's fine to leave this implementation as is.
I enabled debug logging and found out that:
- the connection was actually lost 4 seconds before the error,
- but it managed to reestablish itself,
- the worker raises the error,
- and then continues to work normally.
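The sequence above (connection drops, reconnects, then the ack fails for a message that was delivered on the old channel) can be modelled with a small sketch. It is not taskiq's or aio-pika's actual code: `ChannelInvalidStateError` here is a local stand-in for `aiormq.exceptions.ChannelInvalidStateError`, and `FakeIncomingMessage` and `safe_ack` are hypothetical names used only for illustration. It assumes that skipping the ack is acceptable, since RabbitMQ normally requeues messages left unacknowledged when their channel closes.

```python
import asyncio
import logging

logger = logging.getLogger("ack_sketch")


class ChannelInvalidStateError(Exception):
    """Local stand-in for aiormq.exceptions.ChannelInvalidStateError."""


class FakeIncomingMessage:
    """Hypothetical message whose ack() fails once its channel is gone."""

    def __init__(self, channel_open: bool) -> None:
        self.channel_open = channel_open
        self.acked = False

    async def ack(self) -> None:
        # Mirrors the traceback: acking on a closed channel raises.
        if not self.channel_open:
            raise ChannelInvalidStateError
        self.acked = True


async def safe_ack(message: FakeIncomingMessage) -> bool:
    """Try to ack; return False instead of raising if the channel is invalid."""
    try:
        await message.ack()
    except ChannelInvalidStateError:
        logger.warning("Channel closed before ack; skipping acknowledgement.")
        return False
    return True


async def demo() -> tuple:
    healthy = FakeIncomingMessage(channel_open=True)
    stale = FakeIncomingMessage(channel_open=False)  # delivered before reconnect
    return await safe_ack(healthy), await safe_ack(stale)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a guard like this the failure would show up as a warning rather than as "Task exception was never retrieved", at the cost of the task possibly running again after redelivery.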
Afterwards, though, I found a huge exception that may be related to the issue, although I think it comes from connection instability. The full log is here: https://pastebin.com/H1u7hFKm
2024-11-05 03:15:53.559 | WARNING | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Cannot parse message: b''. Skipping execution.
Expecting value: line 1 column 1 (char 0)
Traceback (most recent call last):
File "/usr/local/bin/taskiq", line 8, in <module>
sys.exit(main())
│ │ └ <function main at 0x7fd7adc498a0>
│ └ <built-in function exit>
└ <module 'sys' (built-in)>
File "/usr/local/lib/python3.11/site-packages/taskiq/__main__.py", line 73, in main
status = command.exec(sys.argv[1:])
│ │ │ └ ['worker', 'backend.tkq:broker']
│ │ └ <module 'sys' (built-in)>
│ └ <function WorkerCMD.exec at 0x7fd7adb071a0>
└ <taskiq.cli.worker.cmd.WorkerCMD object at 0x7fd7ade253d0>
File "/usr/local/lib/python3.11/site-packages/taskiq/cli/worker/cmd.py", line 27, in exec
........
└ <function model_validate at 0x7fd7add77d80>
File "/usr/local/lib/python3.11/site-packages/taskiq/serializers/json_serializer.py", line 32, in loadb
return loads(value.decode())
│ │ └ <method 'decode' of 'bytes' objects>
│ └ b''
└ <function loads at 0x7fd7af056b60>
File "/usr/local/lib/python3.11/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
│ │ └ ''
│ └ <function JSONDecoder.decode at 0x7fd7af056480>
└ <json.decoder.JSONDecoder object at 0x7fd7af0a2050>
File "/usr/local/lib/python3.11/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
│ │ │ │ └ ''
│ │ │ └ <built-in method match of re.Pattern object at 0x7fd7aefef1d0>
│ │ └ ''
│ └ <function JSONDecoder.raw_decode at 0x7fd7af056520>
└ <json.decoder.JSONDecoder object at 0x7fd7af0a2050>
File "/usr/local/lib/python3.11/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
│ └ ''
└ <class 'json.decoder.JSONDecodeError'>
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
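The last frames show why the empty payload fails: `b''` decodes to an empty string, and `json.loads("")` raises `JSONDecodeError: Expecting value: line 1 column 1 (char 0)`. The warning line suggests the worker already skips such messages, so the sketch below only illustrates the failure and one way to guard against it. `parse_body` is a hypothetical helper, not part of taskiq.

```python
import json
from typing import Any, Optional


def parse_body(body: bytes) -> Optional[Any]:
    """Return the decoded JSON payload, or None if the body is empty or invalid."""
    if not body.strip():
        # An empty body can never be valid JSON, so skip it up front.
        return None
    try:
        return json.loads(body.decode())
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None


if __name__ == "__main__":
    try:
        json.loads(b"".decode())
    except json.JSONDecodeError as exc:
        print(f"empty body fails: {exc}")
    print(parse_body(b""))
    print(parse_body(b'{"task_id": "abc"}'))
```

Returning `None` keeps the caller in control of whether an unparsable message is logged, rejected, or dropped.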