aiokafka
Irrelevant warning logs after initial attempts to connect
Describe the bug
I'm getting some irrelevant warning logs after the first few connection attempts fail while Kafka is booting. I start all the containers with a docker-compose setup. If I try connecting while Kafka is still booting, using a 5s linear backoff (it usually takes 5 retries), the warning logs appear roughly 80% of the time. If I add a one-minute delay to my producer / consumer containers before they attempt to connect, the warning logs never appear.
On top of that, everything seems to work well even after these warning logs appear, so there is no apparent reason to show them.
Examples
All examples share the exact same code except for the fourth one, where I've added a one-minute delay with asyncio.sleep(60). Between runs I only restart the containers with docker-compose up and docker-compose down.
1. Warning log - "Topic is not available during auto-create initialization"
[+] Running 8/8
⠿ Network app_default Created 0.0s
⠿ Volume "app_kafka_data" Created 0.0s
⠿ Volume "app_zookeeper_data" Created 0.0s
⠿ Container app-zookeeper-1 Started 0.6s
⠿ Container app-db-1 Started 0.6s
⠿ Container app-kafka-1 Started 0.8s
⠿ Container app-data_consumer_eth-1 Started 1.2s
⠿ Container app-data_producer_eth-1 Started 1.2s
app-data_producer_eth-1 | 2022-07-25 12:01:02.853 __main__ INFO Config loaded.
app-data_producer_eth-1 | 2022-07-25 12:01:02.853 __main__ INFO Starting the app...
app-data_producer_eth-1 | 2022-07-25 12:01:02.853 app.kafka.manager INFO Connecting to Kafka with linear backoff
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.18.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:01:02.860 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.18.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:01:07.866 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.18.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:01:12.874 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | 2022-07-25 12:01:17.879 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.18.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:01:23.109 app.kafka.manager INFO Connected to Kafka
app-data_producer_eth-1 | Topic eth is not available during auto-create initialization
app-data_producer_eth-1 | Topic eth is not available during auto-create initialization
app-data_producer_eth-1 | Topic eth is not available during auto-create initialization
app-data_producer_eth-1 | Topic eth is not available during auto-create initialization
app-data_producer_eth-1 | Topic eth is not available during auto-create initialization
app-data_producer_eth-1 | Topic eth is not available during auto-create initialization
app-data_producer_eth-1 | 2022-07-25 12:01:24.490 app.kafka.manager INFO Disconnecting from Kafka
app-data_producer_eth-1 | 2022-07-25 12:01:24.490 app.kafka.manager INFO Disconnected from Kafka
app-data_producer_eth-1 | 2022-07-25 12:01:24.490 __main__ INFO Exiting the app...
app-data_producer_eth-1 exited with code 0
Sending and receiving events on this topic works well even when this warning shows up.
2. Warning log - "No broker metadata found in MetadataResponse"
[+] Running 6/6
⠿ Network app_default Created 0.0s
⠿ Container app-db-1 Started 0.4s
⠿ Container app-zookeeper-1 Started 0.4s
⠿ Container app-kafka-1 Started 0.6s
⠿ Container app-data_producer_eth-1 Started 1.1s
⠿ Container app-data_consumer_eth-1 Started 1.0s
app-data_producer_eth-1 | 2022-07-25 12:03:50.911 __main__ INFO Config loaded.
app-data_producer_eth-1 | 2022-07-25 12:03:50.911 __main__ INFO Starting the app...
app-data_producer_eth-1 | 2022-07-25 12:03:50.912 app.kafka.manager INFO Connecting to Kafka with linear backoff
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.21.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:03:50.914 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.21.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:03:55.920 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | 2022-07-25 12:04:00.926 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.21.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:04:05.933 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.21.0.4', 9092)
app-data_producer_eth-1 | No broker metadata found in MetadataResponse
app-data_producer_eth-1 | Unable connect to node with id None: StaleMetadata: Broker id None not in current metadata
app-data_producer_eth-1 | Could not send <class 'aiokafka.protocol.transaction.InitProducerIdRequest_v0'>: NodeNotReadyError('Attempt to send a request to node which is not ready (node id None).')
app-data_producer_eth-1 | 2022-07-25 12:04:12.305 app.kafka.manager INFO Connected to Kafka
app-data_producer_eth-1 | 2022-07-25 12:04:12.793 app.kafka.manager INFO Disconnecting from Kafka
app-data_producer_eth-1 | 2022-07-25 12:04:12.794 app.kafka.manager INFO Disconnected from Kafka
app-data_producer_eth-1 | 2022-07-25 12:04:12.794 __main__ INFO Exiting the app...
app-data_producer_eth-1 exited with code 0
The "Unable connect to node with id None: StaleMetadata: Broker id None not in current metadata" warning appears even with KAFKA_CFG_BROKER_ID set. I'm not sure why this happens, because the Kafka container logs always show the node id being assigned properly.
3. No warnings
Sometimes no warning logs appear at all.
[+] Running 6/6
⠿ Network app_default Created 0.0s
⠿ Container app-zookeeper-1 Started 0.4s
⠿ Container app-db-1 Started 0.4s
⠿ Container app-kafka-1 Started 0.6s
⠿ Container app-data_producer_eth-1 Started 1.0s
⠿ Container app-data_consumer_eth-1 Started 1.0s
app-data_producer_eth-1 | 2022-07-25 12:03:04.755 __main__ INFO Config loaded.
app-data_producer_eth-1 | 2022-07-25 12:03:04.755 __main__ INFO Starting the app...
app-data_producer_eth-1 | 2022-07-25 12:03:04.755 app.kafka.manager INFO Connecting to Kafka with linear backoff
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:03:04.758 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:03:09.768 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:03:14.776 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | 2022-07-25 12:03:19.785 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:03:24.790 app.kafka.manager INFO Connection failed - retrying in 5s
app-data_producer_eth-1 | Unable connect to "kafka:9092": [Errno 111] Connect call failed ('172.20.0.4', 9092)
app-data_producer_eth-1 | 2022-07-25 12:03:30.126 app.kafka.manager INFO Connected to Kafka
app-data_producer_eth-1 | 2022-07-25 12:03:30.871 app.kafka.manager INFO Disconnecting from Kafka
app-data_producer_eth-1 | 2022-07-25 12:03:30.872 app.kafka.manager INFO Disconnected from Kafka
app-data_producer_eth-1 | 2022-07-25 12:03:30.872 __main__ INFO Exiting the app...
app-data_producer_eth-1 exited with code 0
4. Log with an initial 1-minute delay before connecting
[+] Running 6/6
⠿ Network app_default Created 0.0s
⠿ Container app-zookeeper-1 Started 0.4s
⠿ Container app-db-1 Started 0.5s
⠿ Container app-kafka-1 Started 0.7s
⠿ Container app-data_consumer_eth-1 Started 1.1s
⠿ Container app-data_producer_eth-1 Started 1.0s
app-data_producer_eth-1 | 2022-07-25 12:05:12.325 __main__ INFO Config loaded.
app-data_producer_eth-1 | 2022-07-25 12:05:12.325 __main__ INFO Starting the app...
app-data_producer_eth-1 | 2022-07-25 12:05:12.325 app.kafka.manager INFO Waiting 1 minute for Kafka to boot up
app-data_producer_eth-1 | 2022-07-25 12:06:12.368 app.kafka.manager INFO Connecting to Kafka with linear backoff
app-data_producer_eth-1 | 2022-07-25 12:06:12.583 app.kafka.manager INFO Connected to Kafka
app-data_producer_eth-1 | 2022-07-25 12:06:13.124 app.kafka.manager INFO Disconnecting from Kafka
app-data_producer_eth-1 | 2022-07-25 12:06:13.124 app.kafka.manager INFO Disconnected from Kafka
app-data_producer_eth-1 | 2022-07-25 12:06:13.124 __main__ INFO Exiting the app...
app-data_producer_eth-1 exited with code 0
No warning logs appear when a 1 minute delay is used and the first connection attempt succeeds.
Expected behaviour
I think these warning logs should not show up at all after failed connection attempts while Kafka boots up, or they should be reworded to explain more clearly what is going on.
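Until the messages change, one possible workaround is to hide these specific warnings during startup only. The sketch below is an assumption-laden illustration, not an aiokafka feature: StartupNoiseFilter and install_startup_filter are hypothetical names, the matched substrings are copied from the logs above, and it assumes all of these warnings come from loggers under the "aiokafka" namespace. The bare, timestamp-less format of those lines suggests they currently reach no configured handler, which is why the sketch attaches its own handler to the "aiokafka" logger.

```python
import logging


class StartupNoiseFilter(logging.Filter):
    """Drop the transient aiokafka warnings seen while Kafka is still booting.

    Hypothetical helper: the substrings are copied from the log output in
    this issue and are not a documented aiokafka interface.
    """

    NOISY_SUBSTRINGS = (
        "is not available during auto-create initialization",
        "No broker metadata found in MetadataResponse",
        "Broker id None not in current metadata",
        "Attempt to send a request to node which is not ready (node id None)",
    )

    def __init__(self) -> None:
        super().__init__()
        # Set to False once startup is over so later warnings are shown again.
        self.active = True

    def filter(self, record: logging.LogRecord) -> bool:
        # Returning False drops the record; True lets it through.
        if not self.active or not record.name.startswith("aiokafka"):
            return True
        message = record.getMessage()
        return not any(text in message for text in self.NOISY_SUBSTRINGS)


def install_startup_filter() -> StartupNoiseFilter:
    """Attach a filtered handler to the "aiokafka" logger and return the filter."""
    noise_filter = StartupNoiseFilter()
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    handler.addFilter(noise_filter)
    logging.getLogger("aiokafka").addHandler(handler)
    return noise_filter
```

The idea would be to call install_startup_filter() before connecting and set active = False on the returned filter after the first message is delivered. Handler filters only affect that handler: if the root logger also has handlers, aiokafka records still propagate there unfiltered, so the same filter would have to be added to those handlers too.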
Environment (please complete the following information):
- aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.7.2
- kafka-python version (python -c "import kafka; print(kafka.__version__)"): -
- Kafka Broker version (kafka-topics.sh --version): 3.2.0
- Other information (Confluent Cloud version, etc.):
Reproducible example
I can add a fully reproducible example later, but my app only sends a single 'test' message to one topic.
docker-compose.yml:
version: "3.9"
services:
  # Kafka
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.2
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  data_producer_eth:
    build:
      context: ./src/data_producer
    profiles: ["eth", "all"]
    depends_on:
      - kafka
    # Kafka related config
    # see: https://github.com/bitnami/bitnami-docker-kafka/blob/a576f9847c9ed455645d6540a7f32e7935e6b4d7/docker-compose.yml#L24
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
The only class related to Kafka, which produces the log output in the examples above:
import asyncio
import logging

from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaConnectionError, KafkaError, KafkaTimeoutError

log = logging.getLogger(__name__)


class KafkaManagerError(Exception):
    """Raised when the initial connection to Kafka cannot be established."""


class KafkaManager:
    """
    Manage the Kafka cluster connection and allow producing
    messages to a selected topic.
    """

    # The delay between initial connection attempts (in seconds)
    LINEAR_BACKOFF_DELAY = 5
    # The maximum allowed initial connection attempts before the app exits
    INITIAL_CONNECTION_MAX_ATTEMPTS = 15

    def __init__(self, kafka_url: str, topic: str) -> None:
        """
        Args:
            kafka_url: the url of the Kafka cluster
            topic: the Kafka topic
        """
        self.producer = AIOKafkaProducer(
            bootstrap_servers=kafka_url,
            enable_idempotence=True
        )
        self.topic = topic

    async def start(self):
        """Connect (with linear backoff) to the kafka cluster.

        Note:
            Retrying with linear backoff is required as the
            startup time of Kafka is variable (usually 25-35s)
        """
        # Only used in example 4; examples 1-3 run without this delay.
        log.info("Waiting 1 minute for Kafka to boot up")
        await asyncio.sleep(60)

        log.info("Connecting to Kafka with linear backoff")
        connected, attempt_i = False, 0
        while not connected:
            # Check if we haven't reached max attempts
            attempt_i += 1
            if attempt_i > self.INITIAL_CONNECTION_MAX_ATTEMPTS:
                # Need to cleanup the producer before exiting.
                await self.producer.stop()
                # Exit the app if the max attempts have been reached
                raise KafkaManagerError(
                    "Maximum number of initial connection attempts reached."
                )
            # Try to connect to the cluster
            try:
                await self.producer.start()
                # If the call above doesn't raise an exception,
                # we're connected.
                connected = True
            except KafkaConnectionError:
                # The broker is not reachable yet: wait and retry
                log.info(f"Connection failed - retrying in {self.LINEAR_BACKOFF_DELAY}s")
                await asyncio.sleep(self.LINEAR_BACKOFF_DELAY)
        log.info("Connected to Kafka")

    async def stop(self):
        """Flush pending data and disconnect from the kafka cluster"""
        log.info("Disconnecting from Kafka")
        await self.producer.stop()
        log.info("Disconnected from Kafka")

    async def send_message(self, msg: str):
        """Send message to a Kafka broker"""
        try:
            # Enqueue the message; send() returns a future for the delivery result
            send_future = await self.producer.send(
                topic=self.topic,
                value=msg.encode()
            )
            # Message will either be delivered or an unrecoverable
            # error will occur.
            await send_future
        except KafkaTimeoutError:
            # Producer request timeout, message could have been sent to
            # the broker but there is no ack
            # TODO: somehow figure out whether this message should be
            # resent or not, maybe flag this message with a 'check_duplicate'
            # flag and let the consumer figure it out if these transactions are already
            # present in the database
            log.error(f"KafkaTimeoutError on {msg}")
        except KafkaError as err:
            # Generic kafka error
            log.error(f"{err} on {msg}")
            raise
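The start() loop waits a constant LINEAR_BACKOFF_DELAY between attempts, so strictly speaking it retries with a fixed delay rather than a linear backoff. If a growing wait is wanted, a minimal sketch of a generic helper could look like the following. retry_linear is a hypothetical name, not an aiokafka API; it sleeps base_delay * n after the n-th failure and re-raises the last error once max_attempts is exhausted.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_linear(
    make_attempt: Callable[[], Awaitable[T]],
    *,
    max_attempts: int,
    base_delay: float,
    retry_on: Tuple[Type[BaseException], ...],
) -> T:
    """Await make_attempt() until it succeeds, with a linearly growing delay.

    Hypothetical helper: after the n-th failed attempt it sleeps
    base_delay * n seconds; the last exception is re-raised when all
    max_attempts attempts have failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_attempt()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * attempt)
    # Only reached when the loop body never ran
    raise ValueError("max_attempts must be at least 1")
```

With the class above, this would be awaited as retry_linear(self.producer.start, max_attempts=self.INITIAL_CONNECTION_MAX_ATTEMPTS, base_delay=self.LINEAR_BACKOFF_DELAY, retry_on=(KafkaConnectionError,)). The existing cleanup (await self.producer.stop() before raising KafkaManagerError) would still need to wrap that call.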