Errors with python3.11 with concurrency and high event rates
Checklist
- [X] I have included information about relevant versions
- [X] I have verified that the issue persists when using the
masterbranch of Faust.
Steps to reproduce
I tried to switch to python3.11, I used faust-streaming==0.10.0 and mode-streaming==0.3.2. The application started but the agent started throwing errors and eventually died.
Expected behavior
The agent should process messages without errors.
Actual behavior
see tracebacks below
Full traceback
[2022-12-02 14:13:47,323] [1] [ERROR] [^--Consumer]: Drain messages raised: TypeError('Passing coroutines is forbidden, use tasks explicitly.')
Traceback (most recent call last):
File "/app/venv/lib/python3.11/site-packages/faust/transport/consumer.py", line 1200, in _drain_messages
await self.wait_first(
File "/app/venv/lib/python3.11/site-packages/mode/services.py", line 769, in wait_first
f.result() # propagate exceptions
^^^^^^^^^^
File "/app/venv/lib/python3.11/site-packages/faust/transport/conductor.py", line 274, in on_message
return await get_callback_for_tp(message.tp)(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "faust/transport/_cython/conductor.pyx", line 78, in __call__
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 415, in wait
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
TypeError: Passing coroutines is forbidden, use tasks explicitly.
[2022-12-02 14:13:47,326] [1] [ERROR] [^---Fetcher]: Crashed reason=TypeError('Passing coroutines is forbidden, use tasks explicitly.')
Traceback (most recent call last):
File "/app/venv/lib/python3.11/site-packages/mode/services.py", line 853, in _execute_task
await task
File "/app/venv/lib/python3.11/site-packages/faust/transport/consumer.py", line 177, in _fetcher
await consumer._drain_messages(self)
File "/app/venv/lib/python3.11/site-packages/faust/transport/consumer.py", line 1200, in _drain_messages
await self.wait_first(
File "/app/venv/lib/python3.11/site-packages/mode/services.py", line 769, in wait_first
f.result() # propagate exceptions
^^^^^^^^^^
File "/app/venv/lib/python3.11/site-packages/faust/transport/conductor.py", line 274, in on_message
return await get_callback_for_tp(message.tp)(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "faust/transport/_cython/conductor.pyx", line 78, in __call__
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 415, in wait
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
TypeError: Passing coroutines is forbidden, use tasks explicitly.
[2022-12-02 14:13:48,369] [1] [WARNING] [^----OneForOneSupervisor: (8@0x7f5854dd16d0)]: Max restarts exceeded: MaxRestartsExceeded()
Traceback (most recent call last):
File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 181, in restart_service
async with self._bucket:
File "/app/venv/lib/python3.11/site-packages/mode/utils/times.py", line 149, in __aenter__
raise self.raises()
mode.exceptions.MaxRestartsExceeded
[2022-12-02 14:13:48,370] [1] [ERROR] [^----OneForOneSupervisor: (8@0x7f5854dd16d0)]: Crashed reason=SystemExit(1)
Traceback (most recent call last):
File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 181, in restart_service
async with self._bucket:
File "/app/venv/lib/python3.11/site-packages/mode/utils/times.py", line 149, in __aenter__
raise self.raises()
mode.exceptions.MaxRestartsExceeded
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/app/venv/lib/python3.11/site-packages/mode/services.py", line 853, in _execute_task
await task
File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 141, in _supervisor
await self.restart_services(to_restart)
File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 165, in restart_services
await self.restart_service(service)
File "/app/venv/lib/python3.11/site-packages/mode/supervisors.py", line 191, in restart_service
raise SystemExit(1)
SystemExit: 1
Versions
- Python version: 3.11
- Faust version: 0.10.0
- Operating system: Debian bullseye
Thanks for opening up this issue, I'll look into it. Migrating to Python 3.11 has presented many challenges with asyncio. If you have any sample code I could run, it would be greatly appreciated.
Currently I have no sample code. One more observation: I have the same application run in a few environments, the issue occurs only in environment with much higher events rate from kafka. The agent also have concurrecy>1 on all environments.
We will need to update the cython code to pass in tasks or futures instead of coroutines. The change is simple
for event, chan in full:
task = self._handle_full(event, chan, delivered)
tasklist.add(task)
await wait(tasklist,return_when=ALL_COMPLETED)
I'm testing the fix and it doesn't correct the error
[2022-12-08 13:48:52,992] [1] [ERROR] [^---Fetcher]: Crashed reason=TypeError('Passing coroutines is forbidden, use tasks explicitly.')
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 853, in _execute_task
await task
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 177, in _fetcher
await consumer._drain_messages(self)
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 1200, in _drain_messages
await self.wait_first(
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 769, in wait_first
f.result() # propagate exceptions
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/faust/transport/conductor.py", line 274, in on_message
return await get_callback_for_tp(message.tp)(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/faust/transport/conductor.py", line 169, in on_message
await asyncio.wait(
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 415, in wait
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
TypeError: Passing coroutines is forbidden, use tasks explicitly.
[2022-12-08 13:53:53,506] [1] [INFO] [^--Consumer]: Stack for <Task pending name='Task-17210' coro=<Service.stop() running at /usr/local/lib/python3.11/site-packages/mode/services.py:912> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[shield.<locals>._inner_done_callback() at /usr/local/lib/python3.11/asyncio/tasks.py:881]> (most recent call last):
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 912, in stop
await self.on_stop()
File "/app/src/app.py", line 40, in on_stop
await super().on_stop()
File "/usr/local/lib/python3.11/site-packages/faust/app/base.py", line 1585, in on_stop
await self._stop_consumer()
File "/usr/local/lib/python3.11/site-packages/faust/app/base.py", line 1611, in _stop_consumer
await self._consumer_wait_empty(consumer, self.log)
File "/usr/local/lib/python3.11/site-packages/faust/app/base.py", line 1617, in _consumer_wait_empty
await consumer.wait_empty()
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 506, in _and_transition
return await fun(self, *args, **kwargs)
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 869, in wait_empty
await T(self.commit_and_end_transactions)()
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 879, in commit_and_end_transactions
await self.commit(start_new_transaction=False)
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 934, in commit
return await self.force_commit(
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 506, in _and_transition
return await fun(self, *args, **kwargs)
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 969, in force_commit
did_commit = await self._commit_tps(
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 985, in _commit_tps
return await self._commit_offsets(
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 1055, in _commit_offsets
did_commit = await self._commit(committable_offsets)
File "/usr/local/lib/python3.11/site-packages/faust/transport/consumer.py", line 1461, in _commit
return await self._thread.commit(offsets)
File "/usr/local/lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 704, in commit
return await self.call_thread(self._commit, offsets)
File "/usr/local/lib/python3.11/site-packages/mode/threads.py", line 429, in call_thread
result = await promise
Added another change, let me know what you get. Still trying to get a reproducible example on my end.
Interesting, I'm looking forward to the reproducible example 😂 @wbarnha