faust
faust copied to clipboard
Faust agents die after some days
Actual behavior
After running the application for some days, agents die slowly and after a few days, no event consumes from topics. If I do not restart the docker container of the Faust app, all agents stop consumption after a few days.
Full traceback
* m_consumer.m_agent ----->
============================================================
['Stack for <coroutine object movie_updated_agent at 0x7f303428bce0> (most recent call last):\n File "/project/m_consumer/movie_updated_consumer.py", line 22, in movie_updated_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n']
* z_consumer.retry_send_product_agent ----->
============================================================
['Stack for <coroutine object retry_send_product_agent at 0x7f30341506b0> (most recent call last):\n File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f303428bf00> (most recent call last):\n File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f3034150af0> (most recent call last):\n File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f30341508d0> (most recent call last):\n File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f3034150490> (most recent call last):\n File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n']
* o_consumer.o_agent ----->
============================================================
['Stack for <coroutine object order_agent at 0x7f303419a750> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b3450> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034175d50> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b2150> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034199450> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b0e50> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341c4850> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034198150> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f303419ba50> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034174b50> (most recent call last):\n File "/project/o_consumer/consumer.py", line 23, in order_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n']
* z.z_consumer.send_user_agent ----->
============================================================
['Stack for <coroutine object send_user_agent at 0x7f30341f8710> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8dd0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9010> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8830> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8ef0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f95b0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8950> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9130> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f96d0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f84d0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8a70> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9250> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f85f0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f97f0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8b90> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9370> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f303427e7b0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8170> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8cb0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9490> (most recent call last):\n File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n async for records in stream.take(faust_config.max_stream_take, 1):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n']
* zb_core.transport_layer.zb_product_sent_consumer.consumer.send_product_agent ----->
============================================================
['Stack for <coroutine object send_product_agent at 0x7f3034108dd0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108050> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108710> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109250> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108ef0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108170> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108830> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109370> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109010> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108950> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108290> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341fb770> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108b90> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341083b0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108a70> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341fbe30> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109130> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341084d0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108cb0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341085f0> (most recent call last):\n File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n await __sleep0()\n File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n yield\n']
Sample code
Sample code of one the consumers:
@app.agent(event_sent_topic, concurrency=20)
async def send_event_agent(stream):
""""""
task_name = asyncio.current_task().get_name()
interface = Interface()
async for records in stream.take(10, 1):
for record in records:
send_event_tasks_queue[record.user_id].append(task_name)
await _wrap_send_event(record=record, interface=interface)
await asyncio.sleep(0) # Skipping current event loop run for giving execution chance to other tasks.
Versions
- Python version 3.11.3
- Faust version 0.10.13
- Operating system docker on CentOS (python:3.11.3-slim-buster )
- Kafka version