confluent-kafka-python
_MSG_TIMED_OUT while sending messages to a docker confluent-kafka broker in AWS from my laptop
Description
I am receiving the following error while sending messages from a Jupyter Notebook on my local laptop to a Dockerized confluent-kafka broker running on an AWS host.
Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
The topic shows up in the Control Center, but it contains no messages.
How to reproduce
Run the following code in a Jupyter Notebook on my laptop:
from confluent_kafka import Producer
import socket

conf = {
    'bootstrap.servers': '<aws-public-ip>:9092',
    'client.id': socket.gethostname(),
    "queue.buffering.max.ms": 100,
    "retries": 10000000,
    "socket.timeout.ms": 500,
    "default.topic.config": {
        "acks": "all",
        # Each message is failed locally if not delivered within 5 seconds
        "message.timeout.ms": 5000,
    }
}

p = None
try:
    p = Producer(conf)
    print('Producer Created')
except Exception as ex:
    print("Unable to create Producer instance: {}".format(ex))
    exit()


def delivery_report(err, msg):
    # Called once per message from poll() or flush() with the delivery result
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


jsonString1 = """ {"name":"Gal", "email":"[email protected]", "salary": "8345.55"} """
jsonString2 = """ {"name":"Dwayne", "email":"[email protected]", "salary": "7345.75"} """
jsonString3 = """ {"name":"Momoa", "email":"[email protected]", "salary": "3345.25"} """
some_data_source = [jsonString1, jsonString2, jsonString3]

for data in some_data_source:
    # Serve delivery callbacks from previous produce() calls
    p.poll(0)
    print('Message sending...')
    p.produce('a.jupyter1', data.encode('utf-8'), callback=delivery_report)

# Block until all outstanding messages are delivered or have timed out
p.flush()
The output is:
Producer Created
Message sending...
Message sending...
Message sending...
Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
I can see that the topic a.jupyter1 has been created in the Confluent Control Center, but there are no messages in it.
The broker section of the docker-compose.yml file on the AWS host looks like the following:
KAFKA_LISTENERS: INTERNAL_HOST://0.0.0.0:9092,INTERNAL_DOCKER://0.0.0.0:29092,EXTERNAL_HOST://0.0.0.0:29093
KAFKA_ADVERTISED_LISTENERS: INTERNAL_HOST://localhost:9092,INTERNAL_DOCKER://broker:29092,EXTERNAL_HOST://<aws-public-ip>:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_HOST:PLAINTEXT,INTERNAL_DOCKER:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_DOCKER
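To make the listener settings above easier to reason about, here is a minimal sketch (not part of my actual setup) that pairs each listener's bind port with the address advertised for it. It assumes the usual Kafka behaviour that a client is handed the advertised address of whichever listener it bootstrapped on. The helper names parse_listeners and advertised_for_port are hypothetical.

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,...' into {name: (host, port)}."""
    result = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        result[name] = (host, int(port))
    return result


def advertised_for_port(listeners, advertised, port):
    """Return the (host, port) advertised for the listener bound to `port`, or None."""
    bound = parse_listeners(listeners)
    announced = parse_listeners(advertised)
    for name, (_, bound_port) in bound.items():
        if bound_port == port:
            return announced.get(name)
    return None


# Values copied from the docker-compose.yml excerpt above
KAFKA_LISTENERS = (
    "INTERNAL_HOST://0.0.0.0:9092,"
    "INTERNAL_DOCKER://0.0.0.0:29092,"
    "EXTERNAL_HOST://0.0.0.0:29093"
)
KAFKA_ADVERTISED_LISTENERS = (
    "INTERNAL_HOST://localhost:9092,"
    "INTERNAL_DOCKER://broker:29092,"
    "EXTERNAL_HOST://<aws-public-ip>:29093"
)

for port in (9092, 29093):
    target = advertised_for_port(KAFKA_LISTENERS, KAFKA_ADVERTISED_LISTENERS, port)
    print(port, "->", target)
```

If that assumption holds, bootstrapping on 9092 from the laptop would send the client to localhost:9092 for the actual produce requests, which might explain the timeout. I have not confirmed this.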
When I use port 9092 in the following code block, the topic at least gets created, but when I use 29093, even the topic creation fails.
conf = {
    'bootstrap.servers': '<aws-public-ip>:9092',
    ...
I have followed this link for reference.
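Since the two ports behave differently, a plain TCP check from the laptop may help separate network reachability from Kafka-level problems. This is only a sketch using the standard library. The function name can_connect is hypothetical, and a failure on 29093 could also come from something outside Kafka, such as the port not being published by Docker or not being opened in the AWS security group (an assumption, not something I have verified).

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures
        return False


def check_ports(host, ports, timeout=3.0):
    """Return {port: reachable} for each port in `ports`."""
    return {port: can_connect(host, port, timeout) for port in ports}


# Example call (replace the placeholder with the real address):
# print(check_ports("<aws-public-ip>", (9092, 29093)))
```

A port that is reachable here only shows that the TCP handshake works; it says nothing about which advertised address the broker returns afterwards.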
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.9.0', 17367040)
- [x] Apache Kafka broker version: confluentinc/cp-server:7.1.1
- [ ] Client configuration: {...}
- [x] Operating system: CentOS
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [x] Critical issue
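For the unchecked client-logs item, the following sketch shows one way the producer configuration could be extended with librdkafka's debug property. The context list broker,topic,msg is an assumption about what is relevant to this problem, and the helper name with_debug is hypothetical.

```python
def with_debug(conf, contexts=("broker", "topic", "msg")):
    """Return a copy of `conf` with librdkafka's 'debug' property set."""
    debug_conf = dict(conf)
    debug_conf["debug"] = ",".join(contexts)
    return debug_conf


# Trimmed version of the configuration from the reproduction steps
base_conf = {
    "bootstrap.servers": "<aws-public-ip>:9092",
    "socket.timeout.ms": 500,
}

debug_conf = with_debug(base_conf)
print(debug_conf["debug"])
```

Passing debug_conf to Producer(...) instead of the original configuration should make the client log its broker connections and message handling, which could then be attached here.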