320 lines
8.4 KiB
Lua
320 lines
8.4 KiB
Lua
---
|
||
--- Desc: 对OpenResty中使用Lua对Redis操作的封装库,支持订阅、管道等功能
|
||
--- Note:本Lua脚本借鉴了网络,未经测试,仅供参考,也不提供任何技术支持
|
||
---
|
||
local redis_c = require "resty.redis"
|
||
|
||
local ok, new_tab = pcall(require, "table.new")
|
||
if not ok or type(new_tab) ~= "function" then
|
||
new_tab = function (narr, nrec) return {} end
|
||
end
|
||
|
||
local _M = new_tab(0, 155)
|
||
_M._VERSION = '0.01'
|
||
|
||
local commands = {
|
||
"append", "auth", "bgrewriteaof",
|
||
"bgsave", "bitcount", "bitop",
|
||
"blpop", "brpop",
|
||
"brpoplpush", "client", "config",
|
||
"dbsize",
|
||
"debug", "decr", "decrby",
|
||
"del", "discard", "dump",
|
||
"echo",
|
||
"eval", "exec", "exists",
|
||
"expire", "expireat", "flushall",
|
||
"flushdb", "get", "getbit",
|
||
"getrange", "getset", "hdel",
|
||
"hexists", "hget", "hgetall",
|
||
"hincrby", "hincrbyfloat", "hkeys",
|
||
"hlen",
|
||
"hmget", "hmset", "hscan",
|
||
"hset",
|
||
"hsetnx", "hvals", "incr",
|
||
"incrby", "incrbyfloat", "info",
|
||
"keys",
|
||
"lastsave", "lindex", "linsert",
|
||
"llen", "lpop", "lpush",
|
||
"lpushx", "lrange", "lrem",
|
||
"lset", "ltrim", "mget",
|
||
"migrate",
|
||
"monitor", "move", "mset",
|
||
"msetnx", "multi", "object",
|
||
"persist", "pexpire", "pexpireat",
|
||
"ping", "psetex", "psubscribe",
|
||
"pttl",
|
||
"publish", --[[ "punsubscribe", ]] "pubsub",
|
||
"quit",
|
||
"randomkey", "rename", "renamenx",
|
||
"restore",
|
||
"rpop", "rpoplpush", "rpush",
|
||
"rpushx", "sadd", "save",
|
||
"scan", "scard", "script",
|
||
"sdiff", "sdiffstore",
|
||
"select", "set", "setbit",
|
||
"setex", "setnx", "setrange",
|
||
"shutdown", "sinter", "sinterstore",
|
||
"sismember", "slaveof", "slowlog",
|
||
"smembers", "smove", "sort",
|
||
"spop", "srandmember", "srem",
|
||
"sscan",
|
||
"strlen", --[[ "subscribe", ]] "sunion",
|
||
"sunionstore", "sync", "time",
|
||
"ttl",
|
||
"type", --[[ "unsubscribe", ]] "unwatch",
|
||
"watch", "zadd", "zcard",
|
||
"zcount", "zincrby", "zinterstore",
|
||
"zrange", "zrangebyscore", "zrank",
|
||
"zrem", "zremrangebyrank", "zremrangebyscore",
|
||
"zrevrange", "zrevrangebyscore", "zrevrank",
|
||
"zscan",
|
||
"zscore", "zunionstore", "evalsha"
|
||
}
|
||
|
||
local mt = { __index = _M }
|
||
|
||
local function is_redis_null( res )
|
||
if type(res) == "table" then
|
||
for k,v in pairs(res) do
|
||
if v ~= ngx.null then
|
||
return false
|
||
end
|
||
end
|
||
return true
|
||
elseif res == ngx.null then
|
||
return true
|
||
elseif res == nil then
|
||
return true
|
||
end
|
||
|
||
return false
|
||
end
|
||
|
||
function _M.close_redis(self, redis)
|
||
if not redis then
|
||
return
|
||
end
|
||
--释放连接(连接池实现)
|
||
local pool_max_idle_time = self.pool_max_idle_time --最大空闲时间 毫秒
|
||
local pool_size = self.pool_size --连接池大小
|
||
|
||
local ok, err = redis:set_keepalive(pool_max_idle_time, pool_size)
|
||
if not ok then
|
||
ngx.say("set keepalive error : ", err)
|
||
end
|
||
end
|
||
|
||
-- change connect address as you need
|
||
function _M.connect_mod( self, redis )
|
||
redis:set_timeout(self.timeout)
|
||
|
||
local ok, err = redis:connect(self.ip, self.port)
|
||
if not ok then
|
||
ngx.say("connect to redis error : ", err)
|
||
return self:close_redis(redis)
|
||
end
|
||
|
||
if self.password then ----密码认证
|
||
local count, err = redis:get_reused_times()
|
||
if 0 == count then ----新建连接,需要认证密码
|
||
ok, err = redis:auth(self.password)
|
||
if not ok then
|
||
ngx.say("failed to auth: ", err)
|
||
return
|
||
end
|
||
elseif err then ----从连接池中获取连接,无需再次认证密码
|
||
ngx.say("failed to get reused times: ", err)
|
||
return
|
||
end
|
||
end
|
||
|
||
return ok,err;
|
||
end
|
||
|
||
function _M.init_pipeline( self )
|
||
self._reqs = {}
|
||
end
|
||
|
||
function _M.commit_pipeline( self )
|
||
local reqs = self._reqs
|
||
|
||
if nil == reqs or 0 == #reqs then
|
||
return {}, "no pipeline"
|
||
else
|
||
self._reqs = nil
|
||
end
|
||
|
||
local redis, err = redis_c:new()
|
||
if not redis then
|
||
return nil, err
|
||
end
|
||
|
||
local ok, err = self:connect_mod(redis)
|
||
if not ok then
|
||
return {}, err
|
||
end
|
||
|
||
redis:init_pipeline()
|
||
for _, vals in ipairs(reqs) do
|
||
local fun = redis[vals[1]]
|
||
table.remove(vals , 1)
|
||
|
||
fun(redis, unpack(vals))
|
||
end
|
||
|
||
local results, err = redis:commit_pipeline()
|
||
if not results or err then
|
||
return {}, err
|
||
end
|
||
|
||
if is_redis_null(results) then
|
||
results = {}
|
||
ngx.log(ngx.WARN, "is null")
|
||
end
|
||
-- table.remove (results , 1)
|
||
|
||
--self.set_keepalive_mod(redis)
|
||
self:close_redis(redis)
|
||
|
||
for i,value in ipairs(results) do
|
||
if is_redis_null(value) then
|
||
results[i] = nil
|
||
end
|
||
end
|
||
|
||
return results, err
|
||
end
|
||
|
||
|
||
local function do_command(self, cmd, ... )
|
||
if self._reqs then
|
||
table.insert(self._reqs, {cmd, ...})
|
||
return
|
||
end
|
||
|
||
local redis, err = redis_c:new()
|
||
if not redis then
|
||
return nil, err
|
||
end
|
||
|
||
local ok, err = self:connect_mod(redis)
|
||
if not ok or err then
|
||
return nil, err
|
||
end
|
||
|
||
redis:select(self.db_index)
|
||
|
||
local fun = redis[cmd]
|
||
local result, err = fun(redis, ...)
|
||
if not result or err then
|
||
-- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
|
||
return nil, err
|
||
end
|
||
|
||
if is_redis_null(result) then
|
||
result = nil
|
||
end
|
||
|
||
--self.set_keepalive_mod(redis)
|
||
self:close_redis(redis)
|
||
|
||
return result, err
|
||
end
|
||
|
||
for i = 1, #commands do
|
||
local cmd = commands[i]
|
||
_M[cmd] =
|
||
function (self, ...)
|
||
return do_command(self, cmd, ...)
|
||
end
|
||
end
|
||
|
||
function _M.new(self, opts)
|
||
opts = opts or {}
|
||
local timeout = (opts.timeout and opts.timeout * 1000) or 1000
|
||
local db_index= opts.db_index or 0
|
||
local ip = opts.ip or '127.0.0.1'
|
||
local port = opts.port or 6379
|
||
local password = opts.password
|
||
local pool_max_idle_time = opts.pool_max_idle_time or 60000
|
||
local pool_size = opts.pool_size or 100
|
||
|
||
return setmetatable({
|
||
timeout = timeout,
|
||
db_index = db_index,
|
||
ip = ip,
|
||
port = port,
|
||
password = password,
|
||
pool_max_idle_time = pool_max_idle_time,
|
||
pool_size = pool_size,
|
||
_reqs = nil }, mt)
|
||
end
|
||
|
||
function _M.subscribe( self, channel )
|
||
local redis, err = redis_c:new()
|
||
if not redis then
|
||
return nil, err
|
||
end
|
||
|
||
local ok, err = self:connect_mod(redis)
|
||
if not ok or err then
|
||
return nil, err
|
||
end
|
||
|
||
local res, err = redis:subscribe(channel)
|
||
if not res then
|
||
return nil, err
|
||
end
|
||
|
||
local function do_read_func ( do_read )
|
||
if do_read == nil or do_read == true then
|
||
res, err = redis:read_reply()
|
||
if not res then
|
||
return nil, err
|
||
end
|
||
return res
|
||
end
|
||
|
||
redis:unsubscribe(channel)
|
||
self.set_keepalive_mod(redis)
|
||
return
|
||
end
|
||
|
||
return do_read_func
|
||
end
|
||
|
||
return _M
|
||
|
||
---------------------------------------
|
||
-- 调用案例
|
||
|
||
local redis = require "RedisExtOps"
|
||
local opts = {
|
||
ip = "10.11.0.215",
|
||
port = "6379",
|
||
password = "redis123",
|
||
db_index = 1
|
||
}
|
||
local red = redis:new(opts)
|
||
local ok, err = red:set("dog", "an animal")
|
||
if not ok then
|
||
ngx.say("failed to set dog: ", err)
|
||
return
|
||
end
|
||
ngx.say("set result: ", ok)
|
||
|
||
---------------------------------------
|
||
-- 管道
|
||
red:init_pipeline()
|
||
red:set("cat", "Marry")
|
||
red:set("horse", "Bob")
|
||
red:get("cat")
|
||
red:get("horse")
|
||
local results, err = red:commit_pipeline()
|
||
if not results then
|
||
ngx.say("failed to commit the pipelined requests: ", err)
|
||
return
|
||
end
|
||
for i, res in ipairs(results) do
|
||
ngx.say(res,"<br/>");
|
||
end |