
Irrelevant warning logs after initial attempts to connect

Open • dvdblk opened this issue on Jul 25 '22 • 0 comments

Describe the bug

I'm getting some irrelevant warning logs after the first few connection attempts fail while Kafka is booting. I start all the containers with a docker-compose setup. If I try connecting while Kafka is still booting, using a 5s linear backoff (it usually takes 5 retries), the warning logs appear roughly 80% of the time. If I make my producer/consumer containers wait one minute before attempting to connect, the warning logs never appear.

On top of that, everything seems to work well even after these warning logs appear, so there is no reason to show them.
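
The retry loop in `KafkaManager.start()` (further down) can be factored into a small reusable helper. The sketch below is a hypothetical `retry_linear` function, not part of aiokafka: it awaits a connect coroutine until it succeeds, sleeping a fixed delay between failed attempts. The default `retry_on=(ConnectionError,)` is only a stand-in; with aiokafka you would presumably pass `retry_on=(KafkaConnectionError,)`.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Tuple, Type

log = logging.getLogger(__name__)


async def retry_linear(
    connect: Callable[[], Awaitable[None]],
    *,
    delay: float = 5.0,
    max_attempts: int = 15,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
) -> int:
    """Await `connect()` until it succeeds, sleeping `delay` seconds after
    each failure. Returns the number of attempts that were needed and
    re-raises the last error once `max_attempts` is exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        attempt += 1
        try:
            await connect()
            return attempt
        except retry_on as err:
            if attempt >= max_attempts:
                raise
            # Linear backoff: the delay stays the same for every retry
            log.info("Attempt %d failed (%s) - retrying in %ss", attempt, err, delay)
            await asyncio.sleep(delay)
```

For example, `await retry_linear(producer.start, delay=5, max_attempts=15, retry_on=(KafkaConnectionError,))` would mirror the constants used in `KafkaManager`.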

Examples

All examples run exactly the same code, except for the fourth one, where I've added a one-minute delay with asyncio.sleep(60). Between runs I only restart the containers with docker-compose up and down.

1. Warning log - "Topic is not available during auto-create initialization"

[+] Running 8/8
 ⠿ Network app_default                 Created    0.0s
 ⠿ Volume "app_kafka_data"             Created    0.0s
 ⠿ Volume "app_zookeeper_data"         Created    0.0s
 ⠿ Container app-zookeeper-1           Started    0.6s
 ⠿ Container app-db-1                  Started    0.6s
 ⠿ Container app-kafka-1               Started    0.8s
 ⠿ Container app-data_consumer_eth-1   Started    1.2s
 ⠿ Container app-data_producer_eth-1   Started    1.2s
app-data_producer_eth-1  | 2022-07-25 12:01:02.853 __main__ INFO     Config loaded.
app-data_producer_eth-1  | 2022-07-25 12:01:02.853 __main__ INFO     Starting the app...
app-data_producer_eth-1  | 2022-07-25 12:01:02.853 app.kafka.manager INFO     Connecting to Kafka with linear backoff
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.18.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:01:02.860 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.18.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:01:07.866 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.18.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:01:12.874 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | 2022-07-25 12:01:17.879 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.18.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:01:23.109 app.kafka.manager INFO     Connected to Kafka
app-data_producer_eth-1  | Topic eth is not available during auto-create initialization
app-data_producer_eth-1  | Topic eth is not available during auto-create initialization
app-data_producer_eth-1  | Topic eth is not available during auto-create initialization
app-data_producer_eth-1  | Topic eth is not available during auto-create initialization
app-data_producer_eth-1  | Topic eth is not available during auto-create initialization
app-data_producer_eth-1  | Topic eth is not available during auto-create initialization
app-data_producer_eth-1  | 2022-07-25 12:01:24.490 app.kafka.manager INFO     Disconnecting from Kafka
app-data_producer_eth-1  | 2022-07-25 12:01:24.490 app.kafka.manager INFO     Disconnected from Kafka
app-data_producer_eth-1  | 2022-07-25 12:01:24.490 __main__ INFO     Exiting the app...
app-data_producer_eth-1 exited with code 0

Sending and receiving events on this topic works well even when this warning shows up.
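
The six identical "Topic eth is not available during auto-create initialization" lines above appear to be printed once per metadata refresh while the new topic is still being created, which would explain why sending still works afterwards. If the goal is only to cut the noise, one option is a standard `logging.Filter` that rate-limits identical messages; `DuplicateFilter` below is a hypothetical helper, not part of aiokafka. Filters attached to a logger are not applied to records propagated from its child loggers, so this one belongs on a handler. The bare aiokafka lines above (no timestamp, no logger name) suggest they currently reach Python's last-resort handler, so in this setup that likely means adding a handler on the root logger and attaching the filter there.

```python
import logging
from typing import Dict, Tuple


class DuplicateFilter(logging.Filter):
    """Let a given (logger name, message) pair through at most once
    every `window` seconds (hypothetical helper)."""

    def __init__(self, window: float = 10.0) -> None:
        super().__init__()
        self.window = window
        self._last_emitted: Dict[Tuple[str, str], float] = {}

    def filter(self, record: logging.LogRecord) -> bool:
        key = (record.name, record.getMessage())
        last = self._last_emitted.get(key)
        if last is not None and record.created - last < self.window:
            # Same message from the same logger shortly after: drop it
            return False
        self._last_emitted[key] = record.created
        return True
```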

2. Warning log - "No broker metadata found in MetadataResponse"

[+] Running 6/6
 ⠿ Network app_default                 Created    0.0s
 ⠿ Container app-db-1                  Started    0.4s
 ⠿ Container app-zookeeper-1           Started    0.4s
 ⠿ Container app-kafka-1               Started    0.6s
 ⠿ Container app-data_producer_eth-1   Started    1.1s
 ⠿ Container app-data_consumer_eth-1   Started    1.0s
app-data_producer_eth-1  | 2022-07-25 12:03:50.911 __main__ INFO     Config loaded.
app-data_producer_eth-1  | 2022-07-25 12:03:50.911 __main__ INFO     Starting the app...
app-data_producer_eth-1  | 2022-07-25 12:03:50.912 app.kafka.manager INFO     Connecting to Kafka with linear backoff
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.21.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:03:50.914 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.21.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:03:55.920 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | 2022-07-25 12:04:00.926 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.21.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:04:05.933 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.21.0.4', 9092)
app-data_producer_eth-1  | No broker metadata found in MetadataResponse
app-data_producer_eth-1  | Unable connect to node with id None: StaleMetadata: Broker id None not in current metadata
app-data_producer_eth-1  | Could not send <class 'aiokafka.protocol.transaction.InitProducerIdRequest_v0'>: NodeNotReadyError('Attempt to send a request to node which is not ready (node id None).')
app-data_producer_eth-1  | 2022-07-25 12:04:12.305 app.kafka.manager INFO     Connected to Kafka
app-data_producer_eth-1  | 2022-07-25 12:04:12.793 app.kafka.manager INFO     Disconnecting from Kafka
app-data_producer_eth-1  | 2022-07-25 12:04:12.794 app.kafka.manager INFO     Disconnected from Kafka
app-data_producer_eth-1  | 2022-07-25 12:04:12.794 __main__ INFO     Exiting the app...
app-data_producer_eth-1 exited with code 0

The "Unable connect to node with id None: StaleMetadata: Broker id None not in current metadata" warning appears even with KAFKA_CFG_BROKER_ID set. I'm not sure why this happens, because the Kafka container logs always show the node id being assigned properly.

3. No warnings

Sometimes no warning logs appear at all.

[+] Running 6/6
 ⠿ Network app_default                 Created    0.0s
 ⠿ Container app-zookeeper-1           Started    0.4s
 ⠿ Container app-db-1                  Started    0.4s
 ⠿ Container app-kafka-1               Started    0.6s
 ⠿ Container app-data_producer_eth-1   Started    1.0s
 ⠿ Container app-data_consumer_eth-1   Started    1.0s
app-data_producer_eth-1  | 2022-07-25 12:03:04.755 __main__ INFO     Config loaded.
app-data_producer_eth-1  | 2022-07-25 12:03:04.755 __main__ INFO     Starting the app...
app-data_producer_eth-1  | 2022-07-25 12:03:04.755 app.kafka.manager INFO     Connecting to Kafka with linear backoff
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:03:04.758 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:03:09.768 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:03:14.776 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | 2022-07-25 12:03:19.785 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:03:24.790 app.kafka.manager INFO     Connection failed - retrying in 5s
app-data_producer_eth-1  | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1  | 2022-07-25 12:03:30.126 app.kafka.manager INFO     Connected to Kafka
app-data_producer_eth-1  | 2022-07-25 12:03:30.871 app.kafka.manager INFO     Disconnecting from Kafka
app-data_producer_eth-1  | 2022-07-25 12:03:30.872 app.kafka.manager INFO     Disconnected from Kafka
app-data_producer_eth-1  | 2022-07-25 12:03:30.872 __main__ INFO     Exiting the app...
app-data_producer_eth-1 exited with code 0

4. Log with initial delay of 1 minute before connecting

[+] Running 6/6
 ⠿ Network app_default                 Created    0.0s
 ⠿ Container app-zookeeper-1           Started    0.4s
 ⠿ Container app-db-1                  Started    0.5s
 ⠿ Container app-kafka-1               Started    0.7s
 ⠿ Container app-data_consumer_eth-1   Started    1.1s
 ⠿ Container app-data_producer_eth-1   Started    1.0s
app-data_producer_eth-1  | 2022-07-25 12:05:12.325 __main__ INFO     Config loaded.
app-data_producer_eth-1  | 2022-07-25 12:05:12.325 __main__ INFO     Starting the app...
app-data_producer_eth-1  | 2022-07-25 12:05:12.325 app.kafka.manager INFO     Waiting 1 minute for Kafka to boot up
app-data_producer_eth-1  | 2022-07-25 12:06:12.368 app.kafka.manager INFO     Connecting to Kafka with linear backoff
app-data_producer_eth-1  | 2022-07-25 12:06:12.583 app.kafka.manager INFO     Connected to Kafka
app-data_producer_eth-1  | 2022-07-25 12:06:13.124 app.kafka.manager INFO     Disconnecting from Kafka
app-data_producer_eth-1  | 2022-07-25 12:06:13.124 app.kafka.manager INFO     Disconnected from Kafka
app-data_producer_eth-1  | 2022-07-25 12:06:13.124 __main__ INFO     Exiting the app...
app-data_producer_eth-1 exited with code 0

No warning logs appear when a 1 minute delay is used and the first connection attempt succeeds.

Expected behaviour

I think these warnings should not be logged at all after the initial connection attempts fail while Kafka boots up, or they should be reworded to be more informative.
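
Until the messages change on the library side, a workaround in the spirit of this expected behaviour is to hide only the known start-up messages while the app is still connecting and sending its first message. The sketch assumes these messages come from loggers in the `aiokafka` namespace (the log lines above do not show logger names) and matches on substrings copied from the logs in this issue; `StartupNoiseFilter` is hypothetical, not an aiokafka API. Like any logging filter, it has to be attached to the handler that prints these records.

```python
import logging
from typing import Iterable


class StartupNoiseFilter(logging.Filter):
    """Hide known start-up messages from `aiokafka.*` loggers while `active`.

    The substrings are copied from the logs in this issue; they are a
    hypothetical starting point, not an official list of aiokafka messages.
    """

    DEFAULT_PATTERNS = (
        "Unable connect to",
        "No broker metadata found in MetadataResponse",
        "Could not send",
        "not available during auto-create initialization",
    )

    def __init__(self, patterns: Iterable[str] = DEFAULT_PATTERNS) -> None:
        super().__init__()
        self.patterns = tuple(patterns)
        self.active = True

    def filter(self, record: logging.LogRecord) -> bool:
        if not self.active:
            return True  # start-up finished: show everything again
        if record.name != "aiokafka" and not record.name.startswith("aiokafka."):
            return True  # only touch records from aiokafka loggers
        message = record.getMessage()
        return not any(p in message for p in self.patterns)
```

One way to use it: attach `noise = StartupNoiseFilter()` to the handler, run `KafkaManager.start()` and the first `send_message()`, then set `noise.active = False` so that later occurrences of the same messages, which could point to a real problem, are shown again.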

Environment

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.7.2
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"): -
  • Kafka Broker version (kafka-topics.sh --version): 3.2.0
  • Other information (Confluent Cloud version, etc.):

Reproducible example

I can add a fully reproducible example later, but my app only sends a single 'test' message to one topic.

docker-compose.yml:

version: "3.9"
services:
  # Kafka
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.2
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  data_producer_eth:
    build:
      context: ./src/data_producer
    profiles: ["eth", "all"]
    depends_on:
      - kafka

# Kafka related config
# see: https://github.com/bitnami/bitnami-docker-kafka/blob/a576f9847c9ed455645d6540a7f32e7935e6b4d7/docker-compose.yml#L24
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
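
With the short-form `depends_on` used above, Compose only orders container start-up; it does not wait until Kafka accepts connections, which is why the app needs its own retry loop or fixed delay. A hypothetical alternative to the fixed `asyncio.sleep(60)` is to poll the broker's port before starting the producer. This is a sketch assuming `kafka:9092` is the address to probe. An open port does not guarantee the broker has finished registering (example 2 shows a "No broker metadata found" warning right after the connection refusals stop), so this may shorten the wait without removing every warning.

```python
import asyncio


async def wait_for_port(host: str, port: int, *, timeout: float = 120.0,
                        interval: float = 1.0) -> float:
    """Poll until host:port accepts a TCP connection.

    Returns the number of seconds waited; raises TimeoutError if the
    port is still not reachable after `timeout` seconds.
    """
    loop = asyncio.get_running_loop()
    start = loop.time()
    while True:
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=interval
            )
        except (OSError, asyncio.TimeoutError):
            # Refused or too slow: give up only once the overall timeout passed
            if loop.time() - start >= timeout:
                raise TimeoutError(f"{host}:{port} not reachable after {timeout}s")
            await asyncio.sleep(interval)
        else:
            # The port is open; close the probe connection right away
            writer.close()
            await writer.wait_closed()
            return loop.time() - start
```

In the app, `await wait_for_port("kafka", 9092)` would then run before `KafkaManager.start()`.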

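The only Kafka-related class in the app (it produced the logs above):

import asyncio
import logging

from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaConnectionError, KafkaError, KafkaTimeoutError

log = logging.getLogger(__name__)


class KafkaManagerError(Exception):
    """Raised when the initial connection to Kafka cannot be established."""
    # Minimal definition; the app's own version was not included in the issue.


class KafkaManager: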
    """
    Manage the Kafka cluster connection and allow producing
    messages to a selected topic.
    """

    # The delay between initial connection attempts (in seconds)
    LINEAR_BACKOFF_DELAY = 5
    # The maximum allowed initial connection attempts before the app exits
    INITIAL_CONNECTION_MAX_ATTEMPTS = 15

    def __init__(self, kafka_url: str, topic: str) -> None:
        """
        Args:
            kafka_url: the url of the Kafka cluster
            topic: the Kafka topic
        """
        self.producer = AIOKafkaProducer(
            bootstrap_servers=kafka_url,
            enable_idempotence=True
        )
        self.topic = topic

    async def start(self):
        """Connect (with linear backoff) to the kafka cluster.

        Note:
            Retrying with linear backoff is required as the
            startup time of Kafka is variable (usually 25-35s)
        """
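        # This one-minute delay was only enabled for example 4;
        # examples 1-3 ran the same code without it.
        log.info("Waiting 1 minute for Kafka to boot up")
        await asyncio.sleep(60)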
        log.info("Connecting to Kafka with linear backoff")
        connected, attempt_i = False, 0

        while not connected:
            # Check if we haven't reached max attempts
            attempt_i += 1
            if attempt_i > self.INITIAL_CONNECTION_MAX_ATTEMPTS:
                # Need to cleanup the producer before exiting.
                await self.producer.stop()
                # Exit the app if the max attempts have been reached
                raise KafkaManagerError(
                    "Maximum number of initial connection attempts reached."
                )
            # Try to connect to the cluster
            try:
                await self.producer.start()
                # If the call above doesn't raise an exception,
                # we're connected.
                connected = True
            except KafkaConnectionError:
                # Retry after a fixed (linear) delay
                log.info(f"Connection failed - retrying in {self.LINEAR_BACKOFF_DELAY}s")
                await asyncio.sleep(self.LINEAR_BACKOFF_DELAY)
        log.info("Connected to Kafka")

    async def stop(self):
        """Flush pending data and disconnect from the kafka cluster"""
        log.info("Disconnecting from Kafka")
        await self.producer.stop()
        log.info("Disconnected from Kafka")

    async def send_message(self, msg: str):
        """Send message to a Kafka broker"""
        try:
            # Send the message
            send_future = await self.producer.send(
                topic=self.topic,
                value=msg.encode()
            )
            # Message will either be delivered or an unrecoverable
            # error will occur.
            _ = await send_future
        except KafkaTimeoutError:
            # Producer request timeout, message could have been sent to
            # the broker but there is no ack
            # TODO: somehow figure out whether this message should be
            # resent or not, maybe flag this message with a 'check_duplicate'
            # flag and let the consumer figure it out if these transactions are already
            # present in the database
            log.error(f"KafkaTimeoutError on {msg}")
        except KafkaError as err:
            # Generic kafka error
            log.error(f"{err} on {msg}")
            raise
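
The app's entry point isn't included in the issue. Based on the `__main__` log lines (start, connect, send one 'test' message, disconnect, exit), a hypothetical `run()` could look like the sketch below. It accepts any object with the same async `start`/`send_message`/`stop` methods as `KafkaManager` and makes sure `stop()` runs even if sending fails, while leaving cleanup after a failed `start()` to `start()` itself, as the class above already does.

```python
import asyncio
import logging
from typing import Protocol

log = logging.getLogger(__name__)


class Manager(Protocol):
    """The subset of KafkaManager's interface that run() relies on."""

    async def start(self) -> None: ...
    async def send_message(self, msg: str) -> None: ...
    async def stop(self) -> None: ...


async def run(manager: Manager, msg: str = "test") -> None:
    log.info("Starting the app...")
    # start() already stops the producer itself if it gives up,
    # so stop() is only needed once start() has succeeded.
    await manager.start()
    try:
        await manager.send_message(msg)
    finally:
        # Flush pending data and disconnect even if sending raised
        await manager.stop()
    log.info("Exiting the app...")
```

With the class above, this would be called as `asyncio.run(run(KafkaManager("kafka:9092", "eth")))`; the URL and topic name are taken from the compose file and logs, not from the original code.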

dvdblk, Jul 25 '22 12:07