
Can't connect to kafka docker

Open • ghost opened this issue 1 year ago • 1 comment

Here is my Docker Compose file for Kafka:

```yaml
version: "3"
services:
  kafka-0:
    image: docker.io/bitnami/kafka:${KAFKA_VERSION}
    hostname: kafka-0
    container_name: kafka-0
    extra_hosts:
      - "host.docker.internal:host-gateway"
    ports:
      - "9094:9094"
    environment:
      - KAFKA_BROKER_ID=0
      - KAFKA_ENABLE_KRAFT=yes
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,EXTERNAL://host.docker.internal:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes
```

Here is my code:

```python
import asyncio
from aiokafka.admin import AIOKafkaAdminClient
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

async def test():
    bootstrap_servers = "192.168.100.9:9094"
    print(1)
    sub = AIOKafkaConsumer(
        bootstrap_servers=bootstrap_servers,
    )
    print(2)
    sub.subscribe(["MANAGER_11111111-1111-1111-1111-000000000000"])
    print(3)
    await sub.start()
    print(4)
    while True:
        print(5)
        msg = await sub.getone()
        print(msg)
        if msg:
            print(msg.error(), msg.value())
        else:
            break
    await sub.stop()

asyncio.run(test())
```

192.168.100.9 is my host's IP address.

I get this error:

    Unable connect to node with id 0: [Errno -2] Name or service not known
    Traceback (most recent call last):
      File "/home/vladimir/work/sxipher/kafka_magik/test.py", line 26, in <module>
        asyncio.run(test())
      File "/home/vladimir/anaconda3/lib/python3.11/asyncio/runners.py", line 190, in run
        return runner.run(main)
               ^^^^^^^^^^^^^^^^
      File "/home/vladimir/anaconda3/lib/python3.11/asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/vladimir/anaconda3/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
        return future.result()
               ^^^^^^^^^^^^^^^
      File "/home/vladimir/work/sxipher/kafka_magik/test.py", line 14, in test
        await sub.start()
      File "/home/vladimir/anaconda3/lib/python3.11/site-packages/aiokafka/consumer/consumer.py", line 356, in start
        await self._client.bootstrap()
      File "/home/vladimir/anaconda3/lib/python3.11/site-packages/aiokafka/client.py", line 254, in bootstrap
        self._api_version = await self.check_version()
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/vladimir/anaconda3/lib/python3.11/site-packages/aiokafka/client.py", line 547, in check_version
        raise KafkaConnectionError(
    aiokafka.errors.KafkaConnectionError: KafkaConnectionError: No connection to node with id 0
    Unclosed AIOKafkaConsumer
    consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f5b25d0ce90>

ghost, Jan 23 '24 14:01

Connecting to Kafka is a two-step process:

  • connecting to "bootstrap servers" to discover a cluster (brokers, topics, partitions, ...)
  • connecting to the actual brokers to put/get data
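To see which address the second step will use, you can read it off the `KAFKA_CFG_ADVERTISED_LISTENERS` value in the Compose file. Below is a minimal sketch that parses that value. The helper name `parse_advertised_listeners` is hypothetical and not part of aiokafka or Kafka. It assumes the simple `NAME://host:port` form used in the question. A client that bootstraps on port 9094 talks to the `EXTERNAL` listener, so that entry is the address the broker hands back.

```python
def parse_advertised_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Parse 'NAME://host:port,...' into {NAME: (host, port)}.

    Hypothetical helper for illustration only; assumes every entry
    has the form NAME://host:port (host may be empty).
    """
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        # Split on the last colon so the port is always the final component.
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the Compose file in the question.
advertised = parse_advertised_listeners(
    "PLAINTEXT://:9092,EXTERNAL://host.docker.internal:9094"
)
print(advertised["EXTERNAL"])  # ('host.docker.internal', 9094)
```

With this configuration, the data connection goes to `host.docker.internal:9094`, whatever address was used for bootstrapping.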

From what you shared, you use 192.168.100.9:9094 as the bootstrap server. After that, the client will try to use host.docker.internal:9094 to get/put data. This is the broker's "advertised listener", the address it advertises for itself in the cluster metadata. Are both of these addresses accessible from your host?
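One way to check is to try a plain TCP connection to each address from the machine that runs the consumer. This is a minimal sketch using only the standard library. The helper name `can_connect` and the 2-second timeout are assumptions for illustration, not anything from aiokafka.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds from this machine."""
    try:
        # create_connection resolves the name first, so a DNS failure
        # (socket.gaierror, an OSError subclass) is handled the same way
        # as a refused or timed-out connection.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Addresses from the question: the bootstrap server and the advertised listener.
    for host, port in [("192.168.100.9", 9094), ("host.docker.internal", 9094)]:
        status = "reachable" if can_connect(host, port) else "NOT reachable"
        print(f"{host}:{port} is {status}")
```

The `Name or service not known` message in your traceback suggests the second check would fail because the host cannot resolve `host.docker.internal`. If so, two possible fixes are making that name resolvable on the host, or advertising an address the host can reach, for example `EXTERNAL://192.168.100.9:9094`.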

vmaurin, Jan 30 '24 20:01