kafka-connect-iot-mqtt-connector-example
Can not configure MQTT Connector in Control Center
I'm struggling to configure the MQTT connector via Control Center as described in this tutorial. It works fine via the terminal; however, when I try to add it in Control Center, it hangs forever after I press "Continue" (the button turns from violet to grey):

I think the underlying issue is that selecting the MQTT connector class does not trigger the UI update that shows the connector-specific configuration options (as it does for all other connectors).
I'm using confluent-5.2.1 on an Ubuntu server along with kafka-connect-mqtt:1.2.1-preview. Are these two versions supposed to be compatible with one another?
Is there a more appropriate channel to report this problem?
Hi Holger,
thanks for sharing the issue.
I will ask colleagues from engineering to review and respond here.
In general, you can always use the Confluent Slack Community (https://launchpass.com/confluentcommunity), which is free to use. You can ask any Kafka and Confluent questions there, and engineering also monitors the Slack channels.
Hi Kai,
I have configured the Kafka MQTT connector using the source-anonymous.properties file. The configuration is given below.
name=mqtt-source
tasks.max=1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.server.uri=tcp://ec2-3-215-42-148.compute-1.amazonaws.com:1883
mqtt.topics=mcd/11.22.33.44/DEVICE_PROFILE
kafka.topics=DEVICE_PROFILE
confluent.topic.bootstrap.servers= ec2-3-215-42-148.compute-1.amazonaws.com :9092
confluent.topic.replication.factor=1
After making this configuration, I started Connect with the command below:
confluent start connect.
I checked the connect log.
[2019-07-25 08:53:33,439] INFO Started o.e.j.s.ServletContextHandler@5f5c2451{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:855)
[2019-07-25 08:53:33,439] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:231)
[2019-07-25 08:53:33,439] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2019-07-25 08:53:33,683] INFO Finished reading to end of log and updated config snapshot, new config log offset: 13 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:824)
[2019-07-25 08:53:33,683] INFO Starting connectors and tasks using config offset 13 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:850)
[2019-07-25 08:53:33,683] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:860)
ERROR: ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,199] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,199] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,200] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,200] ERROR Found configuration for connector 'connector-mqtt-source-new' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,200] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,201] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,201] ERROR Found configuration for connector 'connector-mqtt-source-new' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,202] ERROR Found configuration for connector 'connector-mqtt-source-sourjya' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,202] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,202] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
[2019-07-25 08:53:33,203] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)
Can you please go through the configuration I have given and let me know whether I made a mistake? One more thing: when I run "confluent start connect", how do I know that Confluent is picking up the source-anonymous.properties file rather than something else?
Any help would be greatly appreciated.
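One way to see which configuration the running worker actually holds is to ask the Connect REST API instead of guessing from the files on disk. The sketch below is only an illustration: it assumes the worker listens on http://localhost:8083 (the address used elsewhere in this thread) and that the connector is registered as "mqtt-source"; the helper names are made up for this example. Note that the error log above mentions 'connector-mqtt-source', so it may be worth checking both names.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def connector_config_url(name, base_url="http://localhost:8083"):
    """Build the REST URL that returns the stored config of one connector."""
    return f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/config"


def fetch_connector_config(name, base_url="http://localhost:8083"):
    """Return the connector config as a dict (requires a running Connect worker)."""
    with urlopen(connector_config_url(name, base_url), timeout=10) as response:
        return json.load(response)


# Example usage against a running worker (assumed address and connector name):
#   config = fetch_connector_config("mqtt-source")
#   for key in sorted(config):
#       print(f"{key}={config[key]}")
```

If the printed keys and values differ from what is in source-anonymous.properties, the worker is not using that file for this connector.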
The exception clearly shows that the structure of your config is not correct.
It looks like there is whitespace in
confluent.topic.bootstrap.servers= ec2-3-215-42-148.compute-1.amazonaws.com :9092
maybe?
Also, can you please attach the full log file? My colleague just said there should be a clearer error message.
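A quick way to spot this kind of stray whitespace is to scan the properties file before loading it. The following is a minimal sketch, not part of any Confluent tooling: the function name is hypothetical, and it only handles simple key=value lines like the ones posted above.

```python
def find_whitespace_in_values(properties_text):
    """Return (line_number, key, value) for each value that contains inner whitespace."""
    problems = []
    for line_number, raw_line in enumerate(properties_text.splitlines(), start=1):
        line = raw_line.strip()
        # Skip blank lines, comments, and lines without a key=value separator.
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        if any(ch.isspace() for ch in value):
            problems.append((line_number, key.strip(), value))
    return problems


# Excerpt of the configuration posted above, whitespace preserved.
SAMPLE = """\
name=mqtt-source
tasks.max=1
confluent.topic.bootstrap.servers= ec2-3-215-42-148.compute-1.amazonaws.com :9092
confluent.topic.replication.factor=1
"""

if __name__ == "__main__":
    for line_number, key, value in find_whitespace_in_values(SAMPLE):
        print(f"line {line_number}: {key} has whitespace in value {value!r}")
```

On the excerpt above, this flags only the confluent.topic.bootstrap.servers line, because of the space before ":9092".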
Hi Kai,
I am getting the error below: the Kafka MQTT source connector is not receiving data published from MQTT and fails with "java.lang.NullPointerException".
I created a connector with the configuration below:
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "tcp://127.0.0.1:1883",
"mqtt.topics" : "temperature",
"kafka.topic" : "mqtt.",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'
When I execute the example above and check the connector status, it shows the connector as running:
curl http://localhost:8083/connectors/mqtt-source/status | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   185  100   185    0     0  10277      0 --:--:-- --:--:-- --:--:-- 10277
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.1.1:8083"
    },
    "name": "mqtt-source",
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "127.0.1.1:8083"
        }
    ],
    "type": "source"
}
But when I publish data to the MQTT topic, it is not received on the Kafka topic by the consumer.
Consumer:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mqtt. --from-beginning
MQTT publish:
mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10#"
I get the error below when I check the status of the MQTT connector:
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.1.1:8083"
    },
    "name": "mqtt-source",
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n",
            "worker_id": "127.0.1.1:8083"
        }
    ],
    "type": "source"
}
Is there an issue with the connector properties, or is it something else?
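As the status output above shows, the connector itself can report RUNNING while one of its tasks has FAILED, so it helps to look at the task states rather than the top-level state. Here is a small sketch that pulls the failed tasks out of a status response; the function name is made up for this example, and the sample is a shortened copy of the response above.

```python
import json


def failed_tasks(status):
    """Return (task_id, first_trace_line) for every task whose state is FAILED."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task.get("id"), first_line))
    return failures


# Shortened copy of the status response above (stack trace truncated to two lines).
SAMPLE_STATUS = json.loads(r'''
{
    "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
    "name": "mqtt-source",
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)\n",
            "worker_id": "127.0.1.1:8083"
        }
    ],
    "type": "source"
}
''')

if __name__ == "__main__":
    for task_id, reason in failed_tasks(SAMPLE_STATUS):
        print(f"task {task_id} failed: {reason}")
```

The same function could be fed the parsed output of the status curl call instead of the embedded sample.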