confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

`list_consumer_group_offsets` request should support multiple consumer groups

Open cobolbaby opened this issue 8 months ago • 1 comments

Currently, `list_consumer_group_offsets` only supports querying a single consumer group per request, so callers must loop over the groups one at a time, which is too inefficient.

import logging

from confluent_kafka import (Consumer, ConsumerGroupTopicPartitions,
                             KafkaException, TopicPartition)
from confluent_kafka.admin import AdminClient

def get_kafka_lag_consumer_groups(group_prefix="connect-", lag_threshold=100):
    """Return the ids of Kafka consumer groups whose total lag exceeds a threshold.

    Lists all consumer groups whose id starts with ``group_prefix``, fetches
    each group's committed offsets, compares them against the partitions'
    high watermarks, and collects the groups whose summed lag is greater
    than ``lag_threshold``.

    Args:
        group_prefix: Only groups whose id starts with this prefix are
            inspected. Defaults to ``"connect-"``.
        lag_threshold: Total lag above which a group is reported.
            Defaults to ``100``.

    Returns:
        A list of high-lag group-id strings; an empty list when nothing
        matches or on any error (best-effort monitoring, never raises).
    """
    consumer = None
    try:
        admin_client = AdminClient({
            'bootstrap.servers': KAFKA_CONNECT_BOOTSTRAP_SERVERS
        })

        # Throwaway consumer used only to read watermark offsets; it never
        # commits ('enable.auto.commit': False).
        consumer = Consumer({
            'bootstrap.servers': KAFKA_CONNECT_BOOTSTRAP_SERVERS,
            'group.id': 'monitoring-consumer',
            'auto.offset.reset': 'latest',
            'enable.auto.commit': False
        })

        # Fetch all consumer groups known to the cluster.
        future_groups = admin_client.list_consumer_groups(request_timeout=10)
        groups = future_groups.result()

        consumer_group_ids = [ConsumerGroupTopicPartitions(g.group_id)
                              for g in groups.valid
                              if g.group_id.startswith(group_prefix)]
        if not consumer_group_ids:
            logging.warning("No consumer groups match the filter.")
            return []

        logging.info("Fetching offsets for %d consumer groups...",
                     len(consumer_group_ids))

        high_lag_groups = []

        # TODO: the client library only accepts a single group per
        # list_consumer_group_offsets call, so each group is queried in turn.
        for consumer_group_id in consumer_group_ids:

            # Request the committed offsets for this one group.
            future_offsets = admin_client.list_consumer_group_offsets(
                [consumer_group_id])

            group_id = consumer_group_id.group_id
            offsets = future_offsets[group_id].result()

            logging.debug("Consumer Group: %s", group_id)
            summed_lag = 0
            for tp in offsets.topic_partitions:
                # A negative committed offset means the group has never
                # committed for this partition (OFFSET_INVALID = -1001);
                # including it would wildly inflate the lag sum.
                if tp.offset < 0:
                    continue
                # High watermark is the partition's latest offset.
                high_watermark = consumer.get_watermark_offsets(tp)[1]  # (low, high)
                lag = high_watermark - tp.offset
                summed_lag += lag
                logging.debug("  Topic: %s, Partition: %s, Lag: %s",
                              tp.topic, tp.partition, lag)

            if summed_lag > lag_threshold:
                logging.info("High lag consumer group found: %s", group_id)
                high_lag_groups.append(group_id)

        return high_lag_groups
    except Exception as e:
        # Best-effort: log and return an empty result rather than crash
        # the caller's monitoring loop.
        logging.error("Error fetching consumer groups: %s", e)
        return []
    finally:
        # Always release the monitoring consumer's broker connections.
        if consumer is not None:
            consumer.close()

cobolbaby avatar Mar 19 '25 07:03 cobolbaby