easegress
easegress copied to clipboard
MQTT Pipeline Develop Guide
Currently this feature is working in process and can be seen on megaease/easegress:new-pipeline branch.
Pipeline Design
Spec:
name: pipeline-demo
kind: Pipeline
# only support MQTT now, for http, use HTTPPipeline
protocol: MQTT
flow:
- filter: mqtt-filter1
- filter: mqtt-filter2
filters:
- name: mqtt-filter1
kind: MockMQTTFilter1
- name: mqtt-filter2
kind: MockMQTTFilter2
This spec will create a pipeline with name pipeline-demo
which contains two filters mqtt-filter1
and mqtt-filter2
.
In mqtt spec yaml we add two more field:
name: mqttproxy
kind: MQTTProxy
...
pipeline: pipeline-demo
authByPipeline: true
authByPipeline
set to true means MQTTProxy will not check MQTT client username and password, but put MQTT connect packet into pipeline, make sure in this case you have at least one custom filter handle authorization.
To use this pipeline, call function GetPipeline(name, protocol) (*Pipeline, error)
in pkg/object/pipeline
. (This function is used for develop new proxy, in MQTTProxy
we call this function to get pipeline)
For MQTTProxy
, we use pipeline in two place,
- before mqtt client connect to server, so we can do connection management.
- before mqtt client publish message to backend kafka, so we can do publish management.
MQTTContext
MQTTProxy
create MQTTContext
and put it into Pipeline
, and Pipeline
use this context to run all its filters.
MQTTContext
:
-
Client()
returnMQTTClient
- contains
ClientID
andUserName
about mqtt client
- contains
-
PacketType()
return mqtt packet type inMQTTContext
- currently only three types of packet can be seen,
Connect
,Publish
andDisconnect
. - Use method
ConnectPacket()
,PublishPacket()
,DisconnectPacket()
to get them and check their information, like topic name, payload...
- currently only three types of packet can be seen,
-
SetDrop()
andDrop()
controls if this packet should be dropped.- If
Drop()
return true, this packet will not be send to backend Kafka.
- If
-
SetDisconnect()
andDisconnect()
controls if we should disconnect the mqtt client. -
SetEarlyStop()
andEarlyStop()
is used to control filter flow- if
EarlyStop()
return true, pipeline will stop exec of filters and return.
- if
Filter
Interface about Filter
is in file pkg/object/pipeline/registry.go
A example filter will be ConnectControl
in pkg/filter/connectcontrol
. Do check filterSpec.Protocol()
to make sure if filter support corresponding protocol.
HTTPFilter
contain method HandleHTTP
, MQTTFilter
contain method HandleMQTT
.
TestPipeline
in pkg/object/mqttproxy/mqtt_test.go
show a basic interaction between MQTTProxy and pipeline. Other test in pkg/object/pipeline/pipeline_test.go
and pkg/filter/connectcontrol/connectcontrol_test.go
may provide useful tests to develop more filters.
We add a key-value map to MQTTClient
in MQTTContext
, so that we can share information between filters during whole connection. (It will be stored in MQTTClient, not MQTTContext, so it can be used during whole connection, will not disappear when MQTTContext return).
Method include:
Load(key interface{}) (value interface{}, ok bool)
, Store(key interface{}, value interface{})
, Delete(key interface{})
.
We add Backend()
to MQTTContext
which has method Publish(target string, data []byte, headers map[string]string) error
allow filters to send message to backend server.
New yaml fields for connection management:
# max allowed connected clients on a single MQTT Proxy
maxAllowedConnection: 10000
connectionLimit:
rate: 10 # max allowed connection in time period
burst: 1000 # max allowed bytes in connect packet in time period
timePeriod: 10 # means 10 seconds, default 1 second
clientPublishLimit:
# similar to connectionLimit, rate limit for client to publish packets
rate: 10
burst: 1000
timePeriod: 10
APIs for session management:
path: apis/v1/mqttproxy/{name}/session/query?page=1&page_size=20&q=/topic
{name} is name of mqttproxy
method: get
resp body:
{"sessions":
[
{"sessionID":"2","topic":"/topic/123"},
{"sessionID":"5","topic":"/topic/456"},
{"sessionID":"9","topic":"/topic/789"}
]
}
this api will return sessions in the page which contain topic match q in query
to get all sessions in that page, provide empty q, for example "apis/v1/mqttproxy/{name}/session/query?page=1&page_size=20&q="
path: apis/v1/mqttproxy/{name}/session/delete
{name} is name of mqttproxy
method: post
request body:
{"sessions":
[
{"sessionID":"2"},
{"sessionID":"5"},
{"sessionID":"9"}
]
}this request will delete session with name "clientA" and "clientB" and disconnect their client.