Prototype a MQTT Source
Problem
We do not have an MQTT Source in Knative Eventing for transforming messages from MQTT brokers into CloudEvents and ingesting them via HTTP into the Knative Eventing system.
With "edge" / "IoT" computing there is a growing number of "devices" that emit MQTT messages to brokers. In order to process those messages in Knative Eventing, we need a Source that reads from those MQTT brokers.
POC Implementation
Instead of directly creating a fully fledged controller, we can take a much simpler approach and create an image that can be used inside the ContainerSource, like we did for the websockets protocol. See:
- https://github.com/knative/eventing/tree/main/cmd/websocketsource
- https://github.com/knative/eventing/blob/main/config/tools/websocket-source/websocket-source.yaml#L24
This would give us a quick win and a good starting point for developers who need to transform MQTT messages into CloudEvents and send them into the Knative Eventing system.
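As a rough illustration of what such an image has to do, here is a minimal sketch using only the Go standard library. It relies on the fact that a ContainerSource injects the sink address into the container as the `K_SINK` environment variable, and it posts a structured-mode CloudEvent to that address. The `Event` struct, the function names, the event type `dev.example.mqtt.message`, and the `/mqtt/<topic>` source are hypothetical placeholders, and the MQTT subscription itself is left out; a real implementation would more likely use the CloudEvents Go SDK linked under "Additional context".

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"time"
)

// Event is a minimal CloudEvents 1.0 JSON envelope (structured mode).
type Event struct {
	SpecVersion string `json:"specversion"`
	ID          string `json:"id"`
	Source      string `json:"source"`
	Type        string `json:"type"`
	Time        string `json:"time"`
	// encoding/json base64-encodes []byte, which matches the
	// data_base64 member of the CloudEvents JSON format.
	DataBase64 []byte `json:"data_base64"`
}

// wrapMQTTMessage turns a raw MQTT payload into a CloudEvent.
// The type and source values are hypothetical placeholders.
func wrapMQTTMessage(topic string, payload []byte, id string) Event {
	return Event{
		SpecVersion: "1.0",
		ID:          id,
		Source:      "/mqtt/" + topic,
		Type:        "dev.example.mqtt.message",
		Time:        time.Now().UTC().Format(time.RFC3339),
		DataBase64:  payload,
	}
}

// send POSTs the event to the sink in structured content mode.
func send(sink string, ev Event) error {
	body, err := json.Marshal(ev)
	if err != nil {
		return err
	}
	resp, err := http.Post(sink, "application/cloudevents+json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("sink returned %s", resp.Status)
	}
	return nil
}

func main() {
	// A real source would build one event per message received
	// from its MQTT subscription; this one is hard-coded.
	ev := wrapMQTTMessage("test-topic", []byte("test"), "1234-1234-1234")

	sink := os.Getenv("K_SINK") // injected by ContainerSource
	if sink == "" {
		// Dry run outside a cluster: print the event instead of sending it.
		out, _ := json.Marshal(ev)
		fmt.Println(string(out))
		return
	}
	if err := send(sink, ev); err != nil {
		fmt.Fprintln(os.Stderr, "send failed:", err)
		os.Exit(1)
	}
}
```

Run locally without `K_SINK`, the sketch only prints the JSON envelope, which makes it easy to inspect what would be sent to the sink.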
Persona: Developers
Exit Criteria
A binary/image that can be used via the ContainerSource CRD
Time Estimate (optional): 2-3
Additional context (optional)
- https://github.com/cloudevents/sdk-go/tree/main/protocol/mqtt_paho/v2
- https://github.com/cloudevents/sdk-go/tree/main/samples/mqtt
I really love the idea and this is definitely a start in the right direction. I guess such a ContainerSource could also be used in a SinkBinding (I'm always confused about when to use a ContainerSource and when a SinkBinding 😬). A next step then could be a fully fledged MqttSource, as this has the advantage of a typed configuration for e.g. the connection parameters.
I would like to work on this and contribute.
/assign
@g1rjeevan sounds good! Feel free to join our Slack instance for further discussions.
Hey @g1rjeevan, are you still on the ticket? I want to give it a try.
Are there any updates on this issue, @g1rjeevan? Please let me know within the next 24 hours if you are still working on it and need more time; otherwise we will give @prakrit55 a chance to try! Thanks :)
/unassign @prakrit55 Do you want to give it a shot?
Hey @Leo6Leo, thanks for mentioning me. Yes, I would love to work on it.
/assign @prakrit55
@prakrit55 are there any questions about this Issue?
Hey @Leo6Leo, sorry for the delay. Where can I find the main package for ContainerSource?
Hey @prakrit55, thanks for working on this issue.
Our final goal is to create a ContainerSource that can emit MQTT events, similar to how the heartbeats source is implemented.
So the code should be in a package under cmd/mqttsource and the ContainerSource YAML should be under config/tools/mqttsource.
The final goal should be:
- Configure mqttsource in namespace <x> with a ConfigMap
- Apply the ContainerSource YAML: ko apply -n <x> -f config/tools/mqttsource
- And hope it will work
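One way the binary could pick up that configuration is sketched below. It assumes the ConfigMap keys are exposed to the container as environment variables (for example via `envFrom` in the ContainerSource pod template). The variable names `MQTT_BROKER_URL` and `MQTT_TOPIC` and the `Config` type are hypothetical; only `K_SINK` is something the ContainerSource itself provides.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// Config holds the settings of the hypothetical mqttsource binary.
type Config struct {
	BrokerURL string
	Topic     string
	Sink      string
}

// loadConfig reads the settings through getenv, so it can be exercised
// without touching the real process environment.
func loadConfig(getenv func(string) string) (Config, error) {
	cfg := Config{
		BrokerURL: getenv("MQTT_BROKER_URL"), // hypothetical name
		Topic:     getenv("MQTT_TOPIC"),      // hypothetical name
		Sink:      getenv("K_SINK"),          // injected by ContainerSource
	}
	if cfg.BrokerURL == "" {
		return Config{}, errors.New("MQTT_BROKER_URL is required")
	}
	if cfg.Topic == "" {
		return Config{}, errors.New("MQTT_TOPIC is required")
	}
	if cfg.Sink == "" {
		return Config{}, errors.New("K_SINK is not set")
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Printf("would subscribe to %q on %s and forward to %s\n",
		cfg.Topic, cfg.BrokerURL, cfg.Sink)
}
```

Failing fast on a missing value keeps misconfiguration visible in the pod logs instead of surfacing later as a silent lack of events.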
Hey @Leo6Leo, should we configure it with HTTP only, or also with HTTPS (TLS)?
Hey @prakrit55, following up on our Slack conversation about this question: I have consulted with tech lead @pierDipi. Do you want to give both HTTP and HTTPS a try? :))) Please let us know if you have any questions!
Here is a code snippet that might be helpful to you when you are working on the TLS part. https://github.com/knative/eventing/blob/0a6d5b4520bc884076ca5c824fc31812c4ad57d6/cmd/heartbeats/main.go#L138-L158
/assign
Hey @Leo6Leo, this issue seems pretty interesting to me and I want to give it a go.
I am assigning this to myself.
/assign
This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.
/remove-lifecycle stale
Hi! I'm interested in working on this issue if it's not worked on now.
Welcome @ctmphuongg,
The other PR related to this seems to be closed, so feel free to go ahead and /assign it to yourself in case you want to pick this up.
/assign
@ctmphuongg, you described how you tested the CloudEvents sample MQTT receiver:
- Copy the file main.go from the receiver folder and install all packages
- Start an MQTT broker as mentioned in the README: docker run -it --rm --name mosquitto -p 1883:1883 eclipse-mosquitto:2.0 mosquitto -c /mosquitto-no-auth.conf
- Keep the mqttsource running by building the file with go build and running ./main
- Send a message to the same topic, using mosquitto_pub -t "test-topic" -m "test"
- The thing is I don't receive any message in the terminal where I started the mqttsource, but when using 5 mosquitto_sub to the same topic "test-topic", it still received all the messages. So, I'm not sure if that sample there is working or not
The reason no message is received is that you are passing the data as a plain string, without specifying any CloudEvents attributes.
According to the CloudEvents spec for MQTT:
For MQTT 5.0, the MQTT PUBLISH message's Content Type property MUST be set to the media type of an event format. For MQTT 3.1.1, the media type of the JSON event format is always implied.
And the example there shows:
------------------ PUBLISH -------------------
Topic Name: mytopic
Content Type: application/cloudevents+json; charset=utf-8
------------------ payload -------------------
{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "time" : "2018-04-05T03:56:24Z",
    "id" : "1234-1234-1234",
    "source" : "/mycontext/subcontext",
    "datacontenttype" : "application/json; charset=utf-8",
    ... further attributes omitted ...
    "data" : {
        ... application data ...
    }
}
-----------------------------------------------
You will need to carefully craft the message payload; otherwise the CloudEvents Go SDK will fail the event validation and will not invoke the Invoker function.
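To make the failure mode concrete, here is a small standard-library sketch that checks whether a payload is a JSON object carrying the four required CloudEvents 1.0 context attributes (`specversion`, `id`, `source`, `type`). It is not the SDK's validation logic, only an approximation of the same idea, and `missingAttributes` is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// missingAttributes reports which required CloudEvents 1.0 context
// attributes are absent from a structured-mode JSON payload.
func missingAttributes(payload []byte) ([]string, error) {
	var attrs map[string]json.RawMessage
	if err := json.Unmarshal(payload, &attrs); err != nil {
		return nil, fmt.Errorf("payload is not a JSON object: %w", err)
	}
	var missing []string
	for _, name := range []string{"specversion", "id", "source", "type"} {
		if _, ok := attrs[name]; !ok {
			missing = append(missing, name)
		}
	}
	return missing, nil
}

func main() {
	payloads := []string{
		// The plain string from the failing test.
		`test`,
		// A complete structured-mode event.
		`{"specversion":"1.0","type":"com.example.someevent","id":"1234-1234-1234","source":"/mycontext/subcontext","data":{"msg":"hello world!"}}`,
	}
	for _, p := range payloads {
		missing, err := missingAttributes([]byte(p))
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Printf("missing attributes: %v\n", missing)
	}
}
```

The plain string "test" is rejected before any attribute check because it is not a JSON object at all, while the second payload passes with no missing attributes.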
To conclude, the correct mosquitto_pub command should be:
mosquitto_pub -t 'test-topic' -m '{"specversion" : "1.0","type" :"com.example.someevent","id" : "1234-1234-1234","source" : "/mycontext/subcontext","data":{"msg":"hello world!"}}' -D PUBLISH user-property Content-Type 'application/cloudevents+json; charset=utf-8'
Hope this helps, and please let me know if you have any questions!