lua-resty-kafka
lua-resty-kafka copied to clipboard
consumer
`
--- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by admin. --- DateTime: 2021/6/10 10:16
local cjson = require "cjson" local consumer = require "resty.kafka.consumer" local client = require "resty.kafka.client"
local broker_list = { { host = "172.20.154.101", port = 9092 }, { host = "172.20.154.103", port = 9092 }, { host = "172.20.154.104", port = 9092 }, }
local delay = 3 -- in seconds local check
check = function(premature) if not premature then local con = consumer:new(broker_list) --local offsets = { [0] = 15028, [1] = 35070 } --local messages, offsets = con:fetch("vlog-topic", offsets) local offsets, err = con:fetch_offset("vlog-topic")
--if not err then
-- ngx.log(ngx.ERR, "fetch err:", offsets)
-- return
--end
--ngx.log(ngx.NOTICE,cjson.encode(err), cjson.encode(offsets))
--return
local offsets = { [0]=0,[1]=0,[2]=0,[3]=0,[4]=10,[5] = 10,[6]=0,[7]=0,[8]=0,[9]=0,}
ngx.log(ngx.NOTICE,"offsets----", cjson.encode(offsets))
local messages, offsets = con:fetch("vlog-topic", offsets)
if not messages then
ngx.log(ngx.ERR, "fetch err:", offsets)
return
end
ngx.log(ngx.NOTICE,"消息----",cjson.encode(messages))
end
end
if 0 == ngx.worker.id() then local ok, err = ngx.timer.every(delay, check) if not ok then ngx.log(ngx.ERR, "failed to create timer: ", err) return end end
`