confluent-kafka-python
list_consumer_group_offsets_request support multiple consumer groups
Currently, `list_consumer_group_offsets` only supports a single consumer group per call (the request list must contain exactly one `ConsumerGroupTopicPartitions`), so fetching offsets for many groups means one round trip per group in a loop, which is too slow.
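For illustration, the kind of batch call this issue is asking for might look like the following. This is hypothetical, not current behavior: the parameter already accepts a list, but today it only allows one element.

```python
# Hypothetical: what this feature request would enable (NOT the current API behavior).
requests = [ConsumerGroupTopicPartitions(g) for g in ("connect-a", "connect-b")]
# Desired: one admin request covering all groups, returning one future per group
futures = admin_client.list_consumer_group_offsets(requests)
```

The current per-group implementation, for reference: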
```python
import logging

from confluent_kafka import Consumer, ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient


def get_kafka_lag_consumer_groups():
    """
    Fetch lag information for all Kafka consumer groups
    (would like to optimize this into a single batch query).
    """
    try:
        # KAFKA_CONNECT_BOOTSTRAP_SERVERS is defined elsewhere in the monitoring script
        admin_client = AdminClient({
            'bootstrap.servers': KAFKA_CONNECT_BOOTSTRAP_SERVERS
        })
        consumer = Consumer({
            'bootstrap.servers': KAFKA_CONNECT_BOOTSTRAP_SERVERS,
            'group.id': 'monitoring-consumer',
            'auto.offset.reset': 'latest',
            'enable.auto.commit': False
        })
        # Fetch all consumer groups
        future_groups = admin_client.list_consumer_groups(request_timeout=10)
        groups = future_groups.result()
        consumer_group_ids = [ConsumerGroupTopicPartitions(g.group_id) for g in groups.valid
                              if g.group_id.startswith("connect-")]
        if not consumer_group_ids:
            logging.warning("No consumer groups match the filter.")
            return []
        logging.info(f"Fetching offsets for {len(consumer_group_ids)} consumer groups...")
        high_lag_groups = []
        # TODO: the library does not support batch queries, so each group has to be
        # handled in its own request (see the sketch after this snippet)
        for consumer_group_id in consumer_group_ids:
            # Request the committed offsets for one consumer group
            future_offsets = admin_client.list_consumer_group_offsets([consumer_group_id])
            group_id = consumer_group_id.group_id
            offsets = future_offsets[group_id].result()
            logging.debug(f"Consumer Group: {group_id}")
            summed_lag = 0
            for tp in offsets.topic_partitions:
                if tp.offset < 0:
                    # No committed offset for this partition (OFFSET_INVALID); skip it
                    continue
                # Get the partition's latest offset (high watermark)
                high_watermark = consumer.get_watermark_offsets(tp)[1]  # (low, high)
                lag = high_watermark - tp.offset  # compute lag
                summed_lag += lag
                logging.debug(f"  Topic: {tp.topic}, Partition: {tp.partition}, Lag: {lag}")
            if summed_lag > 100:
                logging.info(f"High lag consumer group found: {group_id}")
                high_lag_groups.append(group_id)
        return high_lag_groups
    except Exception as e:
        logging.error(f"Error fetching consumer groups: {e}")
        return []
```
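In the meantime, a possible client-side mitigation (a sketch, not a replacement for real batch support): issue all the per-group `list_consumer_group_offsets()` calls up front and collect the futures afterwards, so the requests are in flight concurrently instead of blocking on each `result()` before sending the next one.

```python
# Sketch of a client-side workaround using the existing one-group-per-call API.
# Assumes admin_client and consumer_group_ids from the snippet above.
futures = {}
for request in consumer_group_ids:
    # Each call returns a dict {group_id: future}; merge them all before blocking.
    futures.update(admin_client.list_consumer_group_offsets([request]))

for group_id, future in futures.items():
    offsets = future.result()  # the responses were fetched concurrently
    for tp in offsets.topic_partitions:
        logging.debug(f"{group_id} {tp.topic}[{tp.partition}] committed={tp.offset}")
```

This overlaps the round trips, but it still sends one request per group to the broker, so native multi-group support in the API would still be a real improvement.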