confluent-kafka-python

_MSG_TIMED_OUT while sending messages to a docker confluent-kafka broker in AWS from my laptop

Open yuviabhi opened this issue 1 year ago • 0 comments

Description

I am receiving the following error while sending messages from my local laptop (Jupyter Notebook) to a confluent-kafka broker running in Docker on an AWS host.

Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}

The topic has been created and is visible in the Control Center, but it contains no messages.

How to reproduce

Run the following code in a Jupyter Notebook on my laptop:

import socket

from confluent_kafka import Producer

# Producer configuration used to reproduce the timeout
conf = {
    'bootstrap.servers': '<aws-public-ip>:9092',
    'client.id': socket.gethostname(),
    'queue.buffering.max.ms': 100,
    'retries': 10000000,
    'socket.timeout.ms': 500,
    'default.topic.config': {
        'acks': 'all',
        'message.timeout.ms': 5000,
    },
}

p = None
try:
    p = Producer(conf)
    print('Producer Created')
except Exception as ex:
    print('Unable to create Producer instance: {}'.format(ex))
    exit()


def delivery_report(err, msg):
    # Called once per message, from poll() or flush(), with the delivery result
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


jsonString1 = """ {"name":"Gal", "email":"[email protected]", "salary": "8345.55"} """
jsonString2 = """ {"name":"Dwayne", "email":"[email protected]", "salary": "7345.75"} """
jsonString3 = """ {"name":"Momoa", "email":"[email protected]", "salary": "3345.25"} """
some_data_source = [jsonString1, jsonString2, jsonString3]

for data in some_data_source:
    # Serve delivery callbacks from earlier produce() calls
    p.poll(0)
    print('Message sending...')
    p.produce('a.jupyter1', data.encode('utf-8'), callback=delivery_report)

# Block until all queued messages are delivered or have timed out
p.flush()

The output is:

Producer Created
Message sending...
Message sending...
Message sending...
Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}

I can see that the topic a.jupyter1 has been created in the Confluent Control Center, but there are no messages in it.

The broker section of the docker-compose.yml file on the AWS host looks like the following:

KAFKA_LISTENERS: INTERNAL_HOST://0.0.0.0:9092,INTERNAL_DOCKER://0.0.0.0:29092,EXTERNAL_HOST://0.0.0.0:29093      
KAFKA_ADVERTISED_LISTENERS: INTERNAL_HOST://localhost:9092,INTERNAL_DOCKER://broker:29092,EXTERNAL_HOST://<aws-public-ip>:29093      
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_HOST:PLAINTEXT,INTERNAL_DOCKER:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT      
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_DOCKER
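The listener settings above can be read as a mapping from the port a client bootstraps on to the address the broker hands back: a Kafka broker answers metadata requests with the advertised address of the listener that received the request, and the client then connects to that address to produce. The sketch below only parses the two strings from the compose file; it assumes each container port is published on the same host port (the ports section is not shown here), and the helper names `parse_listeners` and `advertised_for_port` are hypothetical, not part of any Kafka library.

```python
def parse_listeners(value):
    """Map listener name -> (host, port) from a comma-separated listener string."""
    result = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        result[name] = (host, int(port))
    return result


def advertised_for_port(listeners, advertised, port):
    """Return the 'host:port' advertised by the listener bound to `port`, or None."""
    bound = parse_listeners(listeners)
    announced = parse_listeners(advertised)
    for name, (_, bound_port) in bound.items():
        if bound_port == port:
            host, adv_port = announced[name]
            return "{}:{}".format(host, adv_port)
    return None


# Values copied from the docker-compose.yml excerpt
KAFKA_LISTENERS = (
    "INTERNAL_HOST://0.0.0.0:9092,"
    "INTERNAL_DOCKER://0.0.0.0:29092,"
    "EXTERNAL_HOST://0.0.0.0:29093"
)
KAFKA_ADVERTISED_LISTENERS = (
    "INTERNAL_HOST://localhost:9092,"
    "INTERNAL_DOCKER://broker:29092,"
    "EXTERNAL_HOST://<aws-public-ip>:29093"
)

for port in (9092, 29093):
    print(port, "->", advertised_for_port(KAFKA_LISTENERS, KAFKA_ADVERTISED_LISTENERS, port))
```

Under that assumption, a client bootstrapping on 9092 would be told to reach the broker at localhost:9092, which on the laptop is not the AWS host. That would be consistent with the timeouts reported here, though it is not confirmed by broker or client logs.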

When I use port 9092 in the following code block, the topic is at least created, but when I use 29093, even topic creation fails.

conf = {
        'bootstrap.servers': '<aws-public-ip>:9092',
...
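Since nothing works on port 29093, one thing worth ruling out is whether that port is reachable from the laptop at all (for example, whether it is published by Docker and allowed through the AWS security group; both are assumptions, as neither is shown here). A minimal standard-library sketch, with `can_connect` as a hypothetical helper name:

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False


# Hypothetical usage; replace the placeholder with the broker's public address.
# for port in (9092, 29093):
#     print(port, can_connect("<aws-public-ip>", port))
```

A successful TCP connection only shows that the port is open; it does not show that the advertised listener address is usable by the client.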

I have followed this link for reference.

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.9.0', 17367040)
  • [x] Apache Kafka broker version: confluentinc/cp-server:7.1.1
  • [ ] Client configuration: {...}
  • [x] Operating system: CentOS
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [x] Critical issue
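The client logs requested in the checklist could be collected by adding librdkafka's `debug` property to the producer configuration. The fragment below is a sketch; the selection of debug contexts (`broker,topic,msg`) is one reasonable choice for a delivery timeout, not a required one.

```python
import socket

conf = {
    'bootstrap.servers': '<aws-public-ip>:9092',
    'client.id': socket.gethostname(),
    # Comma-separated librdkafka debug contexts; this selection is an assumption
    'debug': 'broker,topic,msg',
}

# Passing this dict to confluent_kafka.Producer(conf) enables debug logging,
# which librdkafka writes to stderr unless a logger is configured.
```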

yuviabhi, Jul 07 '22 07:07