camel-kafka-connector icon indicating copy to clipboard operation
camel-kafka-connector copied to clipboard

CamelMqttsourceSourceConnector: connect timed out ERROR

Open GrSof opened this issue 3 years ago • 0 comments

Hello,

I want to build the mqtt source connector in a kafka cluster deployed using STRIMZI but, no matter what I try, I keep getting this error:

state: FAILED
trace: **"org.apache.kafka.connect.errors.ConnectException: Failed to create**
and start Camel context\n\tat org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:185)\n\tat
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:224)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)\n\tat
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.camel.RuntimeCamelException:
MqttException (0) - java.net.SocketTimeoutException: connect timed out\n\tat
org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)\n\tat
org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)\n\tat
org.apache.camel.support.service.BaseService.fail(BaseService.java:342)\n\tat
org.apache.camel.support.service.BaseService.start(BaseService.java:132)\n\tat
org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.startService(AbstractCamelContext.java:3597)\n\tat
org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRouteConsumers(InternalRouteStartupManager.java:401)\n\tat
org.apache.camel.impl.engine.InternalRouteStartupManager.doStartRouteConsumers(InternalRouteStartupManager.java:319)\n\tat
org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:213)\n\tat
org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:147)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:3299)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:2951)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2902)\n\tat
org.apache.camel.support.service.BaseService.start(BaseService.java:119)\n\tat
org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2586)\n\tat
org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:247)\n\tat
org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)\n\tat org.apache.camel.support.service.BaseService.start(BaseService.java:119)\n\tat
org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:181)\n\t...
**8 more\nCaused by: MqttException (0) - java.net.SocketTimeoutException:
connect timed out**\n\tat org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)\n\tat
org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:738)\n\t...
1 more\nCaused by: java.net.SocketTimeoutException: connect timed out\n\tat
java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)\n\tat
java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)\n\tat
java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)\n\tat
java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat
java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)\n\tat
org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:724)\n\t...
1 more\n"

If I try to reach the mosquitto broker from the machine, everything goes fine

>>>nc -zv [mosquitto.org](http://mosquitto.org/) 1883
Connection to [mosquitto.org](http://mosquitto.org/) 1883 port [tcp/*] succeeded!

So is not a firewall or proxy problem. I must say that I'm using proxy socks to connect to the public mosquitto broker because my cluster's machines are in a private network. But the connection has been configured in the iptables and socks.conf file to be something transparent to the cluster, in theory. Is it possible that this has something to do with the error I'm getting? Do I need to configure something in the Connector's yaml file to use proxy socks? Maybe it has something to do with the bootstrap servers parameter?

Yaml files are:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 2.8.0
  image: registry/kafka-mqtt-0.27.0-2.8.0:latest
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap.my-kafka-project.svc:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  template:
    pod:
      imagePullSecrets:
        - name: dockersecret1
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: camel-mqtt-source-kafka-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.mqttsource.CamelMqttsourceSourceConnector
  tasksMax: 1
  config:
    #key.converter: org.apache.kafka.connect.storage.StringConverter
    #value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: my-topic
    camel.kamelet.mqtt-source.topic: mqtt/AirQuality
    camel.kamelet.mqtt-source.brokerUrl: tcp://mosquitto.org:1883

I'm using Kafka version 2.8.0 as it is said in the apache camel web page and Strimzi version 0.27.0-2.8.0.

I would really appreciate some feedback from your side because I don't know what else to try. I can provide with more information if needed.

Thanks

GrSof avatar Sep 07 '22 08:09 GrSof