Error closing application

Open cmartmos opened this issue 1 year ago • 7 comments

Checklist

  • [X] I have included information about relevant versions
  • [X] I have verified that the issue persists when using the master branch of Mode.

Expected behavior

No error expected

Actual behavior

RuntimeWarning: coroutine 'sleep' was never awaited (mode/services.py:849)

Full traceback

...
[2023-09-27 14:23:25,835] [13671] [INFO] [^Worker]: Gathering service tasks... 
[2023-09-27 14:23:25,836] [13671] [INFO] [^Worker]: Gathering all futures... 
[2023-09-27 14:23:26,848] [13671] [INFO] [^---ProducerBuffer]: Terminating cancelled task: <coroutine object ProducerBuffer._handle_pending at 0x7fa8f5063610> 
[2023-09-27 14:23:26,851] [13671] [INFO] [^--Consumer]: Terminating cancelled task: <coroutine object Consumer._commit_handler at 0x7fa8f50633e0> 
[2023-09-27 14:23:26,852] [13671] [INFO] [^--Monitor]: Terminating cancelled task: <coroutine object Monitor._sampler at 0x7fa8f5062dc0> 
[2023-09-27 14:23:26,852] [13671] [INFO] [^--Consumer]: Terminating cancelled task: <coroutine object Consumer._commit_livelock_detector at 0x7fa8f4f74350> 
[2023-09-27 14:23:27,854] [13671] [INFO] [^Worker]: Closing event loop 
[2023-09-27 14:23:27,992] [13671] [INFO] [^---AIOKafkaConsumerThread]: Cancelled task <coroutine object ServiceThread._keepalive2 at 0x7fa8f4f744a0>: Event loop is closed 
/venv/proto/lib/python3.10/site-packages/mode/services.py:849: RuntimeWarning: coroutine 'sleep' was never awaited
  self.log.info("Cancelled task %r: %s", task, exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Versions

  • Python version 3.10
  • Operating system Ubuntu 20.04

cmartmos • Sep 27 '23 12:09

Do you have any code for us so we can reproduce this?

wbarnha • Sep 27 '23 12:09

The issue arises when the application crashes unexpectedly: the event loop then raises the warning shown above while it is closing.
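
For context, the warning text itself comes from Python rather than from Mode: it is emitted when a coroutine object is discarded without ever being awaited. The minimal sketch below does not involve Mode or Faust and only illustrates that generic mechanism. It creates an `asyncio.sleep()` coroutine, drops it, and records the resulting warning. How the shutdown path in `mode/services.py` ends up in that state is not established in this thread.

```python
import asyncio
import gc
import warnings


def unawaited_sleep_warnings():
    """Create an asyncio.sleep() coroutine, drop it, and return the warning texts."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # record every warning, even repeated ones
        coro = asyncio.sleep(1)  # coroutine object is created but never scheduled
        del coro                 # last reference dropped without awaiting it
        gc.collect()             # make sure finalization has run
    return [str(w.message) for w in caught]


messages = unawaited_sleep_warnings()
print(messages)
```

The recorded message has the same wording as the `RuntimeWarning` in the traceback, which suggests that some `sleep` coroutine was created during shutdown and then thrown away once the loop was already closed.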

cmartmos • Sep 27 '23 12:09

Hard to say what happened here. The relevant code is at https://github.com/faust-streaming/mode/blob/5260fa15a9482a216b74253d90a3c80ef9dc4e92/mode/services.py#L841-L854. Based on this log statement, I presume you're using Faust:

[2023-09-27 14:23:26,848] [13671] [INFO] [^---ProducerBuffer]: Terminating cancelled task: <coroutine object ProducerBuffer._handle_pending at 0x7fa8f5063610> 

I wonder if this is more of a Faust bug than a Mode bug.

wbarnha • Mar 19 '24 15:03

Oh, wait, it's not. Let me dig into it more.

wbarnha • Mar 19 '24 17:03

If this happens in tests, the tests hang indefinitely. Here's the output from running pytest, including what is printed after pressing Ctrl+C:

===================================== 1 passed, 57 deselected, 1 warning in 5.69s =====================================
/path/to/proj/.venv/lib/python3.9/site-packages/mode/services.py:881: RuntimeWarning: coroutine 'sleep' was never awaited
  await self.crash(exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
^CException ignored in: <module 'threading' from '/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py'>
Traceback (most recent call last):
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py", line 1428, in _shutdown
    lock.acquire()
KeyboardInterrupt: 

didimelli • Aug 12 '24 14:08

@wbarnha Very simple reproduction:

Versions:

❯ uv pip list | grep -E 'faust|mode'
faust-streaming    0.11.2
mode-streaming     0.4.1
❯ python --version
Python 3.9.2
❯ python -c "import sys; print(sys.version)"
3.9.2 (default, Mar 27 2021, 23:26:06) 
[Clang 11.1.0 ]

repro.py:

import faust

app = faust.App(
    "repro",
    broker="localhost:29092",
)


async def main():
    await app.start()
    print("start")
    raise ValueError("asd")


if __name__ == "__main__":
    import anyio

    anyio.run(main)

And that prints

❯ python repro.py
start
Traceback (most recent call last):
  File "/path/to/proj/repro.py", line 22, in <module>
    anyio.run(main)
  File "/path/to/proj/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 74, in run
    return async_backend.run(func, args, {}, backend_options)
  File "/path/to/proj/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 2034, in run
    return runner.run(wrapper())
  File "/path/to/proj/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 194, in run
    return self._loop.run_until_complete(task)
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/path/to/proj/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 2022, in wrapper
    return await func(*args)
  File "/path/to/proj/repro.py", line 16, in main
    raise ValueError("asd")
ValueError: asd
/path/to/proj/.venv/lib/python3.9/site-packages/mode/services.py:881: RuntimeWarning: coroutine 'sleep' was never awaited
  await self.crash(exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
^CException ignored in: <module 'threading' from '/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py'>
Traceback (most recent call last):
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py", line 1428, in _shutdown
    lock.acquire()
KeyboardInterrupt: 

Until the KeyboardInterrupt, the program hangs indefinitely.
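
The traceback ends in `threading._shutdown`, which is where the interpreter waits for non-daemon threads at exit. One possible reading, which is an assumption and not confirmed in this thread, is that a worker thread started during `app.start()` is never stopped when `main()` raises. The sketch below uses a hypothetical `ThreadedService` class (not Faust or Mode) to show the general pattern of stopping such a service in a `finally` block so the process can exit.

```python
import asyncio
import threading


class ThreadedService:
    """Hypothetical stand-in for a service that owns a non-daemon worker thread."""

    def __init__(self):
        self._stop_event = threading.Event()
        # The worker simply blocks until it is told to stop.
        self._thread = threading.Thread(target=self._stop_event.wait)

    async def start(self):
        self._thread.start()

    async def stop(self):
        self._stop_event.set()
        # Short blocking join: the worker exits as soon as the event is set.
        self._thread.join()

    @property
    def running(self):
        return self._thread.is_alive()


async def main(service):
    await service.start()
    try:
        raise ValueError("asd")
    finally:
        # Without this, the non-daemon thread would keep the interpreter
        # waiting in threading._shutdown after the exception propagates.
        await service.stop()


service = ThreadedService()
try:
    asyncio.run(main(service))
except ValueError as exc:
    print("main failed with:", exc)
print("worker thread alive:", service.running)
```

Whether wrapping the body of `main()` in `try`/`finally` with `await app.stop()` avoids the hang in the real reproduction has not been verified here.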

EDIT: The exact same thing happens with asyncio:

import faust

app = faust.App(
    "repro",
    broker="localhost:29092",
)


async def main():
    await app.start()
    print("start")
    raise ValueError("asd")


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

which prints

❯ python repro.py
start
Traceback (most recent call last):
  File "/home/didi/do/aurora-asms-adapter-ionscv/repro.py", line 18, in <module>
    asyncio.run(main())
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/didi/do/aurora-asms-adapter-ionscv/repro.py", line 12, in main
    raise ValueError("asd")
ValueError: asd
/home/didi/do/aurora-asms-adapter-ionscv/.venv/lib/python3.9/site-packages/mode/services.py:881: RuntimeWarning: coroutine 'sleep' was never awaited
  await self.crash(exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
^CException ignored in: <module 'threading' from '/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py'>
Traceback (most recent call last):
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py", line 1428, in _shutdown
    lock.acquire()
KeyboardInterrupt: 

didimelli • Aug 13 '24 13:08

Debug logging
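
The lines that follow have the `LEVEL:logger:message` shape produced by the default format of Python's `logging.basicConfig`. Assuming that is how debug output was turned on for this run (the thread does not say), a minimal sketch looks like this. The `repro` logger name and the in-memory buffer are only for illustration.

```python
import io
import logging

buffer = io.StringIO()
# force=True replaces any handlers already installed on the root logger (Python 3.8+).
logging.basicConfig(level=logging.DEBUG, stream=buffer, force=True)

# "repro" is a hypothetical logger name used only for this sketch.
logging.getLogger("repro").debug("starting")
output = buffer.getvalue()
print(output, end="")
```

In the reproduction script, calling `logging.basicConfig(level=logging.DEBUG)` before `app.start()` would send the same format to stderr.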
❯ python repro.py                            
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:Using selector: EpollSelector
INFO:faust.app.base:[^App]: Starting...
INFO:faust.sensors.monitor:[^-Monitor]: Starting...
DEBUG:faust.sensors.monitor:[^-Monitor]: Started.
INFO:faust.transport.drivers.aiokafka:[^-Producer]: Starting...
INFO:faust.transport.producer:[^--ProducerBuffer]: Starting...
DEBUG:faust.transport.producer:[^--ProducerBuffer]: Started.
DEBUG:aiokafka.producer.producer:Starting the Kafka producer
DEBUG:aiokafka:Attempting to bootstrap via node at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: MetadataResponse_v0(brokers=[(node_id=1001, host='localhost', port=29092)], topics=[...])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:aiokafka.conn:Closing connection at localhost:29092
DEBUG:aiokafka:Received cluster metadata: ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: MetadataResponse_v0(brokers=[(node_id=1001, host='localhost', port=29092)], topics=[...])
DEBUG:aiokafka.conn:Closing connection at localhost:29092
DEBUG:aiokafka.producer.producer:Kafka producer started
DEBUG:faust.transport.drivers.aiokafka:[^-Producer]: Started.
INFO:faust.web.cache.backends.base:[^-CacheBackend]: Starting...
DEBUG:faust.web.cache.backends.base:[^-CacheBackend]: Started.
INFO:faust.web.drivers.aiohttp:[^-Web]: Starting...
INFO:faust.web.drivers.aiohttp:[^--Server]: Starting...
DEBUG:faust.web.drivers.aiohttp:[^--Server]: Started.
DEBUG:faust.web.drivers.aiohttp:[^-Web]: Started.
INFO:faust.transport.drivers.aiokafka:[^-Consumer]: Starting...
DEBUG:mode.threads:[^--MethodQueue@0x7f8108bd6520]: Starting...
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318880 index=0]: Starting...
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318880 index=0]: Started.
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318fa0 index=1]: Starting...
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318fa0 index=1]: Started.
DEBUG:mode.threads:[^--MethodQueue@0x7f8108bd6520]: Started.
INFO:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Starting...
DEBUG:aiokafka:Attempting to bootstrap via node at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: MetadataResponse_v0(brokers=[(node_id=1001, host='localhost', port=29092)], topics=[...])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:aiokafka.conn:Closing connection at localhost:29092
DEBUG:aiokafka:Received cluster metadata: ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: MetadataResponse_v0(brokers=[(node_id=1001, host='localhost', port=29092)], topics=[...])
DEBUG:aiokafka.conn:Closing connection at localhost:29092
DEBUG:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Started.
DEBUG:mode.threads:[^---MethodQueue@0x7f8108318b80]: Starting...
DEBUG:mode.threads:[^----MethodQueueWorker@0x7f810833caf0 index=0]: Starting...
DEBUG:mode.threads:[^----MethodQueueWorker@0x7f810833caf0 index=0]: Started.
DEBUG:mode.threads:[^----MethodQueueWorker@0x7f810833cb80 index=1]: Starting...
DEBUG:mode.threads:[^----MethodQueueWorker@0x7f810833cb80 index=1]: Started.
DEBUG:mode.threads:[^---MethodQueue@0x7f8108318b80]: Started.
DEBUG:faust.transport.drivers.aiokafka:[^-Consumer]: Started.
INFO:faust.assignor.leader_assignor:[^-LeaderAssignor]: Starting...
INFO:faust.transport.drivers.aiokafka:[^-Producer]: Creating topic 'repro-__assignor-__leader'
DEBUG:faust.transport.drivers.aiokafka:[^-Producer]: Topic 'repro-__assignor-__leader' exists, skipping creation.
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
DEBUG:aiokafka:Sending metadata request MetadataRequest_v1(topics=NULL) to node 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: MetadataRequest_v1(topics=NULL)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: MetadataResponse_v1(brokers=[(node_id=1001, host='localhost', port=29092, rack=None)], controller_id=1001, topics=[...])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:faust.assignor.leader_assignor:[^-LeaderAssignor]: Started.
INFO:faust.agents.replies:[^-ReplyConsumer]: Starting...
DEBUG:faust.agents.replies:[^-ReplyConsumer]: Started.
INFO:faust.agents.manager:[^-AgentManager]: Starting...
DEBUG:faust.agents.manager:[^-AgentManager]: Started.
INFO:faust.transport.conductor:[^--Conductor]: Starting...
DEBUG:faust.transport.conductor:[^--Conductor]: Started.
INFO:faust.tables.manager:[^-TableManager]: Starting...
INFO:faust.transport.conductor:[^--Conductor]: Waiting for agents to start...
INFO:faust.transport.conductor:[^--Conductor]: Waiting for tables to be registered...
DEBUG:mode.timers:Timer Monitor.sampler woke up - iteration=0 time_spent_sleeping=1.0019908730173483 drift=-0.001990873017348349 new_interval=0.9980091269826517 since_epoch=1.0020327200181782
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=0 time_spent_sleeping=1.0016229959437624 drift=-0.001622995943762362 new_interval=0.9983770040562376 since_epoch=1.0016567530110478
INFO:faust.tables.recovery:[^--Recovery]: Starting...
DEBUG:faust.tables.recovery:[^--Recovery]: Started.
DEBUG:faust.tables.manager:[^-TableManager]: Started.
DEBUG:faust.app.base:[^App]: Started.
INFO:faust.transport.drivers.aiokafka:[^-Producer]: Creating topic 'repro-__assignor-__leader'
DEBUG:faust.transport.drivers.aiokafka:[^-Producer]: Topic 'repro-__assignor-__leader' exists, skipping creation.
DEBUG:aiokafka:Sending metadata request MetadataRequest_v1(topics=NULL) to node 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 3: MetadataRequest_v1(topics=NULL)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 3: MetadataResponse_v1(brokers=[(node_id=1001, host='localhost', port=29092, rack=None)], controller_id=1001, topics=[...])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 17, groups: 0)
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'repro-__assignor-__leader'})
INFO:aiokafka.consumer.consumer:Subscribed to topic(s): {'repro-__assignor-__leader'}
DEBUG:aiokafka:Sending FindCoordinator request for key repro to broker 1001
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: FindCoordinatorRequest_v1(coordinator_key='repro', coordinator_type=0)
DEBUG:aiokafka:Sending metadata request MetadataRequest_v1(topics=['repro-__assignor-__leader']) to node 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 3: MetadataRequest_v1(topics=['repro-__assignor-__leader'])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=0, error_message='NONE', coordinator_id=1001, host='localhost', port=29092)
DEBUG:aiokafka:Received group coordinator response FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=0, error_message='NONE', coordinator_id=1001, host='localhost', port=29092)
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 3: MetadataResponse_v1(brokers=[(node_id=1001, host='localhost', port=29092, rack=None)], controller_id=1001, topics=[(error_code=0, topic='repro-__assignor-__leader', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])])])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1001 for group repro
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group repro
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group repro
DEBUG:aiokafka.consumer.group_coordinator:Sending JoinGroup (JoinGroupRequest_v5(group='repro', session_timeout=60000, rebalance_timeout=60000, member_id='', group_instance_id=None, protocol_type='consumer', group_protocols=[(protocol_name='faust', protocol_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])) to coordinator 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: JoinGroupRequest_v5(group='repro', session_timeout=60000, rebalance_timeout=60000, member_id='', group_instance_id=None, protocol_type='consumer', group_protocols=[(protocol_name='faust', protocol_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: JoinGroupResponse_v5(throttle_time_ms=0, error_code=79, generation_id=-1, group_protocol='', leader_id='', member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', members=[])
DEBUG:aiokafka.consumer.group_coordinator:Sending JoinGroup (JoinGroupRequest_v5(group='repro', session_timeout=60000, rebalance_timeout=60000, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, protocol_type='consumer', group_protocols=[(protocol_name='faust', protocol_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])) to coordinator 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 3: JoinGroupRequest_v5(group='repro', session_timeout=60000, rebalance_timeout=60000, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, protocol_type='consumer', group_protocols=[(protocol_name='faust', protocol_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 3: JoinGroupResponse_v5(throttle_time_ms=0, error_code=0, generation_id=1, group_protocol='faust', leader_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', members=[(member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, member_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])
DEBUG:aiokafka.consumer.group_coordinator:Join group response JoinGroupResponse_v5(throttle_time_ms=0, error_code=0, generation_id=1, group_protocol='faust', leader_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', members=[(member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, member_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])
INFO:aiokafka.consumer.group_coordinator:Joined group 'repro' (generation 1) with member_id faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using faust
DEBUG:aiokafka.consumer.group_coordinator:Performing assignment for group repro using strategy faust with subscriptions {'faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f': ConsumerProtocolMemberMetadata(version=4, subscription=['repro-__assignor-__leader'], user_data=b'{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75p9gk3:6066","changelog_distribution":...')}
DEBUG:aiokafka.consumer.group_coordinator:Finished assignment for group repro: {'faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f': ConsumerProtocolMemberAssignment(version=4, assignment=[(topic='repro-__assignor-__leader', partitions=[0])], user_data=b"x\x9c\x95\x8fA\n\xc20\x10E\xef2\xebJ\x0bb\xc5\\EJH\x9b!\x1d\x8cI\x98LE)\xb9\xbb)]Kq\xf7\x17\xef?x+\x98\x9c\xc9\x85'\x06\x01\xb5\x82\x99\x84^\x98\xb7\xc9\x988\x9e\xb4\xde\x81\xc8uz4\x16\x19\xd4\xbd\x1bJ\x03YL\xb0\xe3\xe7\x10\x1eJ\x85\x17\xf6\xa0`\x16I\xaam-\x89\xbf^\xd2\xcd=\xce\xaa...")}
DEBUG:aiokafka.consumer.group_coordinator:Sending leader SyncGroup for group repro to coordinator 1001: SyncGroupRequest_v3(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, group_assignment=[(member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', member_metadata=b"\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x98x\x9c\x95\x8fA\n\xc20\x10E\xef2\xebJ\x0bb\xc5\\EJH\x9b!\x1d\x8cI\x98LE)\xb9\xbb)]Kq\xf7\x17\xef?x+\x98\x9c\xc9\x85'\x06\x01\xb5\x82\x99\x84^\x98...")])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 4: SyncGroupRequest_v3(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, group_assignment=[(member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', member_metadata=b"\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x98x\x9c\x95\x8fA\n\xc20\x10E\xef2\xebJ\x0bb\xc5\\EJH\x9b!\x1d\x8cI\x98LE)\xb9\xbb)]Kq\xf7\x17\xef?x+\x98\x9c\xc9\x85'\x06\x01\xb5\x82\x99\x84^\x98...")])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 4: SyncGroupResponse_v3(throttle_time_ms=0, error_code=0, member_assignment=b"\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x98x\x9c\x95\x8fA\n\xc20\x10E\xef2\xebJ\x0bb\xc5\\EJH\x9b!\x1d\x8cI\x98LE)\xb9\xbb)]Kq\xf7\x17\xef?x+\x98\x9c\xc9\x85'\x06\x01\xb5\x82\x99\x84^\x98...")
INFO:aiokafka.consumer.group_coordinator:Successfully synced group repro with generation 1
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='repro-__assignor-__leader', partition=0)} for group repro
DEBUG:aiokafka.consumer.fetcher:Updating fetch positions for partitions [TopicPartition(topic='repro-__assignor-__leader', partition=0)]
DEBUG:aiokafka.consumer.group_coordinator:Fetching committed offsets for partitions: [TopicPartition(topic='repro-__assignor-__leader', partition=0)]
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 5: OffsetFetchRequest_v1(consumer_group='repro', topics=[(topic='repro-__assignor-__leader', partitions=[0])])
INFO:faust.app.base:Executing _on_partitions_assigned
INFO:faust.tables.recovery:generation id 1 app consumers id 1
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 5: OffsetFetchResponse_v1(topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=-1, metadata='', error_code=0)])])
INFO:faust.tables.recovery:[^--Recovery]: Seek stream partitions to committed offsets.
DEBUG:aiokafka.consumer.group_coordinator:No committed offset for partition TopicPartition(topic='repro-__assignor-__leader', partition=0)
DEBUG:aiokafka.consumer.fetcher:No committed offset found for TopicPartition(topic='repro-__assignor-__leader', partition=0)
DEBUG:aiokafka.consumer.fetcher:Resetting offset for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) using earliest strategy.
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 4: OffsetRequest_v1(replica_id=-1, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, timestamp=-2)])])
DEBUG:aiokafka.consumer.group_coordinator:Fetching committed offsets for partitions: [TopicPartition(topic='repro-__assignor-__leader', partition=0)]
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 6: OffsetFetchRequest_v1(consumer_group='repro', topics=[(topic='repro-__assignor-__leader', partitions=[0])])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 4: OffsetResponse_v1(topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, timestamp=-1, offset=0)])])
DEBUG:aiokafka.consumer.fetcher:Handling ListOffsetResponse response for TopicPartition(topic='repro-__assignor-__leader', partition=0). Fetched offset 0, timestamp -1
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 6: OffsetFetchResponse_v1(topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=-1, metadata='', error_code=0)])])
DEBUG:aiokafka.consumer.group_coordinator:No committed offset for partition TopicPartition(topic='repro-__assignor-__leader', partition=0)
DEBUG:aiokafka.consumer.consumer:Seeking to committed of partition TopicPartition(topic='repro-__assignor-__leader', partition=0) None
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 5: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
INFO:faust.tables.recovery:[^--Recovery]: Resuming flow...
INFO:faust.transport.consumer:[^--Fetcher]: Starting...
DEBUG:faust.transport.consumer:[^--Fetcher]: Started.
INFO:faust.tables.recovery:[^--Recovery]: Worker ready
start
INFO:faust.tables.recovery:[^--Recovery]: Terminating cancelled task: <coroutine object Recovery._restart_recovery at 0x7f81082dae40>
INFO:faust.transport.drivers.aiokafka:[^-Consumer]: Terminating cancelled task: <coroutine object Consumer._commit_handler at 0x7f8108314440>
INFO:faust.transport.producer:[^--ProducerBuffer]: Terminating cancelled task: <coroutine object ProducerBuffer._handle_pending at 0x7f8108bf60c0>
INFO:faust.transport.drivers.aiokafka:[^-Consumer]: Terminating cancelled task: <coroutine object Consumer._commit_livelock_detector at 0x7f8108bbfcc0>
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318880 index=0]: Terminating cancelled task: <coroutine object MethodQueueWorker._method_queue_do_work at 0x7f8108314b40>
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318fa0 index=1]: Terminating cancelled task: <coroutine object MethodQueueWorker._method_queue_do_work at 0x7f8108314cc0>
INFO:faust.transport.conductor:[^--Conductor]: Terminating cancelled task: <coroutine object Conductor._subscriber at 0x7f8108314540>
INFO:faust.sensors.monitor:[^-Monitor]: Terminating cancelled task: <coroutine object Monitor._sampler at 0x7f8108bbfbc0>
INFO:faust.tables.recovery:[^--Recovery]: Terminating cancelled task: <coroutine object Recovery._slurp_changelogs at 0x7f81082daf40>
INFO:faust.tables.recovery:[^--Recovery]: Terminating cancelled task: <coroutine object Recovery._publish_stats at 0x7f81082dad40>
Traceback (most recent call last):
  File "/home/didi/do/aurora-asms-adapter-ionscv/repro.py", line 22, in <module>
    asyncio.run(main())
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/didi/do/aurora-asms-adapter-ionscv/repro.py", line 16, in main
    raise ValueError("asd")
ValueError: asd
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=1 time_spent_sleeping=1.0000067059881985 drift=-6.705988198518753e-06 new_interval=0.9999932940118015 since_epoch=2.001684717950411
INFO:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Cancelled task <coroutine object ServiceThread._keepalive2 at 0x7f8108314640>: Event loop is closed
/home/didi/do/aurora-asms-adapter-ionscv/.venv/lib/python3.9/site-packages/mode/services.py:881: RuntimeWarning: coroutine 'sleep' was never awaited
  await self.crash(exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 5: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 6: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=2 time_spent_sleeping=1.0014912839978933 drift=-0.0014912839978933334 new_interval=0.9985087160021067 since_epoch=3.0031919409520924
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=3 time_spent_sleeping=0.9997022351017222 drift=0.00029776489827781916 new_interval=1.0002977648982778 since_epoch=4.002909027971327
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 7: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 7: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 6: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 7: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=4 time_spent_sleeping=1.0018507899949327 drift=-0.0018507899949327111 new_interval=0.9981492100050673 since_epoch=5.0047739499714226
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 7: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 8: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=5 time_spent_sleeping=0.9995446259854361 drift=0.0004553740145638585 new_interval=1.0004553740145639 since_epoch=6.004337324993685
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=6 time_spent_sleeping=1.001647011958994 drift=-0.0016470119589939713 new_interval=0.998352988041006 since_epoch=7.006001248024404
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 8: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 8: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 8: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 9: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=7 time_spent_sleeping=0.9997663419926539 drift=0.00023365800734609365 new_interval=1.000233658007346 since_epoch=8.005789019051008
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 9: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 10: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=8 time_spent_sleeping=1.0010316009866074 drift=-0.0010316009866073728 new_interval=0.9989683990133926 since_epoch=9.00683572201524
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=9 time_spent_sleeping=1.000353861018084 drift=-0.0003538610180839896 new_interval=0.999646138981916 since_epoch=10.007212847005576
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 9: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 9: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 10: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 11: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=10 time_spent_sleeping=1.0006299020024016 drift=-0.00062990200240165 new_interval=0.9993700979975984 since_epoch=11.007861518999562
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 11: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 12: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=11 time_spent_sleeping=1.000315580982715 drift=-0.00031558098271489143 new_interval=0.9996844190172851 since_epoch=12.008191514993086
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=12 time_spent_sleeping=1.0013250630581751 drift=-0.0013250630581751466 new_interval=0.9986749369418249 since_epoch=13.009536664001644
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 10: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 10: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 12: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 13: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=13 time_spent_sleeping=1.0003391930367798 drift=-0.0003391930367797613 new_interval=0.9996608069632202 since_epoch=14.009901670971885
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 13: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 14: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=14 time_spent_sleeping=1.001379554043524 drift=-0.0013795540435239673 new_interval=0.998620445956476 since_epoch=15.011302259052172
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=15 time_spent_sleeping=0.9995917979395017 drift=0.0004082020604982972 new_interval=1.0004082020604983 since_epoch=16.01091179798823
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 11: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 11: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 14: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 15: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=16 time_spent_sleeping=1.0019008970120922 drift=-0.001900897012092173 new_interval=0.9980991029879078 since_epoch=17.01283338095527
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 15: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 16: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=17 time_spent_sleeping=0.9996139759896323 drift=0.0003860240103676915 new_interval=1.0003860240103677 since_epoch=18.012464510044083
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=18 time_spent_sleeping=1.0019195060012862 drift=-0.0019195060012862086 new_interval=0.9980804939987138 since_epoch=19.01440254598856
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 12: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 12: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 16: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 17: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=19 time_spent_sleeping=1.0002802079543471 drift=-0.00028020795434713364 new_interval=0.9997197920456529 since_epoch=20.014701541978866
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 17: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 18: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=20 time_spent_sleeping=1.0008563300361857 drift=-0.0008563300361856818 new_interval=0.9991436699638143 since_epoch=21.01557591301389
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=21 time_spent_sleeping=1.0008519539842382 drift=-0.0008519539842382073 new_interval=0.9991480460157618 since_epoch=22.01644936297089
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 13: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 13: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 18: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 19: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=22 time_spent_sleeping=1.000216327025555 drift=-0.00021632702555507421 new_interval=0.9997836729744449 since_epoch=23.016682844026946
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 19: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 20: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=23 time_spent_sleeping=1.0011810249416158 drift=-0.0011810249416157603 new_interval=0.9988189750583842 since_epoch=24.01789080305025
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=24 time_spent_sleeping=1.0004381309263408 drift=-0.0004381309263408184 new_interval=0.9995618690736592 since_epoch=25.018343757023104
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 14: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 14: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 20: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 21: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=25 time_spent_sleeping=1.0010486399987713 drift=-0.0010486399987712502 new_interval=0.9989513600012287 since_epoch=26.019406441948377
^CException ignored in: <module 'threading' from '/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py'>
Traceback (most recent call last):
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py", line 1428, in _shutdown
    lock.acquire()
KeyboardInterrupt: 
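
For context on the last few lines: `threading._shutdown` is where the interpreter waits for remaining non-daemon threads at exit, so the traceback suggests the Ctrl-C arrived while a thread was still running. The sketch below is not a reproduction of the Mode/Faust behaviour, only a standard-library illustration of signalling a worker thread and joining it before the interpreter exits. The names `keepalive` and `run_and_stop` are hypothetical and do not come from Mode.

```python
import threading
import time


def keepalive(stop: threading.Event, interval: float = 0.05) -> None:
    """Hypothetical stand-in for a periodic keepalive loop."""
    # Event.wait returns False on timeout and True once the event is set,
    # so the loop ends shortly after stop.set() is called.
    while not stop.wait(interval):
        pass


def run_and_stop(run_for: float = 0.2) -> threading.Thread:
    """Start the worker, do some 'work', then shut the worker down explicitly."""
    stop = threading.Event()
    worker = threading.Thread(target=keepalive, args=(stop,), name="keepalive-demo")
    worker.start()
    try:
        time.sleep(run_for)  # stand-in for the application's main work
    except KeyboardInterrupt:
        pass  # on Ctrl-C, fall through to the orderly shutdown below
    finally:
        stop.set()                # ask the worker to exit its loop
        worker.join(timeout=5.0)  # wait here rather than at interpreter exit
    return worker


if __name__ == "__main__":
    thread = run_and_stop()
    print("worker alive after shutdown:", thread.is_alive())
```

If the worker has already been joined, `threading._shutdown` has nothing left to wait on at exit, so an interrupt is less likely to land inside it. Whether this applies to the `AIOKafkaConsumerThread` case in the log is an assumption.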

didimelli avatar Aug 13 '24 14:08 didimelli