lua-resty-rocketmq
Name
lua-resty-rocketmq - Lua rocketmq client driver for the ngx_lua based on the cosocket API
Table of Contents
- Name
- Status
- Description
- Quick start
- Synopsis
- Modules
- resty.rocketmq.producer
- Methods
- new
- addRPCHook
- setUseTLS
- setTimeout
- registerSendMessageHook
- registerEndTransactionHook
- send
- sendMessageInTransaction
- batchSend
- start
- stop
- resty.rocketmq.consumer
- Methods
- new
- addRPCHook
- setUseTLS
- setTimeout
- registerMessageListener
- registerConsumeMessageHook
- subscribe
- start
- stop
- setAllocateMessageQueueStrategy
- getAllocateMessageQueueStrategy
- setEnableMsgTrace
- isEnableMsgTrace
- setCustomizedTraceTopic
- getCustomizedTraceTopic
- setConsumeFromWhere
- getConsumeFromWhere
- setConsumeTimestamp
- getConsumeTimestamp
- setPullThresholdForQueue
- getPullThresholdForQueue
- setPullThresholdSizeForQueue
- getPullThresholdSizeForQueue
- setPullTimeDelayMillsWhenException
- getPullTimeDelayMillsWhenException
- setPullBatchSize
- getPullBatchSize
- setPullInterval
- getPullInterval
- setConsumeMessageBatchMaxSize
- getConsumeMessageBatchMaxSize
- setMaxReconsumeTimes
- getMaxReconsumeTimes
- resty.rocketmq.admin
- Methods
- new
- addRPCHook
- setUseTLS
- setTimeout
- createTopic
- createTopicForBroker
- searchOffset
- maxOffset
- minOffset
- earliestMsgStoreTime
- viewMessage
- queryMessage
- queryTraceByMsgId
- HTTP proxy
- Quick start
- SQS proxy
- Quick start
- Installation
- See Also
Status
Production ready.
Description
This Lua library is a RocketMQ client driver for the ngx_lua nginx module (https://github.com/openresty/lua-nginx-module).
This Lua library takes advantage of ngx_lua's cosocket API, which ensures 100% nonblocking behavior.
Quick start
Install OpenResty
For Ubuntu:
wget -O - https://openresty.org/package/pubkey.gpg | sudo apt-key add -
echo "deb http://openresty.org/package/ubuntu $(lsb_release -sc) main" \
| sudo tee /etc/apt/sources.list.d/openresty.list
sudo apt-get update
sudo apt-get -y install openresty
See https://openresty.org/cn/linux-packages.html for other distributions.
Install and start RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
unzip rocketmq-all-5.3.0-bin-release.zip
cd rocketmq-5.3.0
nohup bash bin/mqnamesrv &
nohup bash bin/mqbroker -n localhost:9876 -c conf/broker.conf &
Run examples of this project
cd examples
chmod +x producer.lua
./producer.lua
Synopsis
lua_package_path "/path/to/lua-resty-rocketmq/lib/?.lua;;";
server {
location /test {
content_by_lua_block {
local cjson = require "cjson"
local producer = require "resty.rocketmq.producer"
local consumer = require "resty.rocketmq.consumer"
local nameservers = { "127.0.0.1:9876" }
local message = "halo world"
local p = producer.new(nameservers, "produce_group")
-- set acl
local aclHook = require("resty.rocketmq.acl_rpchook").new("RocketMQ","123456781")
p:addRPCHook(aclHook)
-- use tls mode
p:setUseTLS(true)
local res, err = p:send("TopicTest", message)
if not res then
ngx.say("send err:", err)
return
end
ngx.say("send success")
-- consume
local c = consumer.new(nameservers, "group1")
c:subscribe("TopicTest", "*")
c:registerMessageListener({
consumeMessage = function(self, msgs, context)
ngx.say("consume success:", cjson.encode(msgs))
return consumer.CONSUME_SUCCESS
end
})
c:start()
ngx.sleep(5)
c:stop()
}
}
}
Back to TOC
Modules
resty.rocketmq.producer
To load this module:
local producer = require "resty.rocketmq.producer"
Methods
new
syntax: p = producer.new(nameservers, produce_group, enableMsgTrace, customizedTraceTopic)
nameservers is a list of nameserver addresses.
addRPCHook
syntax: p:addRPCHook(hook)
hook is a table that contains two functions:
- doBeforeRequest(self, addr, header, body)
- doAfterResponse(self, addr, header, body, respHeader, respBody)
An ACL hook is provided. Usage:
local accessKey, secretKey = "RocketMQ", "12345678"
local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
p:addRPCHook(aclHook)
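Besides the bundled ACL hook, any table that provides these two functions can be passed to addRPCHook. The following is a minimal sketch, not part of the library: new_counting_hook is a hypothetical helper that counts requests and responses per address, and it deliberately makes no assumption about the shape of header or body.

```lua
-- Hypothetical helper (not part of lua-resty-rocketmq): builds an RPC hook
-- that counts requests and responses per remote address.
local function new_counting_hook()
    local hook = { requests = {}, responses = {} }

    -- Invoked before a request is sent to `addr`.
    function hook.doBeforeRequest(self, addr, header, body)
        self.requests[addr] = (self.requests[addr] or 0) + 1
    end

    -- Invoked after the response from `addr` has been received.
    function hook.doAfterResponse(self, addr, header, body, respHeader, respBody)
        self.responses[addr] = (self.responses[addr] or 0) + 1
    end

    return hook
end

local counting_hook = new_counting_hook()
-- With a real producer: p:addRPCHook(counting_hook)
```

Since the hook is an ordinary table, its counters can be read later, for example from a status endpoint.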
setUseTLS
syntax: p:setUseTLS(useTLS)
useTLS is a boolean
setTimeout
syntax: p:setTimeout(timeout)
timeout is in milliseconds, default 3000
registerSendMessageHook
syntax: p:registerSendMessageHook(hook)
hook is a table that contains two functions:
- sendMessageBefore(self, context)
- sendMessageAfter(self, context)
context is a table that contains:
- producer
- producerGroup
- communicationMode
- bornHost
- brokerAddr
- message
- mq
- msgType
- sendResult
- exception
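As an illustration of the context fields listed above, here is a minimal sketch of a send-message hook that tallies outcomes. new_send_stats_hook is a hypothetical helper, and the sketch assumes that context.exception is set only when a send has failed, which this document does not confirm.

```lua
-- Hypothetical helper: a send-message hook that tallies send outcomes.
local function new_send_stats_hook()
    local hook = { started = 0, ok = 0, failed = 0 }

    -- Invoked before each send.
    function hook.sendMessageBefore(self, context)
        self.started = self.started + 1
    end

    -- Invoked after each send.
    -- Assumption: `context.exception` is non-nil only when the send failed.
    function hook.sendMessageAfter(self, context)
        if context.exception then
            self.failed = self.failed + 1
        else
            self.ok = self.ok + 1
        end
    end

    return hook
end

local send_stats = new_send_stats_hook()
-- With a real producer: p:registerSendMessageHook(send_stats)
```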
registerEndTransactionHook
syntax: p:registerEndTransactionHook(hook)
hook is a table that contains a function as follows:
endTransaction(self, context)
context is a table that contains:
- producerGroup
- brokerAddr
- message
- msgId
- transactionId
- transactionState
- fromTransactionCheck
send
syntax: res, err = p:send(topic, message, tags, keys, properties)
properties is a table that contains:
- WAIT
- DELAY
In case of success, returns a table of results.
In case of errors, returns nil with a string describing the error.
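The res, err return convention makes it easy to wrap send in application-level helpers. The sketch below is one hypothetical example: send_with_retry is not part of the library, it accepts any object that exposes the send method documented above, and it retries a fixed number of times. The library may already retry internally, so treat this only as an illustration of the return convention.

```lua
-- Hypothetical wrapper around p:send(topic, message).
-- Returns res, nil, attempts on success, or nil, err, attempts on failure.
local function send_with_retry(p, topic, message, max_attempts)
    local last_err
    for attempt = 1, max_attempts do
        local res, err = p:send(topic, message)
        if res then
            return res, nil, attempt
        end
        last_err = err  -- remember the most recent error string
    end
    return nil, last_err, max_attempts
end

-- With a real producer:
-- local res, err = send_with_retry(p, "TopicTest", "halo world", 3)
```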
setTransactionListener
syntax: res, err = p:setTransactionListener(transactionListener)
transactionListener is a table that contains two functions:
- executeLocalTransaction(self, msg, arg)
- checkLocalTransaction(self, msg)
sendMessageInTransaction
syntax: res, err = p:sendMessageInTransaction(topic, arg, message, tags, keys, properties)
batchSend
syntax: res, err = p:batchSend(msgs)
msgs is a list of msgs, each msg is a table that contains:
- topic
- body
- tags
- keys
- properties
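A short sketch of how the msgs list might be assembled is shown below. build_batch is a hypothetical helper, and two details are assumptions rather than documented behaviour: that keys is a string, and that all messages in one batch share a topic (as the Java client requires).

```lua
-- Hypothetical helper: builds the `msgs` list for p:batchSend(msgs).
-- Assumptions: `keys` is a string, and every message uses the same topic.
local function build_batch(topic, bodies, tags)
    local msgs = {}
    for i, body in ipairs(bodies) do
        msgs[i] = {
            topic = topic,
            body = body,
            tags = tags,
            keys = "key-" .. i,
            properties = {},
        }
    end
    return msgs
end

local batch = build_batch("TopicTest", { "first", "second", "third" }, "TagA")
-- With a real producer: local res, err = p:batchSend(batch)
```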
start
syntax: p:start()
Note that if you do not call p:start() before sending messages, the messages are still sent successfully, but no trace data is sent.
stop
syntax: p:stop()
Back to TOC
resty.rocketmq.consumer
To load this module:
local consumer = require "resty.rocketmq.consumer"
Methods
new
syntax: c = consumer.new(nameservers, consumerGroup)
nameservers is a list of nameserver addresses.
addRPCHook
syntax: c:addRPCHook(hook)
hook is a table that contains two functions:
- doBeforeRequest(self, addr, header, body)
- doAfterResponse(self, addr, header, body, respHeader, respBody)
An ACL hook is provided. Usage:
local accessKey, secretKey = "RocketMQ", "12345678"
local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
c:addRPCHook(aclHook)
setUseTLS
syntax: c:setUseTLS(useTLS)
useTLS is a boolean
setTimeout
syntax: c:setTimeout(timeout)
timeout is in milliseconds, default 3000
registerMessageListener
syntax: c:registerMessageListener(messageListener)
messageListener is a table that contains a function as follows:
consumeMessage(self, msgs, context)
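The listener is a plain table, so it can be produced by a small factory. The following sketch assumes only what the Synopsis shows: msgs is a list and the function returns a status such as consumer.CONSUME_SUCCESS. new_listener and its handle callback are hypothetical names, and the status is passed in as a parameter so that the sketch does not depend on the library being loaded.

```lua
-- Hypothetical factory: builds a message listener that hands each message
-- to `handle` and then reports `success_status` back to the consumer.
local function new_listener(success_status, handle)
    return {
        consumeMessage = function(self, msgs, context)
            for _, msg in ipairs(msgs) do
                handle(msg)
            end
            return success_status
        end,
    }
end

-- With a real consumer:
-- c:registerMessageListener(new_listener(consumer.CONSUME_SUCCESS, function(msg)
--     ngx.log(ngx.INFO, "got one message")
-- end))
```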
registerConsumeMessageHook
syntax: c:registerConsumeMessageHook(hook)
hook is a table that contains two functions:
- consumeMessageBefore(self, context)
- consumeMessageAfter(self, context)
context is a table that contains:
- consumerGroup
- mq
- msgList
- success
- status
- consumeContextType
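To illustrate how the context fields might be used, here is a minimal sketch of a consume hook that counts batches and messages. new_consume_stats_hook is a hypothetical helper. It assumes that msgList is a list and that success is a boolean available by the time consumeMessageAfter runs, neither of which is spelled out in this document.

```lua
-- Hypothetical helper: a consume hook that counts batches, messages,
-- and failed batches.
local function new_consume_stats_hook()
    local hook = { batches = 0, messages = 0, failed_batches = 0 }

    -- Invoked before the message listener runs.
    -- Assumption: `context.msgList` is a list of messages.
    function hook.consumeMessageBefore(self, context)
        self.batches = self.batches + 1
        self.messages = self.messages + #(context.msgList or {})
    end

    -- Invoked after the message listener returns.
    -- Assumption: `context.success` is a boolean at this point.
    function hook.consumeMessageAfter(self, context)
        if not context.success then
            self.failed_batches = self.failed_batches + 1
        end
    end

    return hook
end

local consume_stats = new_consume_stats_hook()
-- With a real consumer: c:registerConsumeMessageHook(consume_stats)
```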
subscribe
syntax: c:subscribe(topic, subExpression)
start
syntax: c:start()
stop
syntax: c:stop()
setAllocateMessageQueueStrategy
syntax: c:setAllocateMessageQueueStrategy(strategy)
strategy is a function with the following signature:
function(consumerGroup, currentCID, mqAll, cidAll)
default value is consumer.AllocateMessageQueueAveragely
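A custom strategy could look like the round-robin sketch below. It rests on assumptions that this document does not state: that mqAll and cidAll are arrays ordered identically on every consumer, and that the function returns the sub-list of mqAll assigned to currentCID (mirroring the Java client's AllocateMessageQueueStrategy). allocate_round_robin is a hypothetical name.

```lua
-- Hypothetical round-robin allocation strategy.
-- Assumptions: `mqAll` and `cidAll` are arrays with the same order on every
-- consumer, and the return value is the list of queues for `currentCID`.
local function allocate_round_robin(consumerGroup, currentCID, mqAll, cidAll)
    -- Find this consumer's position in the list of consumer IDs.
    local index
    for i, cid in ipairs(cidAll) do
        if cid == currentCID then
            index = i
            break
        end
    end
    if not index then
        return {}  -- unknown consumer ID: nothing is assigned
    end

    -- Assign every queue whose position maps onto this consumer.
    local result = {}
    for i, mq in ipairs(mqAll) do
        if (i - 1) % #cidAll + 1 == index then
            result[#result + 1] = mq
        end
    end
    return result
end

-- With a real consumer: c:setAllocateMessageQueueStrategy(allocate_round_robin)
```

With five queues and two consumers, the first consumer receives queues 1, 3 and 5 and the second receives queues 2 and 4.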
getAllocateMessageQueueStrategy
syntax: local strategy = c:getAllocateMessageQueueStrategy()
setEnableMsgTrace
syntax: c:setEnableMsgTrace(enableMsgTrace)
isEnableMsgTrace
syntax: local enableMsgTrace = c:isEnableMsgTrace()
setCustomizedTraceTopic
syntax: c:setCustomizedTraceTopic(customizedTraceTopic)
getCustomizedTraceTopic
syntax: local customizedTraceTopic = c:getCustomizedTraceTopic()
setConsumeFromWhere
syntax: c:setConsumeFromWhere(consumeFromWhere)
getConsumeFromWhere
syntax: local consumeFromWhere = c:getConsumeFromWhere()
setConsumeTimestamp
syntax: c:setConsumeTimestamp(consumeTimestamp)
getConsumeTimestamp
syntax: local consumeTimestamp = c:getConsumeTimestamp()
setPullThresholdForQueue
syntax: c:setPullThresholdForQueue(pullThresholdForQueue)
getPullThresholdForQueue
syntax: local pullThresholdForQueue = c:getPullThresholdForQueue()
setPullThresholdSizeForQueue
syntax: c:setPullThresholdSizeForQueue(pullThresholdSizeForQueue)
getPullThresholdSizeForQueue
syntax: local pullThresholdSizeForQueue = c:getPullThresholdSizeForQueue()
setPullTimeDelayMillsWhenException
syntax: c:setPullTimeDelayMillsWhenException(pullTimeDelayMillsWhenException)
getPullTimeDelayMillsWhenException
syntax: local pullTimeDelayMillsWhenException = c:getPullTimeDelayMillsWhenException()
setPullBatchSize
syntax: c:setPullBatchSize(pullBatchSize)
getPullBatchSize
syntax: local pullBatchSize = c:getPullBatchSize()
setPullInterval
syntax: c:setPullInterval(pullInterval)
getPullInterval
syntax: local pullInterval = c:getPullInterval()
setConsumeMessageBatchMaxSize
syntax: c:setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize)
getConsumeMessageBatchMaxSize
syntax: local consumeMessageBatchMaxSize = c:getConsumeMessageBatchMaxSize()
setMaxReconsumeTimes
syntax: c:setMaxReconsumeTimes(maxReconsumeTimes)
getMaxReconsumeTimes
syntax: local maxReconsumeTimes = c:getMaxReconsumeTimes()
setClientRebalance
syntax: c:setClientRebalance(clientRebalance)
isClientRebalance
syntax: local clientRebalance = c:isClientRebalance()
setPopThresholdForQueue
syntax: c:setPopThresholdForQueue(popThresholdForQueue)
getPopThresholdForQueue
syntax: local popThresholdForQueue = c:getPopThresholdForQueue()
setPopInvisibleTime
syntax: c:setPopInvisibleTime(popInvisibleTime)
getPopInvisibleTime
syntax: local popInvisibleTime = c:getPopInvisibleTime()
setPopBatchNums
syntax: c:setPopBatchNums(popBatchNums)
getPopBatchNums
syntax: local popBatchNums = c:getPopBatchNums()
Back to TOC
resty.rocketmq.admin
To load this module:
local admin = require "resty.rocketmq.admin"
Methods
new
syntax: adm = admin.new(nameservers)
nameservers is a list of nameserver addresses.
addRPCHook
syntax: adm:addRPCHook(hook)
hook is a table that contains two functions:
- doBeforeRequest(self, addr, header, body)
- doAfterResponse(self, addr, header, body, respHeader, respBody)
An ACL hook is provided. Usage:
local accessKey, secretKey = "RocketMQ", "12345678"
local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
adm:addRPCHook(aclHook)
setUseTLS
syntax: adm:setUseTLS(useTLS)
useTLS is a boolean
setTimeout
syntax: adm:setTimeout(timeout)
timeout is in milliseconds, default 3000
createTopic
syntax: res, err = adm:createTopic(newTopic, queueNum, topicSysFlag)
- newTopic: the new topic name
- queueNum: read and write queue numbers
- topicSysFlag: system flag of the topic
createTopicForBroker
syntax: res, err = adm:createTopicForBroker(addr, topicConfig)
- addr: broker address
- topicConfig: a table containing:
- topic
- readQueueNums
- writeQueueNums
- perm
- topicFilterType
- topicSysFlag
- order
searchOffset
syntax: res, err = adm:searchOffset(mq, timestamp)
- mq: a table containing:
- brokerName
- topic
- queueId
- timestamp: search time
maxOffset
syntax: res, err = adm:maxOffset(mq)
- mq: a table containing:
- brokerName
- topic
- queueId
minOffset
syntax: res, err = adm:minOffset(mq)
- mq: a table containing:
- brokerName
- topic
- queueId
earliestMsgStoreTime
syntax: res, err = adm:earliestMsgStoreTime(mq)
- mq: a table containing:
- brokerName
- topic
- queueId
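The offset queries above all take the same mq table, so they combine naturally. The sketch below is a hypothetical helper, offset_range, that fetches the minimum and maximum offsets of one queue. It assumes the admin methods follow the same convention as send (nil plus an error string on failure) and accepts any object that exposes minOffset and maxOffset. The brokerName value in the usage comment is a placeholder.

```lua
-- Hypothetical helper: returns min, max on success, or nil, nil, err on failure.
-- Assumption: adm:minOffset / adm:maxOffset return nil and an error string
-- when the query fails.
local function offset_range(adm, mq)
    local min, min_err = adm:minOffset(mq)
    if min == nil then
        return nil, nil, "minOffset failed: " .. tostring(min_err)
    end
    local max, max_err = adm:maxOffset(mq)
    if max == nil then
        return nil, nil, "maxOffset failed: " .. tostring(max_err)
    end
    return min, max
end

-- With a real admin object (brokerName is a placeholder value):
-- local mq = { brokerName = "broker-a", topic = "TopicTest", queueId = 0 }
-- local min, max, err = offset_range(adm, mq)
```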
viewMessage
syntax: res, err = adm:viewMessage(offsetMsgId)
queryMessage
syntax: res, err = adm:queryMessage(topic, key, maxNum, beginTime, endTime, isUniqKey)
queryTraceByMsgId
syntax: res, err = adm:queryTraceByMsgId(traceTopic, msgId)
Back to TOC
HTTP proxy
Quick start
The HTTP proxy provides an HTTP API for producing and consuming messages. RocketMQ >= 5.0.0 is required.
Install and start RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
unzip rocketmq-all-5.3.0-bin-release.zip
cd rocketmq-5.3.0
nohup bash bin/mqnamesrv &
nohup bash bin/mqbroker -n localhost:9876 -c conf/broker.conf &
Start proxy
cd examples/
openresty -c server/http_proxy.conf -p .
Send a message using the HTTP proxy
curl localhost:8080/topics/topic1/messages -d '{"properties": {"a": "3", "KEYS": "key", "TAGS": "tag"}, "body": "hello proxy"}'
Consume and acknowledge messages using the HTTP proxy
curl 'localhost:8080/topics/topic1/messages?consumer=group1&numOfMessages=16&waitseconds=10'
curl -XPUT 'localhost:8080/topics/topic1/messages/ack?consumer=group1' -d '{"receiptHandles":["32312031363736303232303830303335203630303030203020302062726F6B65722D302030203331"]}'
Back to TOC
SQS proxy
Quick start
The SQS proxy provides an HTTP API compatible with AWS SQS (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html).
Install and start RocketMQ
Start SQS proxy
cd examples/
openresty -c server/sqs.conf -p .
Set up AWS SDK project
https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
Send a message using the AWS SQS Java SDK
SqsClient sqsClient = SqsClient.builder()
.httpClientBuilder(ApacheHttpClient.builder())
.endpointOverride(URI.create("http://localhost:8088"))
.build();
SendMessageRequest req = SendMessageRequest.builder()
.queueUrl("TopicTest")
.messageBody("body")
.build();
SendMessageResponse resp = sqsClient.sendMessage(req);
Consume messages using the AWS SQS Java SDK
SqsClient sqsClient = SqsClient.builder()
.httpClientBuilder(ApacheHttpClient.builder())
.endpointOverride(URI.create("http://localhost:8088"))
.build();
while (true) {
ReceiveMessageRequest req = ReceiveMessageRequest.builder()
.queueUrl("TopicTest")
.visibilityTimeout(60)
.maxNumberOfMessages(1)
.waitTimeSeconds(5)
.build();
ReceiveMessageResponse resp = sqsClient.receiveMessage(req);
System.out.printf("%s\n", resp);
for (Message message : resp.messages()) {
DeleteMessageRequest req2 = DeleteMessageRequest.builder()
.queueUrl("TopicTest")
.receiptHandle(message.receiptHandle())
.build();
sqsClient.deleteMessage(req2);
}
}
Back to TOC
Installation
You need to configure the lua_package_path directive to add the path of your lua-resty-rocketmq source tree to ngx_lua's LUA_PATH search path, for example:
# nginx.conf
http {
lua_package_path "/path/to/lua-resty-rocketmq/lib/?.lua;;";
...
}
Ensure that the system account running your Nginx "worker" processes has
enough permission to read the .lua files.
Back to TOC
See Also
- ngx_lua module: https://github.com/openresty/lua-nginx-module
- Apache RocketMQ: https://github.com/apache/rocketmq
- lua-resty-kafka: https://github.com/doujiang24/lua-resty-kafka
- luatz: https://github.com/daurnimator/luatz
Back to TOC