
function maybe_awaitable: coroutine object IncomingMessage. Worker cannot receive the message from broker

Open Tsovak opened this issue 1 year ago • 2 comments

The worker raises an error. I guess the root cause is that the connection to RabbitMQ was terminated or lost.
No exception was found before or after this error.

Unfortunately, I don't know how to reproduce it.

My configuration:

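from taskiq import SimpleRetryMiddleware
from taskiq.serializers import JSONSerializer
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

# `settings` and `one_day_seconds` are defined elsewhere in the application (not shown).
result_expire_time_seconds = int(one_day_seconds)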

retry = SimpleRetryMiddleware(default_retry_count=3)

result_backend = RedisAsyncResultBackend(
    redis_url=str(settings.redis.redis_url.with_path("/1")),
    result_ex_time=result_expire_time_seconds,
    serializer=JSONSerializer(),
)

broker = (
    AioPikaBroker(
        url=str(settings.rabbit.rabbit_url),
    )
    .with_result_backend(result_backend)
    .with_middlewares(retry)
)


Exception:

2024-11-04 12:25:26.767 | ERROR    | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Task exception was never retrieved
future: <Task finished name='Task-427' coro=<Receiver.callback() done, defined at /usr/local/lib/python3.11/site-packages/taskiq/receiver/receiver.py:87> exception=ChannelInvalidStateError()>
Traceback (most recent call last):

> File "/usr/local/lib/python3.11/site-packages/taskiq/receiver/receiver.py", line 184, in callback
    await maybe_awaitable(message.ack())
          │               │       └ <bound method IncomingMessage.ack of IncomingMessage:{'app_id': None,
          │               │          'body_size': 274,
          │               │          'cluster_id': '',
          │               │          'consumer_tag': ...
          │               └ AckableMessage(data=b'{"task_id": "15fecea81c6048bb80764c2ebddf14e0", "task_name": "task_drive_fetch_files", "labels": {"asan...
          └ <function maybe_awaitable at 0x7f738b1df240>
  File "/usr/local/lib/python3.11/site-packages/taskiq/utils.py", line 23, in maybe_awaitable
    return await possible_coroutine
                 └ <coroutine object IncomingMessage.ack at 0x7f7313fcd460>
  File "/usr/local/lib/python3.11/site-packages/aio_pika/message.py", line 453, in ack
    await self.channel.basic_ack(
          │    └ <property object at 0x7f7388f0e160>
          └ IncomingMessage:{'app_id': None,
             'body_size': 274,
             'cluster_id': '',
             'consumer_tag': 'ctag1.0580faeeeaf941c7b8e9a7148f4bff...
  File "/usr/local/lib/python3.11/site-packages/aio_pika/message.py", line 391, in channel
    raise ChannelInvalidStateError
          └ <class 'aiormq.exceptions.ChannelInvalidStateError'>

aiormq.exceptions.ChannelInvalidStateError
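
To make the traceback easier to follow, here is a minimal sketch of the call chain it shows: a helper awaits whatever `ack()` returns, and the error raised inside the coroutine propagates back to the caller. `FakeIncomingMessage` and the local `ChannelInvalidStateError` are hypothetical stand-ins, and this `maybe_awaitable` is only an approximation of the helper named in the traceback, not taskiq's actual code.

```python
import asyncio
import inspect
from typing import Any, Tuple


class ChannelInvalidStateError(RuntimeError):
    """Local stand-in for the error in the traceback (hypothetical copy)."""


class FakeIncomingMessage:
    """Hypothetical stand-in for a broker message bound to one channel."""

    def __init__(self, channel_open: bool) -> None:
        self.channel_open = channel_open
        self.acked = False

    async def ack(self) -> None:
        # As in the traceback, acking requires a usable channel.
        if not self.channel_open:
            raise ChannelInvalidStateError
        self.acked = True


async def maybe_awaitable(value: Any) -> Any:
    """Await the value if it is awaitable, otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


async def main() -> Tuple[bool, str]:
    healthy = FakeIncomingMessage(channel_open=True)
    await maybe_awaitable(healthy.ack())

    stale = FakeIncomingMessage(channel_open=False)
    try:
        await maybe_awaitable(stale.ack())
        outcome = "acked"
    except ChannelInvalidStateError:
        # Without a handler like this, the exception escapes the task
        # and asyncio logs "Task exception was never retrieved".
        outcome = "channel invalid"
    return healthy.acked, outcome


result = asyncio.run(main())
```

If nothing catches the error around the `ack()` call, it surfaces only when the finished task is garbage-collected, which would match the "Task exception was never retrieved" log line.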

Tsovak, Nov 04 '24 12:11

Sadly, I'm not sure how to reproduce such a case either.

But what happens after this exception is raised? Do the workers hang or restart? In the second case, I guess it's fine to leave the implementation as is.

s3rius, Nov 05 '24 18:11

> But what happens after this exception is raised? Do the workers hang or restart? In the second case, I guess it's fine to leave the implementation as is.

I enabled debug logging and found that:

  • the connection was actually lost 4 seconds before the error,
  • it was able to re-establish itself,
  • the worker raises the error,
  • and then continues to work normally.
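
One possible reading of this sequence, which is an assumption and not something the logs confirm, is that the message was delivered on the channel that died with the old connection, so the later `ack()` hit a closed channel while new messages used the fresh one. The sketch below simulates that timeline with hypothetical stand-in classes (`FakeConnection`, `FakeChannel`, `FakeMessage`, `ChannelClosed`); none of them are real aio-pika or taskiq APIs.

```python
import asyncio
from typing import List


class ChannelClosed(Exception):
    """Hypothetical stand-in for the client's invalid-channel error."""


class FakeChannel:
    """Hypothetical channel that refuses acks once it is closed."""

    def __init__(self, number: int) -> None:
        self.number = number
        self.is_open = True

    async def basic_ack(self, delivery_tag: int) -> None:
        if not self.is_open:
            raise ChannelClosed(f"channel {self.number} is closed")


class FakeConnection:
    """Hypothetical connection that opens a new channel on reconnect."""

    def __init__(self) -> None:
        self._count = 0
        self.channel = self._open_channel()

    def _open_channel(self) -> FakeChannel:
        self._count += 1
        return FakeChannel(self._count)

    def drop_and_reconnect(self) -> None:
        # The old channel dies with the connection; a new one replaces it.
        self.channel.is_open = False
        self.channel = self._open_channel()


class FakeMessage:
    """Hypothetical message that stays bound to its delivery channel."""

    def __init__(self, channel: FakeChannel, delivery_tag: int) -> None:
        self.channel = channel
        self.delivery_tag = delivery_tag

    async def ack(self) -> None:
        await self.channel.basic_ack(self.delivery_tag)


async def handle(message: FakeMessage, events: List[str]) -> None:
    try:
        await message.ack()
        events.append(f"acked on channel {message.channel.number}")
    except ChannelClosed:
        events.append(f"ack failed on channel {message.channel.number}")


async def main() -> List[str]:
    events: List[str] = []
    connection = FakeConnection()
    in_flight = FakeMessage(connection.channel, delivery_tag=1)
    connection.drop_and_reconnect()      # connection lost, then restored
    await handle(in_flight, events)      # ack goes to the dead channel
    fresh = FakeMessage(connection.channel, delivery_tag=1)
    await handle(fresh, events)          # later messages work normally
    return events


events = asyncio.run(main())
```

If that reading is right, the failed ack itself is mostly harmless to the worker, but RabbitMQ normally requeues unacknowledged deliveries when their channel closes, so the affected task may be delivered and executed again.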

Afterwards, though, I found a huge exception that may be related to this issue, although I think it comes from the unstable connection. The full log is here: https://pastebin.com/H1u7hFKm

2024-11-05 03:15:53.559 | WARNING  | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Cannot parse message: b''. Skipping execution.
 Expecting value: line 1 column 1 (char 0)
Traceback (most recent call last):

  File "/usr/local/bin/taskiq", line 8, in <module>
    sys.exit(main())
    │   │    └ <function main at 0x7fd7adc498a0>
    │   └ <built-in function exit>
    └ <module 'sys' (built-in)>
  File "/usr/local/lib/python3.11/site-packages/taskiq/__main__.py", line 73, in main
    status = command.exec(sys.argv[1:])
             │       │    │   └ ['worker', 'backend.tkq:broker']
             │       │    └ <module 'sys' (built-in)>
             │       └ <function WorkerCMD.exec at 0x7fd7adb071a0>
             └ <taskiq.cli.worker.cmd.WorkerCMD object at 0x7fd7ade253d0>
  File "/usr/local/lib/python3.11/site-packages/taskiq/cli/worker/cmd.py", line 27, in exec
  
  ........
  
  
  
             └ <function model_validate at 0x7fd7add77d80>
  File "/usr/local/lib/python3.11/site-packages/taskiq/serializers/json_serializer.py", line 32, in loadb
    return loads(value.decode())
           │     │     └ <method 'decode' of 'bytes' objects>
           │     └ b''
           └ <function loads at 0x7fd7af056b60>
  File "/usr/local/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           │                │      └ ''
           │                └ <function JSONDecoder.decode at 0x7fd7af056480>
           └ <json.decoder.JSONDecoder object at 0x7fd7af0a2050>
  File "/usr/local/lib/python3.11/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               │    │          │      │  └ ''
               │    │          │      └ <built-in method match of re.Pattern object at 0x7fd7aefef1d0>
               │    │          └ ''
               │    └ <function JSONDecoder.raw_decode at 0x7fd7af056520>
               └ <json.decoder.JSONDecoder object at 0x7fd7af0a2050>
  File "/usr/local/lib/python3.11/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
          │                                  └ ''
          └ <class 'json.decoder.JSONDecodeError'>

json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
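
The warning above comes from decoding an empty body (`b''`) as JSON. As a rough illustration of why that fails and how a consumer could skip such payloads quietly, here is a small sketch using only the standard library. `parse_payload` and `decode_error_text` are hypothetical helper names, not part of taskiq.

```python
import json
from typing import Any, Optional


def parse_payload(raw: bytes) -> Optional[Any]:
    """Return the decoded JSON payload, or None if it cannot be parsed."""
    if not raw:
        # An empty body can never be valid JSON, so skip it early.
        return None
    try:
        return json.loads(raw.decode())
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


def decode_error_text(raw: bytes) -> str:
    """Return the JSON decoder's error message for raw, or '' on success."""
    try:
        json.loads(raw.decode())
    except json.JSONDecodeError as exc:
        return str(exc)
    return ""


empty_result = parse_payload(b"")
valid_result = parse_payload(b'{"task_id": "abc"}')
error_text = decode_error_text(b"")
```

Here `error_text` holds the same "Expecting value" message that appears in the log, which suggests the worker received a message with no body at all and not a corrupted one.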

Tsovak, Nov 05 '24 23:11