aiokafka
Can't connect to Kafka in Docker
Here is my Docker Compose file for Kafka:
`
version: "3"
services:
  kafka-0:
    image: docker.io/bitnami/kafka:${KAFKA_VERSION}
    hostname: kafka-0
    container_name: kafka-0
    extra_hosts:
      - "host.docker.internal:host-gateway"
    ports:
      - "9094:9094"
    environment:
      - KAFKA_BROKER_ID=0
      - KAFKA_ENABLE_KRAFT=yes
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,EXTERNAL://host.docker.internal:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes
`
Here is my code:
`
import asyncio
from aiokafka.admin import AIOKafkaAdminClient
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

async def test():
    bootstrap_servers = "192.168.100.9:9094"
    print(1)
    sub = AIOKafkaConsumer(
        bootstrap_servers=bootstrap_servers,
    )
    print(2)
    sub.subscribe(["MANAGER_11111111-1111-1111-1111-000000000000"])
    print(3)
    await sub.start()
    print(4)
    while True:
        print(5)
        msg = await sub.getone()
        print(msg)
        if msg:
            print(msg.value)  # aiokafka's ConsumerRecord exposes `value` as an attribute and has no error() method
        else:
            break
    await sub.stop()

asyncio.run(test())
`
192.168.100.9 is my host's IP address.
I get this error:
Enable connect to node with id 0: [Errno -2] Name or service not known
Traceback (most recent call last):
  File "/home/vladimir/work/sxipher/kafka_magik/test.py", line 26, in <module>
    asyncio.run(test())
  File "/home/vladimir/anaconda3/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/vladimir/anaconda3/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vladimir/anaconda3/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/vladimir/work/sxipher/kafka_magik/test.py", line 14, in test
    await sub.start()
  File "/home/vladimir/anaconda3/lib/python3.11/site-packages/aiokafka/consumer/consumer.py", line 356, in start
    await self._client.bootstrap()
  File "/home/vladimir/anaconda3/lib/python3.11/site-packages/aiokafka/client.py", line 254, in bootstrap
    self._api_version = await self.check_version()
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vladimir/anaconda3/lib/python3.11/site-packages/aiokafka/client.py", line 547, in check_version
    raise KafkaConnectionError(
aiokafka.errors.KafkaConnectionError: KafkaConnectionError: No connection to node with id 0
Unclosed AIOKafkaConsumer consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f5b25d0ce90>
The Kafka protocol works in two steps:
- connecting to the "bootstrap servers" to discover the cluster (brokers, topics, partitions, ...)
- connecting to the actual brokers to put/get data
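To make the second step concrete: the address used there comes from the broker's own configuration, not from `bootstrap_servers`. Here is a minimal sketch, assuming the `KAFKA_CFG_ADVERTISED_LISTENERS` value from the compose file in the question. The helper `parse_advertised_listeners` is a hypothetical name for illustration, not part of aiokafka or Kafka, and the parsing is simplified (it does not handle bracketed IPv6 addresses).

```python
def parse_advertised_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Split an advertised-listeners string into {listener_name: (host, port)}.

    Hypothetical helper for illustration only; simplified parsing.
    """
    listeners = {}
    for entry in value.split(","):
        # Each entry looks like NAME://host:port (host may be empty).
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the compose file in the question.
advertised = parse_advertised_listeners(
    "PLAINTEXT://:9092,EXTERNAL://host.docker.internal:9094"
)
# The address listed for EXTERNAL is what the broker advertises on that listener.
print(advertised["EXTERNAL"])
```

So even though the client bootstraps against 192.168.100.9:9094, the host name it must be able to reach afterwards is the one listed for the `EXTERNAL` listener.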
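From what you shared, you use 192.168.100.9:9094 as the bootstrap server. After that, the client will try to use host.docker.internal:9094 to get/put data: this is the broker's "advertised listener", the address it advertises to clients and to the rest of the cluster. The `[Errno -2] Name or service not known` in your log is a name-resolution failure, which suggests that host.docker.internal does not resolve on your host. Are both of these addresses/names reachable from your host?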
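One way to answer that question is to test each address from the host with nothing but the standard library. This is a rough sketch rather than a definitive diagnostic: `check_endpoint` and its three result strings are made-up names for illustration, and a successful TCP connect only shows that the port is reachable, not that a Kafka broker is answering on it.

```python
import socket


def check_endpoint(host: str, port: int, timeout: float = 3.0) -> str:
    """Return 'ok', 'dns-failure', or 'connect-failure' for host:port.

    Hypothetical helper for illustration only.
    """
    try:
        # Name resolution; this is the step that fails with
        # "[Errno -2] Name or service not known".
        infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        return "dns-failure"
    for family, socktype, proto, _canonname, sockaddr in infos:
        try:
            with socket.socket(family, socktype, proto) as sock:
                sock.settimeout(timeout)
                sock.connect(sockaddr)
                return "ok"
        except OSError:
            # Refused, unreachable, or timed out: try the next resolved address.
            continue
    return "connect-failure"
```

Running `check_endpoint("192.168.100.9", 9094)` and `check_endpoint("host.docker.internal", 9094)` on the host would show whether the bootstrap address and the advertised address behave differently. A `dns-failure` result for the second call would match the error in the log.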