eventing icon indicating copy to clipboard operation
eventing copied to clipboard

Prototype a MQTT Source

Open matzew opened this issue 1 year ago • 23 comments

Problem

We do not have a MQTT Source in Knative Eventing, for transforming messages from MQTT brokers to Cloudevents and ingest them via HTTP into the Knative Eventing system.

With "edge" / "iot" computing there is a growing number of "devices" that emit MQTT message to brokers. In order to process those message in Knative Eventing, we need a Source to read from those MQTT brokers

POC Implementation

Instead of directly creating a fully fledge controller, we can go a much simple approach and create an image, that can be used inside the ContainerSource, like we did for the websockets protocol. See:

  • https://github.com/knative/eventing/tree/main/cmd/websocketsource
  • https://github.com/knative/eventing/blob/main/config/tools/websocket-source/websocket-source.yaml#L24

This would give us a quick win, and a good starting point for developers in the need to transform MQTT messages to cloudevents, into the Knative eventing sytstem

Persona: Developers

Exit Criteria A binary/image that can be used via the Containersource CRD

Time Estimate (optional): 2-3

Additional context (optional)

  • https://github.com/cloudevents/sdk-go/tree/main/protocol/mqtt_paho/v2
  • https://github.com/cloudevents/sdk-go/tree/main/samples/mqtt

matzew avatar Aug 25 '23 06:08 matzew

I really love the idea and this is definitely a start in the right direction. I guess such a ContainerSource could also be used in a SinkBinding (i'm always confused when to a use ContainerSource and when a SinkBinding 😬 ). A next step then could be a fully fledged MqttSource as this has the advantage of a typed configuration for e.g. the connection parameters.

rhuss avatar Aug 25 '23 07:08 rhuss

I would like to work and contribute on this.

g1rjeevan avatar Aug 25 '23 14:08 g1rjeevan

/assign

g1rjeevan avatar Aug 25 '23 14:08 g1rjeevan

@g1rjeevan sounds good! Feel free to join our slack instance for further discussions

matzew avatar Aug 28 '23 09:08 matzew

hey @g1rjeevan, are you still on the ticket..........I want to give it a try

prakrit55 avatar Sep 15 '23 15:09 prakrit55

Are there any updates on this issue @g1rjeevan? Please let me know if you are still working on it and still need more time within the next 24 hours, otherwise we will give @prakrit55 a chance to try! Thanks :)

Leo6Leo avatar Oct 30 '23 20:10 Leo6Leo

/unassign @prakrit55 Do you want to give it a shot?

Leo6Leo avatar Nov 02 '23 12:11 Leo6Leo

Hey @Leo6Leo, thanks for mentioning. Yes, it wd love to work on it.

prakrit55 avatar Nov 02 '23 13:11 prakrit55

/assign @prakrit55

Leo6Leo avatar Nov 02 '23 13:11 Leo6Leo

@prakrit55 are there any questions about this Issue?

Leo6Leo avatar Nov 13 '23 14:11 Leo6Leo

Hey @Leo6Leo, sorry for the delay. Where could I find the main for containersource ??

prakrit55 avatar Nov 18 '23 02:11 prakrit55

Hey @prakrit55, thanks for working on this issue.

Our final goal is to create a containersource that can emit the MQTT event. Something similar to this, see how the heartbeats source is implemented.

So the code should be in a package under cmd/mqttsource and the ContainerSource YAML should be under config/tools/mqttsource.

The final goal should be:

  • Configure mqttsource in namespace with a ConfigMap
  • Apply the ContainerSource YAML ko apply -n <x> -f config/tools/mqttsource
  • And hope it will work

Leo6Leo avatar Nov 22 '23 19:11 Leo6Leo

hey @Leo6Leo, should we configure it with only http or https with tls ??

prakrit55 avatar Nov 25 '23 07:11 prakrit55

Hey @prakrit55, follow up upon our conversation on slack regarding this question, I have consulted with tech lead @pierDipi , and do you want to give a try to both http and https? :))) Please let us know if you have any questions!

Here is a code snippet that might be helpful to you when you are working on the TLS part. https://github.com/knative/eventing/blob/0a6d5b4520bc884076ca5c824fc31812c4ad57d6/cmd/heartbeats/main.go#L138-L158

Leo6Leo avatar Nov 28 '23 14:11 Leo6Leo

/assign

octonawish-akcodes avatar Jan 15 '24 17:01 octonawish-akcodes

hey @Leo6Leo this issue seems pretty interesting for me and I want to give a go.

I am assigning this to myself.

Rahul-Kumar-prog avatar Jan 18 '24 15:01 Rahul-Kumar-prog

/assign

Rahul-Kumar-prog avatar Jan 18 '24 15:01 Rahul-Kumar-prog

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.

github-actions[bot] avatar Apr 18 '24 01:04 github-actions[bot]

/remove-lifecycle stale

creydr avatar Apr 18 '24 10:04 creydr

Hi! I'm interested in working on this issue if it's not worked on now.

ctmphuongg avatar May 06 '24 15:05 ctmphuongg

Welcome @ctmphuongg, The other PR related to this seems to be closed. So feel free to go ahead and /assign it to you in case you want to pick this

creydr avatar May 06 '24 15:05 creydr

/assign

ctmphuongg avatar May 06 '24 17:05 ctmphuongg

@ctmphuongg As you described the process that you tested cloud-event's sample MQTT receiver:

Copy the file main.go in folder receiver, installing all packages

  1. Start a MQTT broker as they mentioned in the README: docker run -it --rm --name mosquitto -p 1883:1883 eclipse-mosquitto:2.0 mosquitto -c /mosquitto-no-auth.conf
  2. Keep the mqttsource running by go build the file and run ./main
  3. Send a message to the same topic, using mosquitto_pub -t “test-topic” -m “test”
  4. The thing is I don’t receive any message in the terminal where I started the mqttsource, but when using 5 mosquitto_sub to the same topic “test-topic”, it still received all the messages. So, I’m not sure if that sample there is working or not

The reason why there is no message is received is because you are passing the data as string, without specify any cloudevent properties.

According to CloudEvent Spec for MQTT,

For MQTT 5.0, the MQTT PUBLISH message's Content Type property MUST be set to the media type of an event format. For MQTT 3.1.1, the media type of the JSON event format is always implied.

And according to the example here indicated:

------------------ PUBLISH -------------------

Topic Name: mytopic
Content Type: application/cloudevents+json; charset=utf-8

------------------ payload -------------------

{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
	"time" : 2018-04-05T03:56;24Z,
	"id" : 1234-1234-1234,
	"source" : "/mycontext/subcontext",
	"datacontenttype" : "application/json; charset=utf-8",

    ... further attributes omitted ...

    "data" : {
        ... application data ...
    }
}

-----------------------------------------------

you will need to carefully craft the message (payload) of your data, otherwise cloudevent go-sdk will fail the event validation, and will not invoke the Invoker function.

To conclude, the correct mosquitto_pub command should be:

mosquitto_pub -t 'test/topic' -m '{"specversion" : "1.0","type" :"com.example.someevent","id" : "1234-1234-1234","source" : "/mycontext/subcontext","data":{"msg":"hello world!"}}' -D PUBLISH user-property Content-Type application/cloudevents+json; charset=utf-8

Hope this helps, and please let me know if you have any questions!

Leo6Leo avatar May 14 '24 12:05 Leo6Leo