confluent-kafka-python
GroupCoordinator: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker transport failure (after 0ms in state DOWN)
Description
This message spams our log. Is it a real error, or can this be ignored?
```
%3|1720719719.448|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker transport failure (after 0ms in state DOWN)
```
All operations on the consumer group work and succeed, but when closing it, we get that "error" message.
I don't quite get it, but it seems that, for some reason, when closing the consumer group the library tries to (re-)authenticate the connection, even though it is in the middle of terminating it.
I would gladly take any suggestion on how to avoid the message.
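For now I am considering a workaround: forward librdkafka's logs to Python's `logging` module via the documented `logger` configuration property and filter the line out there. A minimal sketch (the logger name and the substring match are my own choices, and this only hides the symptom rather than fixing the root cause):

```python
import logging

import confluent_kafka

# Forward librdkafka logs to a Python logger instead of stderr, then
# drop only the spurious shutdown-time FAIL line (workaround, not a fix).
rdkafka_logger = logging.getLogger("rdkafka")
rdkafka_logger.addFilter(
    lambda record: "SaslAuthenticateRequest failed" not in record.getMessage()
)

consumer = confluent_kafka.Consumer(
    {
        "bootstrap.servers": "kafka.cloud:9092",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": "client",
        "sasl.password": "client-password",
        "group.id": "MyGroup",
        # "logger" is a documented confluent-kafka-python configuration
        # property: client logs are emitted through this logging.Logger.
        "logger": rdkafka_logger,
    }
)
```

If there is a cleaner way (for example on the `error_cb` side), I would take that too.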
How to reproduce
```yaml
# docker-compose.yaml
---
# docker compose -f ./docker-compose.yaml run --build test
# docker compose -f ./docker-compose.yaml run --build test
# docker compose -f ./docker-compose.yaml run --build --env "DEBUG_LEVEL=broker,security" test > security.log 2>&1
# docker compose -f ./docker-compose.yaml run --build --env "DEBUG_LEVEL=all" test > all.log 2>&1
# %3|1720719719.448|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker transport failure (after 0ms in state DOWN)
name: bug
services:
  test:
    environment:
      - DEBUG_LEVEL=
      # - DEBUG_LEVEL=broker,security
      # - DEBUG_LEVEL=all
    build:
      dockerfile_inline: |
        FROM alpine:3.20
        RUN apk add --no-cache gcc musl-dev librdkafka-dev~=2.4 python3-dev py3-pip
        WORKDIR /app
        RUN pip install confluent-kafka==2.4.0 --break-system-packages
        # COPY test.py test.py
        CMD python test.py
    configs:
      - source: "test.py"
        target: /app/test.py
    depends_on:
      kafka:
        condition: service_healthy

  kafka:
    hostname: kafka.cloud
    image: "confluentinc/cp-kafka:7.6.1"
    environment:
      - KAFKA_NODE_ID=0
      - KAFKA_PROCESS_ROLES=controller,broker
      - KAFKA_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:SASL_PLAINTEXT
      - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      # confluent
      - CLUSTER_ID=YONsFVF4e6Jm0dNNxf9PP0
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka.cloud:9092
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      # sasl support
      - KAFKA_SASL_ENABLED_MECHANISMS=SCRAM-SHA-512,PLAIN
      - KAFKA_SASL_MECHANISM_PLAINTEXT_PROTOCOL=SCRAM-SHA-512
      - KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-512
      - KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    healthcheck:
      test: [ "CMD", "nc", "-z", "kafka.cloud", "9092" ]
      interval: 10s
      retries: 5
      start_period: 20s
      timeout: 10s
    configs:
      - source: "kafka_server_jaas.conf"
        target: /etc/kafka/kafka_server_jaas.conf
    command:
      - /bin/bash
      - -c
      - |
        /etc/confluent/docker/configure
        kafka-storage format --config /etc/kafka/kafka.properties --cluster-id YONsFVF4e6Jm0dNNxf9PP0 \
          --add-scram 'SCRAM-SHA-512=[name=admin,password=admin-password]' \
          --add-scram 'SCRAM-SHA-512=[name=client,password=client-password]' \
          --ignore-formatted
        /etc/confluent/docker/run

configs:
  kafka_server_jaas.conf:
    content: |
      KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="admin"
        password="admin-password";
      };
  test.py:
    content: |
      import os

      import confluent_kafka
      import confluent_kafka.admin

      print("confluent_kafka.version:", confluent_kafka.version())
      print("confluent_kafka.libversion:", confluent_kafka.libversion())

      cfg = {
          "bootstrap.servers": "kafka.cloud:9092",
          "sasl.mechanism": "SCRAM-SHA-512",
          "security.protocol": "SASL_PLAINTEXT",
          "sasl.username": "client",
          "sasl.password": "client-password",
      }
      if os.environ["DEBUG_LEVEL"]:
          cfg["debug"] = os.environ["DEBUG_LEVEL"]

      kafka_admin = confluent_kafka.admin.AdminClient(cfg)

      # create topics (ignore "already exists" errors on the second run)
      for topic_name, future in kafka_admin.create_topics(
          new_topics=[confluent_kafka.admin.NewTopic("topic1", num_partitions=1)]
      ).items():
          try:
              future.result()
          except Exception:
              pass

      # create consumer
      consumer = confluent_kafka.Consumer(
          cfg.copy()
          | {
              "group.id": "MyGroup",
              "enable.auto.commit": False,
          }
      )
      topic_partition = confluent_kafka.TopicPartition(topic="topic1", partition=0)
      low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
      assert low_offset == 0
      assert high_offset == 0
```
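One detail about the repro: test.py never calls `consumer.close()`, so teardown happens when the consumer object is destroyed at interpreter exit. In our real application the consumer group is closed explicitly, which is when the message shows up; to mimic that, the end of test.py could be extended like this (my variation, not part of the compose file above):

```python
try:
    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
    assert low_offset == 0
    assert high_offset == 0
finally:
    # Leave the consumer group and release resources deterministically
    # instead of relying on object destruction at interpreter exit.
    consumer.close()
```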
Run
```shell
docker compose -f ./docker-compose.yaml run --build test
docker compose -f ./docker-compose.yaml run --build test
docker compose -f ./docker-compose.yaml run --build --env "DEBUG_LEVEL=broker,security" test > security.log 2>&1
docker compose -f ./docker-compose.yaml run --build --env "DEBUG_LEVEL=all" test > all.log 2>&1
docker compose -f ./docker-compose.yaml down
```
After the second "run", you should see the log message:
```
%3|1720719719.448|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker transport failure (after 0ms in state DOWN)
```
See security.log and all.log for the captured debug output.
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): confluent_kafka.version: ('2.4.0', 33816576), confluent_kafka.libversion: ('2.4.0', 33816831)
- [x] Apache Kafka broker version: confluentinc/cp-kafka:7.6.1 (Confluent Platform image tag)
- [x] Client configuration: `{...}` (see code above)
- [x] Operating system: docker / alpine
- [x] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue