
[BUG] Heartbeat background thread in "Transaction Consume-Process-Produce" paradigm

Open diwgan32 opened this issue 3 years ago • 1 comments

Describe the bug

Heartbeats are not being sent from a background thread when using the "Transaction Consume-Process-Produce" paradigm described here.

Expected behaviour

I expect the logger to print heartbeats continuously while consumer messages are being processed. Instead, the heartbeats freeze while a worker is processing a job and resume once the processing is done.

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.7.0
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"): 2.0.2
  • Kafka Broker version (kafka-topics.sh --version): 2.6.0 (Commit:62abe01bee039651)
  • Other information (Confluent Cloud version, etc.):

Reproducible example

import asyncio
import logging
import time

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, TopicPartition

logging.basicConfig(level=logging.DEBUG)

def build_offset(msg):
    tp = TopicPartition(msg.topic, msg.partition)
    ret = {}
    ret[tp] = msg.offset + 1
    return ret

async def func():
    loop = asyncio.get_event_loop()
    consumer = AIOKafkaConsumer(
            auto_offset_reset='earliest',
            group_id="group",
            loop=loop,
            max_poll_records=1,
            enable_auto_commit=False,
            isolation_level="read_committed",
            heartbeat_interval_ms=1000)
    consumer.subscribe(["test_event"])
    await consumer.start()

    producer = AIOKafkaProducer(
        loop=loop,
        transactional_id="some-id"
    )
    await producer.start()

    try:
        while True:
            msg = await consumer.getone()
            print(msg)
            async with producer.transaction():
                commit_offsets = build_offset(msg)
                # Blocking sleep to simulate heavy processing; it does not yield to the event loop.
                time.sleep(5)
                await producer.send("test_event", b"some val")
                await producer.send_offsets_to_transaction(commit_offsets, "group")
    finally:
        await consumer.stop()
        await producer.stop()

    
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(func())

Additional Info

The above example is a pared-down version of the example provided in the docs (linked above). It simply takes an event off the dummy test_event topic, sleeps for 5 seconds (to simulate an intensive processing load), then sends a message back to the same topic and commits the offset within the transaction. When I run this code, the heartbeats occur once a second, as expected, until I fire off the first event to test_event. Then something strange happens: the heartbeats stop while the code is executing the time.sleep(5) line and resume after it finishes. I don't think I'm missing log lines, because the logger prints the request number of each heartbeat and the numbers are sequential (despite the gap in heartbeats during the 5-second sleep). Here's a snippet of what I see when I run the above script with logging set to debug (filtering for just heartbeat events):

ConsumerRecord(topic='test_event', partition=0, offset=34, timestamp=1603958802338, timestamp_type=0, key=None, value=b'some val', checksum=None, serialized_key_size=-1, serialized_value_size=8, headers=())
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: group[17] aiokafka-0.7.0-e919e4d3-b8f2-402e-87f5-f857120d6a28
DEBUG:aiokafka.conn:<AIOKafkaConnection host=kafka port=9092> Request 5: HeartbeatRequest_v1(group='group', generation_id=17, member_id='aiokafka-0.7.0-e919e4d3-b8f2-402e-87f5-f857120d6a28')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=kafka port=9092> Response 5: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
ConsumerRecord(topic='test_event', partition=0, offset=36, timestamp=1603958807363, timestamp_type=0, key=None, value=b'some val', checksum=None, serialized_key_size=-1, serialized_value_size=8, headers=())
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: group[17] aiokafka-0.7.0-e919e4d3-b8f2-402e-87f5-f857120d6a28
DEBUG:aiokafka.conn:<AIOKafkaConnection host=kafka port=9092> Request 6: HeartbeatRequest_v1(group='group', generation_id=17, member_id='aiokafka-0.7.0-e919e4d3-b8f2-402e-87f5-f857120d6a28')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=kafka port=9092> Response 6: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
ConsumerRecord(topic='test_event', partition=0, offset=38, timestamp=1603959012488, timestamp_type=0, key=None, value=b'some val', checksum=None, serialized_key_size=-1, serialized_value_size=8, headers=())
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: group[17] aiokafka-0.7.0-e919e4d3-b8f2-402e-87f5-f857120d6a28
DEBUG:aiokafka.conn:<AIOKafkaConnection host=kafka port=9092> Request 7: HeartbeatRequest_v1(group='group', generation_id=17, member_id='aiokafka-0.7.0-e919e4d3-b8f2-402e-87f5-f857120d6a28')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=kafka port=9092> Response 7: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
ConsumerRecord(topic='test_event', partition=0, offset=40, timestamp=1603959017511, timestamp_type=0, key=None, value=b'some val', checksum=None, serialized_key_size=-1, serialized_value_size=8, headers=())

Clearly, even though the heartbeat interval was set to 1 second, only three heartbeats were logged (there should have been about 15, since the messages are roughly 5 seconds apart). My understanding was that aiokafka sends heartbeats from a separate background thread, as per KIP-62, but that does not seem to be happening. Any guidance here is appreciated!

diwgan32 avatar Oct 29 '20 08:10 diwgan32

I ran into this problem too, and in my case it is even worse: the consumer only tries to send a message to another thread through a queue with block=False. That should be a non-blocking, lightweight operation with no effect on the heartbeat. But I found that even with heartbeat_interval_ms set to 1000, the actual interval goes up to 45 seconds, which makes the session time out and triggers a rebalance. That in turn makes the commit fail and causes duplicated messages.
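One way to put a number on this kind of delay is to measure how late a periodic coroutine wakes up. The following is a minimal sketch using only the standard library; `measure_loop_lag` and `demo` are hypothetical names for illustration and are not part of aiokafka.

```python
import asyncio
import time


async def measure_loop_lag(interval, samples):
    """Sleep `interval` seconds repeatedly and record how late each wake-up is."""
    lags = []
    for _ in range(samples):
        start = time.monotonic()
        await asyncio.sleep(interval)
        # Anything beyond `interval` is time the coroutine spent waiting for the loop.
        lags.append(time.monotonic() - start - interval)
    return lags


async def demo():
    monitor = asyncio.create_task(measure_loop_lag(0.05, 4))
    await asyncio.sleep(0)  # let the monitor start its first sleep
    time.sleep(0.3)         # simulate a blocking call inside a coroutine
    return await monitor


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the first sample should show a lag of roughly a quarter of a second, while the later samples stay close to zero once nothing blocks the loop.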

Actually, as the code here shows, the heartbeat request is not sent by a separate background thread but by a coroutine that shares the same event loop with all other tasks. As the asyncio documentation shows, there is no difference in priority between tasks. That means there is no guarantee on the heartbeat interval. I found a discussion of this problem that describes the situation comprehensively.
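The effect of sharing one event loop can be reproduced without Kafka at all. Below is a minimal sketch, assuming a `ticker` coroutine as a stand-in for the heartbeat routine (all names here are hypothetical, not aiokafka internals): it counts how many ticks happen while a blocking `time.sleep` runs versus a cooperative `asyncio.sleep`.

```python
import asyncio
import time


async def ticker(ticks, interval=0.05):
    # Stand-in for the heartbeat coroutine: record a timestamp every `interval` seconds.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def count_ticks(work):
    """Run `work` next to the ticker on one loop; return the ticks recorded during it."""
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # let the ticker run once before measuring
    before = len(ticks)
    await work()
    during = len(ticks) - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during


async def blocking_work():
    time.sleep(0.5)  # never yields, so no other task can run


async def cooperative_work():
    await asyncio.sleep(0.5)  # yields, so the ticker keeps running


def run_demo():
    blocked = asyncio.run(count_ticks(blocking_work))
    cooperative = asyncio.run(count_ticks(cooperative_work))
    return blocked, cooperative


if __name__ == "__main__":
    print(run_demo())
```

The blocking variant records no ticks during the work, which matches the frozen heartbeats in the log above; the cooperative variant keeps ticking.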

I tried running aiokafka in a separate thread, using a non-blocking queue (catching the Full exception and then calling asyncio.sleep(0.1)) to reduce any possible effect on the event loop. But that does not work, probably because the threads are coupled. Binding the process to a core reduced the delay (from 2.3s to 1.5s), so I think it is also related to the system load.
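The queue hand-off described above could look roughly like the sketch below. It is an illustration under assumptions, not the code that was actually run: `put_without_blocking` and `demo` are hypothetical names, and a plain worker thread stands in for the real consumer of the queue.

```python
import asyncio
import queue
import threading
import time


async def put_without_blocking(q, item, retry_delay=0.1):
    """Put `item` on a thread-safe queue without ever blocking the event loop."""
    while True:
        try:
            q.put_nowait(item)
            return
        except queue.Full:
            # Queue is full: yield to the loop and retry later.
            await asyncio.sleep(retry_delay)


def demo():
    q = queue.Queue(maxsize=1)
    received = []

    def worker():
        # Slow worker thread that drains three items from the queue.
        for _ in range(3):
            received.append(q.get())
            time.sleep(0.05)

    thread = threading.Thread(target=worker)
    thread.start()

    async def main():
        for i in range(3):
            await put_without_blocking(q, i, retry_delay=0.01)

    asyncio.run(main())
    thread.join()
    return received


if __name__ == "__main__":
    print(demo())
```

This keeps the coroutine itself from blocking, but it does not stop other threads in the same process from competing with the event loop thread for CPU time.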

There are two possible solutions:

  1. Run aiokafka in a separate process.
  2. Use only asyncio-based modules in your program, for example aiohttp to interact with other systems while processing a message. However, it is hard to tell what the limit on the number of tasks is: I found that a single running consumer instance alone creates 19 tasks, with a delay of about 1-2 seconds. More tasks can be expected to lead to more delay.
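When a blocking call has no asyncio-based equivalent, another option (not mentioned above, so treat it as an assumption) is to hand the call to an executor with `loop.run_in_executor`, which is part of the standard asyncio API. The sketch below uses hypothetical names (`blocking_job`, `heartbeat_stub`, `process_off_loop`) and a stand-in coroutine instead of a real consumer.

```python
import asyncio
import time


def blocking_job(seconds):
    # Placeholder for processing that cannot be awaited.
    time.sleep(seconds)
    return "done"


async def heartbeat_stub(beats, interval):
    # Stand-in for the heartbeat coroutine; not aiokafka code.
    while True:
        beats.append(time.monotonic())
        await asyncio.sleep(interval)


async def process_off_loop(seconds=0.5, interval=0.05):
    """Run the blocking job in the default thread pool while the stub keeps beating."""
    loop = asyncio.get_running_loop()
    beats = []
    hb = asyncio.create_task(heartbeat_stub(beats, interval))
    # The loop stays free while the executor thread runs the blocking job.
    result = await loop.run_in_executor(None, blocking_job, seconds)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return result, len(beats)


if __name__ == "__main__":
    print(asyncio.run(process_off_loop()))
```

This helps for calls that release the GIL, such as sleeping or waiting on I/O. CPU-bound work in a thread may still delay the loop, in which case passing a `concurrent.futures.ProcessPoolExecutor` as the first argument moves the work into a separate process, in the spirit of the first solution.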

xlichao avatar Oct 30 '23 02:10 xlichao