easegress icon indicating copy to clipboard operation
easegress copied to clipboard

MQTT Pipeline Develop Guide

Open suchen-sci opened this issue 2 years ago • 3 comments

Currently this feature is working in process and can be seen on megaease/easegress:new-pipeline branch.

Pipeline Design

Spec:

name: pipeline-demo
kind: Pipeline
# only support MQTT now, for http, use HTTPPipeline
protocol: MQTT 
flow:
- filter: mqtt-filter1
- filter: mqtt-filter2
filters:
- name: mqtt-filter1
  kind: MockMQTTFilter1
- name: mqtt-filter2
  kind: MockMQTTFilter2

This spec will create a pipeline with name pipeline-demo which contains two filters mqtt-filter1 and mqtt-filter2.

In mqtt spec yaml we add two more field:

name: mqttproxy
kind: MQTTProxy
...
pipeline: pipeline-demo
authByPipeline: true

authByPipeline set to true means MQTTProxy will not check MQTT client username and password, but put MQTT connect packet into pipeline, make sure in this case you have at least one custom filter handle authorization.

To use this pipeline, call function GetPipeline(name, protocol) (*Pipeline, error) in pkg/object/pipeline. (This function is used for develop new proxy, in MQTTProxy we call this function to get pipeline)

For MQTTProxy, we use pipeline in two place,

  1. before mqtt client connect to server, so we can do connection management.
  2. before mqtt client publish message to backend kafka, so we can do publish management.

MQTTContext

MQTTProxy create MQTTContext and put it into Pipeline, and Pipeline use this context to run all its filters.

MQTTContext:

  • Client() return MQTTClient
    • contains ClientID and UserName about mqtt client
  • PacketType() return mqtt packet type in MQTTContext
    • currently only three types of packet can be seen, Connect, Publish and Disconnect.
    • Use method ConnectPacket(), PublishPacket(), DisconnectPacket() to get them and check their information, like topic name, payload...
  • SetDrop() and Drop() controls if this packet should be dropped.
    • If Drop() return true, this packet will not be send to backend Kafka.
  • SetDisconnect() and Disconnect() controls if we should disconnect the mqtt client.
  • SetEarlyStop() and EarlyStop() is used to control filter flow
    • if EarlyStop() return true, pipeline will stop exec of filters and return.

Filter

Interface about Filter is in file pkg/object/pipeline/registry.go A example filter will be ConnectControl in pkg/filter/connectcontrol. Do check filterSpec.Protocol() to make sure if filter support corresponding protocol.

HTTPFilter contain method HandleHTTP, MQTTFilter contain method HandleMQTT. TestPipeline in pkg/object/mqttproxy/mqtt_test.go show a basic interaction between MQTTProxy and pipeline. Other test in pkg/object/pipeline/pipeline_test.go and pkg/filter/connectcontrol/connectcontrol_test.go may provide useful tests to develop more filters.

suchen-sci avatar Nov 04 '21 07:11 suchen-sci

We add a key-value map to MQTTClient in MQTTContext, so that we can share information between filters during whole connection. (It will be stored in MQTTClient, not MQTTContext, so it can be used during whole connection, will not disappear when MQTTContext return).

Method include: Load(key interface{}) (value interface{}, ok bool), Store(key interface{}, value interface{}), Delete(key interface{}).

suchen-sci avatar Nov 15 '21 10:11 suchen-sci

We add Backend() to MQTTContext which has method Publish(target string, data []byte, headers map[string]string) error allow filters to send message to backend server.

suchen-sci avatar Nov 16 '21 08:11 suchen-sci

New yaml fields for connection management:

# max allowed connected clients on a single MQTT Proxy
maxAllowedConnection: 10000
connectionLimit:
  rate: 10  # max allowed connection in time period
  burst: 1000  # max allowed bytes in connect packet in time period  
  timePeriod: 10  # means 10 seconds, default 1 second
clientPublishLimit:
# similar to connectionLimit, rate limit for client to publish packets
  rate: 10
  burst: 1000
  timePeriod: 10

APIs for session management:

path: apis/v1/mqttproxy/{name}/session/query?page=1&page_size=20&q=/topic
{name} is name of mqttproxy
method: get
resp body: 
{"sessions":
  [
    {"sessionID":"2","topic":"/topic/123"},
    {"sessionID":"5","topic":"/topic/456"},
    {"sessionID":"9","topic":"/topic/789"}
  ]
}
this api will return sessions in the page which contain topic match q in query
to get all sessions in that page, provide empty q, for example "apis/v1/mqttproxy/{name}/session/query?page=1&page_size=20&q="

path: apis/v1/mqttproxy/{name}/session/delete
{name} is name of mqttproxy
method: post
request body: 
{"sessions":
  [
    {"sessionID":"2"},
    {"sessionID":"5"},
    {"sessionID":"9"}
  ]
}this request will delete session with name "clientA" and "clientB" and disconnect their client.

suchen-sci avatar Nov 26 '21 02:11 suchen-sci