kafka-connect-iot-mqtt-connector-example icon indicating copy to clipboard operation
kafka-connect-iot-mqtt-connector-example copied to clipboard

Can not configure MQTT Connector in Control Center

Open holgerbrandl opened this issue 6 years ago • 5 comments
trafficstars

I'm struggling with the configuration as described in this tutorial of the mqtt connector via control center. It works fine via terminal, however, when trying to add it in control center it hangs until forever after pressing "Continue" (which is turning from violet to grey before":

image

I think the underlying issue is, that the selection of the mqtt connector class does not trigger the ui update to show the connector specific configuration options (as it does for all other connectors).

I'm using confluent-5.2.1 on an ubuntu server along with kafka-connect-mqtt:1.2.1-preview. Are these two versions supposed to be compatible with another?

Is there a more appropriate channel to report this problem?

holgerbrandl avatar Jun 11 '19 14:06 holgerbrandl

Hi Holger,

thanks for sharing the issue.

I will ask colleagues from engineering to review and respond here.

In general, you can always use Confluent Slack Community (https://launchpass.com/confluentcommunity) which is free to use. You can ask any Kafka and Confluent questions there, and engineering also monitors the Slack channels.

kaiwaehner avatar Jun 11 '19 14:06 kaiwaehner

Hi Kai,

I have configured kafka mqtt connect using source-anonymous.properties file. Configuration is given below.

name=mqtt-source tasks.max=1 connector.class=io.confluent.connect.mqtt.MqttSourceConnector mqtt.server.uri=tcp://ec2-3-215-42-148.compute-1.amazonaws.com:1883 mqtt.topics=mcd/11.22.33.44/DEVICE_PROFILE kafka.topics=DEVICE_PROFILE confluent.topic.bootstrap.servers= ec2-3-215-42-148.compute-1.amazonaws.com :9092 confluent.topic.replication.factor=1

After making this configuration I have started the connect by below command

confluent start connect.

I checked the connect log.

[2019-07-25 08:53:33,439] INFO Started o.e.j.s.ServletContextHandler@5f5c2451{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:855) [2019-07-25 08:53:33,439] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:231) [2019-07-25 08:53:33,439] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56) [2019-07-25 08:53:33,683] INFO Finished reading to end of log and updated config snapshot, new config log offset: 13 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:824) [2019-07-25 08:53:33,683] INFO Starting connectors and tasks using config offset 13 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:850) [2019-07-25 08:53:33,683] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:860)

ERROR: ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,199] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,199] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,200] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,200] ERROR Found configuration for connector 'connector-mqtt-source-new' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,200] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,201] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,201] ERROR Found configuration for connector 'connector-mqtt-source-new' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,202] ERROR Found configuration for connector 'connector-mqtt-source-sourjya' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,202] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,202] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523) [2019-07-25 08:53:33,203] ERROR Found configuration for connector 'connector-mqtt-source' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:523)

Can you please go through the configuration I have given. Please let me know is there any mistake I did. I want to know one more thing if I run "confluent start connect" how do I know that confluent is taking source-anonymous.properties file instead of something else.

Your help will be greatful.

sourjya123 avatar Jul 25 '19 09:07 sourjya123

The exception clearly shows that the structure of your config is not correct.

Looks like White space in confluent.topic.bootstrap.servers= ec2-3-215-42-148.compute-1.amazonaws.com :9092 maybe?

kaiwaehner avatar Jul 25 '19 12:07 kaiwaehner

Also can you please attach the full log file. My colleague just said there should be a more clear error message.

kaiwaehner avatar Jul 25 '19 13:07 kaiwaehner

Hi Kai,

I am getting below error : Kafka Mqtt Source Connector not reciveing data published from MQTT and failed with "java.lang.NullPointerException"

i created a connector with below confiiguration

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
    "name" : "mqtt-source",
"config" : {
    "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max" : "1",
    "mqtt.server.uri" : "tcp://127.0.0.1:1883",
    "mqtt.topics" : "temperature",
    "kafka.topic" : "mqtt.",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license":""
    }
}'

When I execute above examples and check connector status it show me running like :

curl http://localhost:8083/connectors/mqtt-source/status | python -m json.tool % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 185 100 185 0 0 10277 0 --:--:-- --:--:-- --:--:-- 10277 { "connector": { "state": "RUNNING", "worker_id": "127.0.1.1:8083" }, "name": "mqtt-source", "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083" } ], "type": "source" }

But when I publish data from Mqtt Topic it is not recieved at the kafka consumer topics.

Consumer:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mqtt. --from-beginning

Mqtt publish topic:

mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10#"

I am getting below error when I check status of mqtt connector

{ "connector": { "state": "RUNNING", "worker_id": "127.0.1.1:8083" }, "name": "mqtt-source", "tasks": [ { "id": 0, "state": "FAILED", "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n", "worker_id": "127.0.1.1:8083" } ], "type": "source" }

Is there any issue with connector properties? or something else..?

kcs-santoshahire avatar Sep 14 '20 10:09 kcs-santoshahire