
AdminClient caching all topics in metadata request calls

Status: Open. SwayGom opened this issue 1 year ago (13 comments).

Description

I was testing the Admin client by deleting topics and recreating them under new names, over and over, for three days. The admin client appears to cache every topic name it has ever seen, and this eventually causes timeouts. I have reached a point where only 5 topics exist on the broker, but the client is looking for 8k topics.

How to reproduce

Use a normal AdminClient (seen with both 1.9.2 and 2.3). List the current topics, delete them, then create new topics, and repeat this for about a week; the deleted topics will then show up in the metadata requests.

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • 1.9.2
  • [x] Apache Kafka broker version: Microsoft Event Hubs
  • [x] Client configuration: conf = { 'bootstrap.servers': self.stressconfigs.get_appr_broker(self.mode), 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '', 'sasl.mechanism': 'PLAIN', 'sasl.username': '$ConnectionString', 'sasl.password': self.stressconfigs.get_appr_Connectiontring(self.mode), 'client.id': 'python-admin-client' + self.clientId, 'allow.auto.create.topics': 'false', 'connections.max.idle.ms': 180000, 'socket.keepalive.enable': True, 'request.timeout.ms': 60000, 'socket.timeout.ms': 60000, 'message.timeout.ms': 60000 }
  • [x] Operating system: Linux, Azure Kubernetes
  • [x] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [x] Critical issue: Yes

SwayGom, Apr 26 '24

@pranavrth, any ideas here? Or is any other context needed?

SwayGom, May 13 '24

I didn't understand the flow of your code:

  • Are you just creating and deleting the topic in the Admin client?
  • Where are you seeing these topics?
  • Can you provide sample code to reproduce the issue?

pranavrth, May 15 '24

The main worker here is the topic_manipulation() function, which loops over these steps:

  1. list topics
  2. describe topics
  3. delete all topics
  4. create 10 topics
  5. repeat

We only see the no-longer-existing topics in the metadata requests; they never appear in the topics we print from any list, create, or delete call. For the repro, just set these environment variables, or replace them as you see fit:

brokerList = os.getenv('EVENTHUBS_BROKER')
connectionString = os.getenv('EVENTHUBS_CONNECTION_STRING')

import os
from random import randint
from time import sleep

import confluent_kafka
from confluent_kafka.admin import AdminClient, NewTopic

global_timeout = 60
numOfTopicsFromLastCreateAttempt = 0
topicID = 0

def printwithSeparator(message):
    print( '-'*20)
    print(message)
    print( '-'*20)

def create_admin_client():
    conf = {
        'bootstrap.servers': os.getenv('EVENTHUBS_BROKER'), 
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password':  os.getenv('EVENTHUBS_CONNECTION_STRING'),
        'client.id': f'python-admin-{randint(0, 10000)}-{confluent_kafka.__version__}',
        'allow.auto.create.topics': 'false',
        'connections.max.idle.ms': 180000,
        'socket.keepalive.enable' : True,
        'request.timeout.ms': 60000,
        'socket.timeout.ms': 60000,
        'message.timeout.ms':  60000,
        'debug': 'admin,metadata',
    }
    admin = AdminClient(conf)
    printwithSeparator(f'Admin { conf["client.id"] } started')
    return admin
    
admin = create_admin_client()
        
def describeTopics(inTopics):
    described = []
    for topic in inTopics:
        try:
            md = admin.list_topics(topic ,timeout=global_timeout)
            currTopic =  list(md.topics.values())[0]
            described.append(str(currTopic))
        except Exception as e:
            # printwithSeparator() takes a single message, so fold the error into it
            printwithSeparator(f"Error describing topic {topic}: {e}")
    printwithSeparator(f'Described {len(described)} Topics: [{", ".join(described)}]')
   
def listTopic(in_timeout=global_timeout):
    try:
        md = admin.list_topics(timeout=in_timeout)
        topics = set()
        for t in md.topics.values():
            if t.error is not None:
                printwithSeparator(f'Error getting topic {t.topic}: {t.error}')
            topics.add(t.topic)
        printwithSeparator(f'populated {len(topics)} topic(s) [{", ".join(topics)}]')
    except Exception as e:
        printwithSeparator(f"Error listing topics: {e}")
        return set()
    return topics

def delete_topics(topics, in_timeout=global_timeout):
    if not topics:
        return
    deleted = []  # renamed so it no longer shadows the function name
    try:
        fs = admin.delete_topics(topics, request_timeout=in_timeout)
        for topic, f in fs.items():
            try:
                f.result()
                deleted.append(topic)
            except Exception as e:
                printwithSeparator("Failed to delete topic {}: {}".format(topic, e))
    except Exception as e:
        printwithSeparator(f"Error deleting topics {topics}: {e}")
    printwithSeparator(f'Deleted {len(deleted)} Topics: [{", ".join(deleted)}]')

def createTenTopics():
    newtopics = []
    global topicID
    topics =[]
    for i in range(10):
        newtopics.append(NewTopic(f'dyna-topic-{topicID}', num_partitions= randint(1, 32), replication_factor=1))
        topicID += 1
    try:
        fs = admin.create_topics(newtopics, request_timeout=global_timeout)
        for topic, f in fs.items():
            try:
                f.result()
                topics.append(topic)
            except Exception as e:
                printwithSeparator("Failed to create topic {}: {}".format(topic, e))
        printwithSeparator(f'Tried to create {len(newtopics)} Created {len(topics)} Topics: [{", ".join(topics)}]')
    except Exception as e:
        printwithSeparator(f"Error creating topics: {e}")
    return len(topics)

def topic_manipulation():
    global numOfTopicsFromLastCreateAttempt
    topics = listTopic()
    printwithSeparator(f'last created topics:{numOfTopicsFromLastCreateAttempt} populated {len(topics)}')
    if topics and len(topics) > 0:
        topics = list(topics)
        describeTopics(topics)
        delete_topics(topics)
        sleep(10)
    numOfTopicsFromLastCreateAttempt = createTenTopics()
    sleep(10)

def main():
    try:
        while True:
            try:
                topic_manipulation()
            except Exception as e:
                printwithSeparator(f'Caught Exception: {e}')
    except KeyboardInterrupt:
        printwithSeparator("Caught KeyboardInterrupt")


if __name__ == '__main__':
    main()
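The script reads its two settings with os.getenv, which returns None when a variable is unset, so a missing variable only surfaces once that None has already been passed into the client configuration. A minimal fail-fast check could look like this; load_eventhubs_settings is a hypothetical helper, not part of the original script, and it only builds the two conf entries that come from the environment.

```python
import os
from typing import Mapping


def load_eventhubs_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Return the conf entries the repro reads from the environment.

    Hypothetical helper: raises immediately if a required variable is
    missing or empty, instead of passing None into the client config.
    """
    names = ("EVENTHUBS_BROKER", "EVENTHUBS_CONNECTION_STRING")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variable(s): {', '.join(missing)}")
    return {
        "bootstrap.servers": env["EVENTHUBS_BROKER"],
        "sasl.password": env["EVENTHUBS_CONNECTION_STRING"],
    }


# Example with an explicit mapping instead of the real environment
# (the values are placeholders).
settings = load_eventhubs_settings({
    "EVENTHUBS_BROKER": "namespace.servicebus.windows.net:9093",
    "EVENTHUBS_CONNECTION_STRING": "Endpoint=sb://...",
})
print(settings["bootstrap.servers"])  # namespace.servicebus.windows.net:9093
```

The returned dict could be merged into the conf dict in create_admin_client() in place of the two os.getenv calls.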

SwayGom, May 17 '24

We see the topic list in the Metadata request (API key 3) grow steadily over time, even though list_topics() on the admin client only returns the topics currently in the namespace. For example:

Namespace.servicebus.windows.net contains: dyna-20 dyna-21 dyna-22 dyna-23 dyna-24

The admin client nevertheless sends the Metadata request with a large list of topics, even though there are only 5 topics in the namespace:

Metadata Request => [topics] allow_auto_topic_creation
  topics => ['dyna-1', 'dyna-2', 'dyna-3', 'dyna-4', 'dyna-5', 'dyna-6', ..., 'dyna-6000', null, null, null, ..., 2000th null]
    name => STRING
  allow_auto_topic_creation => False
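To quantify the mismatch, a small pure-Python helper can compare the topic list from one Metadata request against the topics that actually exist. This is a hypothetical sketch, not part of confluent-kafka: summarize_metadata_request is an illustrative name, None stands in for the null entries, and capturing the request's topic list in the first place is assumed to happen elsewhere.

```python
from typing import Iterable, Optional


def summarize_metadata_request(requested: Iterable[Optional[str]],
                               existing: Iterable[str]) -> dict:
    """Compare the topics in one Metadata request with the topics that exist.

    None represents a null entry in the request's topic list.
    """
    requested = list(requested)
    existing = set(existing)
    names = {t for t in requested if t is not None}
    return {
        "entries": len(requested),
        "nulls": sum(1 for t in requested if t is None),
        "stale": sorted(names - existing),  # requested, but no longer present
        "live": sorted(names & existing),   # requested and still present
    }


# Namespace contents from the example above; the request list is illustrative.
namespace = [f"dyna-{i}" for i in range(20, 25)]
request = [f"dyna-{i}" for i in range(1, 25)] + [None] * 3
report = summarize_metadata_request(request, namespace)
print(report["entries"], report["nulls"], len(report["stale"]))  # 27 3 19
```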

Here's the debug log showing topics that had been deleted but that the admin client was still requesting metadata for:

%7|1715967551.780|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Request metadata for 40 topic(s): refresh unavailable topics
%7|1715967551.902|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: ===== Received metadata (for 31 requested topics): refresh unavailable topics =====
%7|1715967551.902|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: ClusterId: namespace.servicebus.windows.net, ControllerId: 0
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: 1 brokers, 31 topics
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Broker #0/1: namespace.servicebus.windows.net:9093 NodeId 0
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-2 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-2 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-4 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-4 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-3 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-3 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-1 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-1 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-6 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-6 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-0 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-0 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-5 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-5 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-8 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-8 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-9 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-9 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-7 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-7 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-15 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-15 (PartCnt 0): Broker: Unknown topic or partition
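To check a long debug log for the same pattern, a regex scan can collect how many topics each refresh requested and which topics the broker reported as unknown. This is a sketch under assumptions: scan_metadata_log is a hypothetical name, and the patterns are written against the exact METADATA lines quoted above (from confluent-kafka 2.2.0, per the client.id); other librdkafka versions may word these lines differently.

```python
import re
from typing import Iterable

# Patterns for the METADATA debug lines quoted in this issue.
REQUEST_RE = re.compile(r"Request metadata for (\d+) topic\(s\)")
UNKNOWN_RE = re.compile(
    r"Error in metadata reply for topic (\S+) \(PartCnt \d+\): "
    r"Broker: Unknown topic or partition"
)


def scan_metadata_log(lines: Iterable[str]) -> dict:
    """Collect requested-topic counts and topics reported as unknown."""
    request_sizes = []
    unknown = set()
    for line in lines:
        m = REQUEST_RE.search(line)
        if m:
            # One entry per metadata request, in log order.
            request_sizes.append(int(m.group(1)))
            continue
        m = UNKNOWN_RE.search(line)
        if m:
            unknown.add(m.group(1))
    return {"request_sizes": request_sizes, "unknown_topics": sorted(unknown)}
```

By default (when no logger is configured) librdkafka writes debug output to stderr, so one way to use this is to redirect the repro's stderr to a file and pass the open file object to scan_metadata_log(). A request_sizes list that keeps growing while the namespace stays at a handful of topics would match the behaviour described in this issue.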

SwayGom, May 17 '24

Hey @pranavrth, hopefully this is enough to repro the issue.

SwayGom, May 20 '24

Sure. I will check it this week.

pranavrth, May 27 '24

I was able to replicate the issue and am checking the cause.

pranavrth, Jun 06 '24

The Admin client doesn't have its own handle type, so internally it uses a Producer handle to serve Admin client requests. Not dropping requested topics is expected behaviour for a Producer instance, because it assumes the topic may be created later.

What I am trying to understand now is the property topic.metadata.propagation.max.ms, which defines how long to wait for topic metadata to propagate within the cluster. Until that time has elapsed, the producer doesn't mark the topic as non-existent. My understanding is that librdkafka should remove the topic from the cache after that. I am checking whether this understanding is correct, or whether the topic is never removed from the cache. If the latter is what currently happens, I will discuss with the team whether that should be the intended behaviour.
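The two possibilities described here can be illustrated with a toy model. This is NOT librdkafka's implementation: ToyTopicCache and simulate are hypothetical, and the model simply assumes that a topic the broker reports as unknown is kept until topic.metadata.propagation.max.ms has elapsed since it was first requested, after which it is either dropped (evict_unknown=True) or kept forever (evict_unknown=False). The 20 s cycle follows the two sleep(10) calls in the repro; the 30 s propagation window is an assumed value (my understanding is that 30000 ms is the documented default, but verify it for your version).

```python
from dataclasses import dataclass, field


@dataclass
class ToyTopicCache:
    """Toy model of a client-side set of requested topics (not librdkafka code)."""
    propagation_max_ms: int
    evict_unknown: bool  # False models "never drop requested topics"
    first_requested: dict = field(default_factory=dict)

    def request(self, topic: str, now_ms: int) -> None:
        # Keep the time the topic was first referenced; later requests
        # do not reset it.
        self.first_requested.setdefault(topic, now_ms)

    def on_unknown(self, topic: str, now_ms: int) -> None:
        # The broker answered "Unknown topic or partition" for this topic.
        since = self.first_requested.get(topic)
        if since is None:
            return
        if self.evict_unknown and now_ms - since >= self.propagation_max_ms:
            del self.first_requested[topic]

    def refresh_list(self) -> list:
        # Topics that would be included in the next metadata refresh.
        return sorted(self.first_requested)


def simulate(evict_unknown: bool, cycles: int = 5, per_cycle: int = 10,
             cycle_ms: int = 20_000, propagation_max_ms: int = 30_000) -> int:
    """Return the refresh-list size after `cycles` delete/create rounds."""
    cache = ToyTopicCache(propagation_max_ms, evict_unknown)
    for cycle in range(cycles):
        now = cycle * cycle_ms
        # The repro deletes every topic, then creates `per_cycle` new ones.
        live = {f"dyna-topic-{cycle * per_cycle + i}" for i in range(per_cycle)}
        # A metadata refresh reports every cached topic that no longer exists.
        for topic in cache.refresh_list():
            if topic not in live:
                cache.on_unknown(topic, now)
        for topic in live:
            cache.request(topic, now)
    return len(cache.refresh_list())


print(simulate(evict_unknown=False), simulate(evict_unknown=True))  # 50 20
```

With eviction, the refresh list stays bounded (20 topics here: the current batch plus the previous one, which is still inside its propagation window). Without eviction, it grows by 10 topics per cycle, which resembles the unbounded growth reported in this issue.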

pranavrth, Jun 11 '24

This only happens with Python; it doesn't happen with any other Confluent client so far.

SwayGom, Jul 02 '24

Hey @pranavrth, any new findings on this?

SwayGom, Jul 23 '24