camel-kafka-connector
CamelMqttsourceSourceConnector: connect timed out ERROR
Hello,
I want to build the MQTT source connector in a Kafka cluster deployed with Strimzi but, no matter what I try, I keep getting this error:
state: FAILED
trace: **"org.apache.kafka.connect.errors.ConnectException: Failed to create**
and start Camel context\n\tat org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:185)\n\tat
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:224)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)\n\tat
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.camel.RuntimeCamelException:
MqttException (0) - java.net.SocketTimeoutException: connect timed out\n\tat
org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)\n\tat
org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)\n\tat
org.apache.camel.support.service.BaseService.fail(BaseService.java:342)\n\tat
org.apache.camel.support.service.BaseService.start(BaseService.java:132)\n\tat
org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.startService(AbstractCamelContext.java:3597)\n\tat
org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRouteConsumers(InternalRouteStartupManager.java:401)\n\tat
org.apache.camel.impl.engine.InternalRouteStartupManager.doStartRouteConsumers(InternalRouteStartupManager.java:319)\n\tat
org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:213)\n\tat
org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:147)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:3299)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:2951)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2902)\n\tat
org.apache.camel.support.service.BaseService.start(BaseService.java:119)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2586)\n\tat
org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:247)\n\tat
org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)\n\tat org.apache.camel.support.service.BaseService.start(BaseService.java:119)\n\tat
org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:181)\n\t...
**8 more\nCaused by: MqttException (0) - java.net.SocketTimeoutException:
connect timed out**\n\tat org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)\n\tat
org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:738)\n\t...
1 more\nCaused by: java.net.SocketTimeoutException: connect timed out\n\tat
java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)\n\tat
java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)\n\tat
java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)\n\tat
java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat
java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)\n\tat
org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:724)\n\t...
1 more\n"
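Because the status field stores the trace as a single string with literal `\n` and `\t` sequences, the exception chain is hard to read. The following is a minimal sketch (the helper name `root_causes` is hypothetical, not part of any connector tooling) that lists the top-level exception and each `Caused by:` entry. The sample is a shortened excerpt of the trace above.

```python
from typing import List


def root_causes(trace: str) -> List[str]:
    """Return the top-level exception followed by every 'Caused by:' entry."""
    # Undo display wrapping first: collapse real whitespace and line breaks.
    text = " ".join(trace.split())
    # Then turn the literal "\n" / "\t" escape sequences into real line breaks.
    text = text.replace("\\n", "\n").replace("\\t", "")
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    prefix = "Caused by: "
    causes = lines[:1]
    causes += [line[len(prefix):] for line in lines if line.startswith(prefix)]
    return causes


# Shortened excerpt of the trace, with the escapes kept literal.
sample = (
    "org.apache.kafka.connect.errors.ConnectException: Failed to create "
    "and start Camel context\\n\\tat "
    "org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:185)"
    "\\nCaused by: java.net.SocketTimeoutException: connect timed out\\n\\tat "
    "java.base/java.net.Socket.connect(Socket.java:609)\\n"
)

for cause in root_causes(sample):
    print(cause)
```

Applied to the full trace, the last entry is the `java.net.SocketTimeoutException: connect timed out`, which is raised while the TCP socket is being opened.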
If I try to reach the mosquitto broker from the machine, everything works fine:
>>> nc -zv mosquitto.org 1883
Connection to mosquitto.org 1883 port [tcp/*] succeeded!
So it does not seem to be a firewall or proxy problem. I should mention that I'm using a SOCKS proxy to reach the public mosquitto broker, because my cluster's machines are on a private network. In theory, the connection has been configured through iptables and the socks.conf file so that the proxy is transparent to the cluster. Could this be related to the error I'm getting? Do I need to configure something in the connector's YAML file to use the SOCKS proxy? Or could it be related to the bootstrap servers parameter?
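The `nc` check above was run from the machine itself. A rough way to repeat the same check with an explicit timeout from wherever the connector actually runs is sketched below. It assumes a Python interpreter is available in that environment, which may not be the case inside a Kafka Connect image, and the helper name `can_connect` is hypothetical. Like `nc`, it opens a plain TCP socket through the operating system, so it says nothing about proxy settings inside the JVM.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Timeouts, refused connections and DNS failures are all OSError subclasses.
        return False
```

With the broker from the connector configuration, the call would be `can_connect("mosquitto.org", 1883)`. A `False` result from the connector's environment, combined with the successful `nc` check from the machine, would suggest that the two do not share the same network path.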
Yaml files are:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 2.8.0
  image: registry/kafka-mqtt-0.27.0-2.8.0:latest
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap.my-kafka-project.svc:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  template:
    pod:
      imagePullSecrets:
        - name: dockersecret1

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: camel-mqtt-source-kafka-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.mqttsource.CamelMqttsourceSourceConnector
  tasksMax: 1
  config:
    #key.converter: org.apache.kafka.connect.storage.StringConverter
    #value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: my-topic
    camel.kamelet.mqtt-source.topic: mqtt/AirQuality
    camel.kamelet.mqtt-source.brokerUrl: tcp://mosquitto.org:1883
I'm using Kafka version 2.8.0, as indicated on the Apache Camel web page, and Strimzi version 0.27.0-2.8.0.
I would really appreciate some feedback, because I don't know what else to try. I can provide more information if needed.
Thanks