Errors with python3.11 with concurrency and high event rates

Open krzysieksulejczak opened this issue 3 years ago • 6 comments

Checklist

  • [X] I have included information about relevant versions
  • [X] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I tried switching to Python 3.11 with faust-streaming==0.10.0 and mode-streaming==0.3.2. The application started, but the agent began throwing errors and eventually died.

Expected behavior

The agent should process messages without errors.

Actual behavior

See the tracebacks below.

Full traceback

[2022-12-02 14:13:47,323] [1] [ERROR] [^--Consumer]: Drain messages raised: TypeError('Passing coroutines is forbidden, use tasks explicitly.')
Traceback (most recent call last):
  File "/app/venv/lib/python3.11/site-packages/faust/transport/consumer.py", line 1200, in _drain_messages
    await self.wait_first(
  File "/app/venv/lib/python3.11/site-packages/mode/services.py", line 769, in wait_first
    f.result()  # propagate exceptions
    ^^^^^^^^^^
  File "/app/venv/lib/python3.11/site-packages/faust/transport/conductor.py", line 274, in on_message
    return await get_callback_for_tp(message.tp)(message)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "faust/transport/_cython/conductor.pyx", line 78, in __call__
  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 415, in wait
    raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
TypeError: Passing coroutines is forbidden, use tasks explicitly.
[2022-12-02 14:13:47,326] [1] [ERROR] [^---Fetcher]: Crashed reason=TypeError('Passing coroutines is forbidden, use tasks explicitly.')
Traceback (most recent call last):
  File "/app/venv/lib/python3.11/site-packages/mode/services.py", line 853, in _execute_task
    await task
  File "/app/venv/lib/python3.11/site-packages/faust/transport/consumer.py", line 177, in _fetcher
    await consumer._drain_messages(self)
  File "/app/venv/lib/python3.11/site-packages/faust/transport/consumer.py", line 1200, in _drain_messages
    await self.wait_first(
  File "/app/venv/lib/python3.11/site-packages/mode/services.py", line 769, in wait_first
    f.result()  # propagate exceptions
    ^^^^^^^^^^
  File "/app/venv/lib/python3.11/site-packages/faust/transport/conductor.py", line 274, in on_message
    return await get_callback_for_tp(message.tp)(message)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "faust/transport/_cython/conductor.pyx", line 78, in __call__
  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 415, in wait
    raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
TypeError: Passing coroutines is forbidden, use tasks explicitly.
[2022-12-02 14:13:48,369] [1] [WARNING] [^----OneForOneSupervisor: (8@0x7f5854dd16d0)]: Max restarts exceeded: MaxRestartsExceeded()
Traceback (most recent call last):
  File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 181, in restart_service
    async with self._bucket:
  File "/app/venv/lib/python3.11/site-packages/mode/utils/times.py", line 149, in __aenter__
    raise self.raises()
mode.exceptions.MaxRestartsExceeded
[2022-12-02 14:13:48,370] [1] [ERROR] [^----OneForOneSupervisor: (8@0x7f5854dd16d0)]: Crashed reason=SystemExit(1)
Traceback (most recent call last):
  File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 181, in restart_service
    async with self._bucket:
  File "/app/venv/lib/python3.11/site-packages/mode/utils/times.py", line 149, in __aenter__
    raise self.raises()
mode.exceptions.MaxRestartsExceeded

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/venv/lib/python3.11/site-packages/mode/services.py", line 853, in _execute_task
    await task
  File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 141, in _supervisor
    await self.restart_services(to_restart)
  File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 165, in restart_services
    await self.restart_service(service)
  File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 191, in restart_service
    raise SystemExit(1)
SystemExit: 1

Versions

  • Python version: 3.11
  • Faust version: 0.10.0
  • Operating system: Debian bullseye

krzysieksulejczak Dec 02 '22 22:12

Thanks for opening up this issue, I'll look into it. Migrating to Python 3.11 has presented many challenges with asyncio. If you have any sample code I could run, it would be greatly appreciated.

wbarnha Dec 02 '22 22:12

I currently have no sample code. One more observation: I run the same application in several environments, and the issue occurs only in the environment with a much higher event rate from Kafka. The agent also has concurrency>1 in all environments.

krzysieksulejczak Dec 02 '22 23:12

We will need to update the Cython code to pass tasks or futures instead of coroutines. The change is simple:

for event, chan in full:
    task = self._handle_full(event, chan, delivered)
    tasklist.add(task)
await wait(tasklist, return_when=ALL_COMPLETED)

patkivikram Dec 06 '22 18:12
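
For reference, the TypeError in the tracebacks is raised by asyncio.wait() itself, which on Python 3.11 rejects bare coroutine objects. Below is a minimal, standalone sketch of the "tasks instead of coroutines" idea using only the standard library. The names handle_full and deliver_all are hypothetical stand-ins for illustration and are not part of the Faust or mode API. The sketch assumes the per-channel handler is a coroutine function, so each call has to be wrapped with asyncio.create_task() before it is handed to asyncio.wait().

```python
import asyncio


async def handle_full(event: str, delivered: set[str]) -> None:
    """Hypothetical stand-in for a per-channel handler (not Faust code)."""
    await asyncio.sleep(0)  # yield to the event loop, as a real channel put might
    delivered.add(event)


async def deliver_all(events: list[str]) -> set[str]:
    """Run one handler per event and wait until all of them have finished."""
    delivered: set[str] = set()
    # Calling handle_full(...) only creates a coroutine object; wrapping it in
    # a Task is what asyncio.wait() requires on Python 3.11.
    tasks = {asyncio.create_task(handle_full(e, delivered)) for e in events}
    if tasks:  # asyncio.wait() raises ValueError when given an empty collection
        done, _pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        for task in done:
            task.result()  # re-raise any exception from a handler
    return delivered


if __name__ == "__main__":
    print(sorted(asyncio.run(deliver_all(["a", "b", "c"]))))
```

Note that simply assigning the result of a coroutine function call to a variable named task does not create a task. Only asyncio.create_task() or asyncio.ensure_future() does, which may be relevant when checking whether a given patch actually avoids the error.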

I'm testing the fix, and it doesn't correct the error:

[2022-12-08 13:48:52,992] [1] [ERROR] [^---Fetcher]: Crashed reason=TypeError('Passing coroutines is forbidden, use tasks explicitly.') 
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 853, in _execute_task
    await task
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 177, in _fetcher
    await consumer._drain_messages(self)
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 1200, in _drain_messages
    await self.wait_first(
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 769, in wait_first
    f.result()  # propagate exceptions
    ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/faust/transport/conductor.py", line 274, in on_message
    return await get_callback_for_tp(message.tp)(message)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/faust/transport/conductor.py", line 169, in on_message
    await asyncio.wait(
  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 415, in wait
    raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
TypeError: Passing coroutines is forbidden, use tasks explicitly.

[2022-12-08 13:53:53,506] [1] [INFO] [^--Consumer]: Stack for <Task pending name='Task-17210' coro=<Service.stop() running at /usr/local/lib/python3.11/site-packages/mode/services.py:912> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[shield.<locals>._inner_done_callback() at /usr/local/lib/python3.11/asyncio/tasks.py:881]> (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 912, in stop
    await self.on_stop()
  File "/app/src/app.py", line 40, in on_stop
    await super().on_stop()
  File "/usr/local/lib/python3.11/site-packages/faust/app/base.py", line 1585, in on_stop
    await self._stop_consumer()
  File "/usr/local/lib/python3.11/site-packages/faust/app/base.py", line 1611, in _stop_consumer
    await self._consumer_wait_empty(consumer, self.log)
  File "/usr/local/lib/python3.11/site-packages/faust/app/base.py", line 1617, in _consumer_wait_empty
    await consumer.wait_empty()
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 506, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 869, in wait_empty
    await T(self.commit_and_end_transactions)()
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 879, in commit_and_end_transactions
    await self.commit(start_new_transaction=False)
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 934, in commit
    return await self.force_commit(
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 506, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 969, in force_commit
    did_commit = await self._commit_tps(
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 985, in _commit_tps
    return await self._commit_offsets(
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 1055, in _commit_offsets
    did_commit = await self._commit(committable_offsets)
  File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 1461, in _commit
    return await self._thread.commit(offsets)
  File "/usr/local/lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 704, in commit
    return await self.call_thread(self._commit, offsets)
  File "/usr/local/lib/python3.11/site-packages/mode/threads.py", line 429, in call_thread
    result = await promise
 

PushUpek Dec 08 '22 13:12

Added another change, let me know what you get. Still trying to get a reproducible example on my end.

wbarnha Dec 13 '22 23:12

Interesting, I'm looking forward to the reproducible example 😂 @wbarnha

lqhuang Dec 16 '22 03:12