-- Listing metadata: 320 lines, 8.4 KiB, Lua
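---
--- Desc: Wrapper library for operating on Redis from Lua in OpenResty; supports subscriptions, pipelines and more
--- Note: This Lua script borrows from code found online, is untested, is provided for reference only, and comes with no technical support
---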
local redis_c = require "resty.redis"

-- Try to load the optional "table.new" module. When it cannot be loaded, or
-- does not yield a function, use a stand-in that ignores both size hints and
-- returns a plain empty table.
local ok, new_tab = pcall(require, "table.new")
if not (ok and type(new_tab) == "function") then
    new_tab = function (narr, nrec)
        return {}
    end
end

-- Module table; 0 and 155 are the array/hash size hints passed to new_tab.
local _M = new_tab(0, 155)
_M._VERSION = '0.01'

-- Names of the Redis commands that get a same-named wrapper method on the
-- module. Order is unchanged from the original list; entries are only
-- regrouped by initial letter. "subscribe", "unsubscribe" and "punsubscribe"
-- are deliberately left out (kept below as comments).
local commands = {
    -- a
    "append", "auth",
    -- b
    "bgrewriteaof", "bgsave", "bitcount", "bitop", "blpop", "brpop",
    "brpoplpush",
    -- c
    "client", "config",
    -- d
    "dbsize", "debug", "decr", "decrby", "del", "discard", "dump",
    -- e
    "echo", "eval", "exec", "exists", "expire", "expireat",
    -- f
    "flushall", "flushdb",
    -- g
    "get", "getbit", "getrange", "getset",
    -- h
    "hdel", "hexists", "hget", "hgetall", "hincrby", "hincrbyfloat",
    "hkeys", "hlen", "hmget", "hmset", "hscan", "hset", "hsetnx", "hvals",
    -- i
    "incr", "incrby", "incrbyfloat", "info",
    -- k
    "keys",
    -- l
    "lastsave", "lindex", "linsert", "llen", "lpop", "lpush", "lpushx",
    "lrange", "lrem", "lset", "ltrim",
    -- m
    "mget", "migrate", "monitor", "move", "mset", "msetnx", "multi",
    -- o
    "object",
    -- p
    "persist", "pexpire", "pexpireat", "ping", "psetex", "psubscribe",
    "pttl", "publish", --[[ "punsubscribe", ]] "pubsub",
    -- q
    "quit",
    -- r
    "randomkey", "rename", "renamenx", "restore", "rpop", "rpoplpush",
    "rpush", "rpushx",
    -- s
    "sadd", "save", "scan", "scard", "script", "sdiff", "sdiffstore",
    "select", "set", "setbit", "setex", "setnx", "setrange", "shutdown",
    "sinter", "sinterstore", "sismember", "slaveof", "slowlog", "smembers",
    "smove", "sort", "spop", "srandmember", "srem", "sscan", "strlen",
    --[[ "subscribe", ]] "sunion", "sunionstore", "sync",
    -- t
    "time", "ttl", "type",
    -- u, w
    --[[ "unsubscribe", ]] "unwatch", "watch",
    -- z
    "zadd", "zcard", "zcount", "zincrby", "zinterstore", "zrange",
    "zrangebyscore", "zrank", "zrem", "zremrangebyrank", "zremrangebyscore",
    "zrevrange", "zrevrangebyscore", "zrevrank", "zscan", "zscore",
    "zunionstore",
    -- appended last in the original list
    "evalsha"
}

-- Metatable for instances: missing fields are looked up on the module table.
local mt = { __index = _M }

--- Decide whether a reply counts as "no value".
-- @param res  value to inspect
-- @return true when `res` is nil, is ngx.null, or is a table whose values
--         are all ngx.null (an empty table qualifies); false otherwise
local function is_redis_null( res )
    if res == nil or res == ngx.null then
        return true
    end

    if type(res) ~= "table" then
        return false
    end

    -- A single value that is not ngx.null makes the table non-null.
    for _, item in pairs(res) do
        if item ~= ngx.null then
            return false
        end
    end
    return true
end

--- Release a connection by handing it to the keepalive pool.
-- Does nothing when `redis` is nil or false.
-- @param self   instance providing `pool_max_idle_time` (max idle time, in
--               milliseconds) and `pool_size` (connection pool size)
-- @param redis  connection object to release
function _M.close_redis(self, redis)
    if not redis then
        return
    end

    local ok, err = redis:set_keepalive(self.pool_max_idle_time, self.pool_size)
    if not ok then
        -- Report through the error log rather than ngx.say: a library must
        -- not write diagnostics into the client response body, and ngx.say
        -- is not usable in every context (e.g. timers).
        ngx.log(ngx.ERR, "set keepalive error : ", err)
    end
end

-- change connect address as you need

--- Connect `redis` to self.ip:self.port and authenticate when a password is
-- configured.
-- Authentication is sent only when get_reused_times() reports 0, i.e. the
-- connection was newly created rather than taken from the pool.
-- @param self   instance providing `timeout`, `ip`, `port` and optionally
--               `password`
-- @param redis  connection object to connect
-- @return a truthy value on success; nil plus an error message on failure
function _M.connect_mod( self, redis )
    redis:set_timeout(self.timeout)

    local ok, err = redis:connect(self.ip, self.port)
    if not ok then
        -- The socket never connected, so there is nothing to hand to the
        -- keepalive pool; just report the failure to the caller.
        ngx.log(ngx.ERR, "connect to redis error : ", err)
        return nil, err
    end

    if self.password then
        local count, reuse_err = redis:get_reused_times()
        if 0 == count then
            -- Fresh connection: the password has to be sent.
            local authed, auth_err = redis:auth(self.password)
            if not authed then
                ngx.log(ngx.ERR, "failed to auth: ", auth_err)
                -- Do not leave an unauthenticated connection open.
                redis:close()
                return nil, auth_err
            end
        elseif reuse_err then
            ngx.log(ngx.ERR, "failed to get reused times: ", reuse_err)
            return nil, reuse_err
        end
        -- Otherwise the connection came from the pool and is not
        -- re-authenticated.
    end

    return ok, err
end

--- Switch the instance into pipeline mode by installing an empty request
-- queue in `self._reqs`; any previously queued requests are discarded.
function _M:init_pipeline()
    self._reqs = {}
end

--- Send every request queued since init_pipeline() as one pipeline.
-- Opens a connection, selects self.db_index, replays the queued commands
-- and returns the replies in order.
-- @return On success: an array-like table of replies. Replies that
--         is_redis_null() treats as null are replaced by nil, so the table
--         may contain holes; index it by position rather than relying on
--         ipairs or `#`.
--         On failure: `{}` plus an error message ("no pipeline" when
--         nothing was queued), or nil plus an error message when the client
--         object could not be created.
function _M.commit_pipeline( self )
    local reqs = self._reqs

    if nil == reqs or 0 == #reqs then
        return {}, "no pipeline"
    end
    self._reqs = nil

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok
    ok, err = self:connect_mod(redis)
    if not ok then
        return {}, err
    end

    -- Pooled connections may have been left on another database, so select
    -- the configured one before issuing commands (as do_command does).
    ok, err = redis:select(self.db_index)
    if not ok then
        redis:close()
        return {}, err
    end

    redis:init_pipeline()
    for _, vals in ipairs(reqs) do
        -- vals[1] is the command name; the remaining entries are its
        -- arguments. The queued table is left unmodified.
        local fun = redis[vals[1]]
        fun(redis, unpack(vals, 2))
    end

    local results
    results, err = redis:commit_pipeline()
    if not results or err then
        -- Close outright instead of pooling a connection that just failed.
        redis:close()
        return {}, err
    end

    if is_redis_null(results) then
        results = {}
        ngx.log(ngx.WARN, "is null")
    end

    self:close_redis(redis)

    -- Capture the length first: the loop below punches holes into the table.
    local count = #results
    for i = 1, count do
        if is_redis_null(results[i]) then
            results[i] = nil
        end
    end

    return results, err
end

--- Run one Redis command on behalf of a wrapper method.
-- In pipeline mode (`self._reqs` is set) the command is only queued and
-- nothing is returned. Otherwise a connection is opened, self.db_index is
-- selected, the command is executed and the connection is released.
-- @param self  instance created by _M.new
-- @param cmd   command name, used as the method name on the connection
-- @param ...   command arguments
-- @return the reply (nil when is_redis_null() treats it as null), or nil
--         plus an error message on failure
local function do_command(self, cmd, ... )
    if self._reqs then
        table.insert(self._reqs, {cmd, ...})
        return
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok
    ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    -- Do not run the command against the wrong database if select fails.
    ok, err = redis:select(self.db_index)
    if not ok then
        redis:close()
        return nil, err
    end

    local result
    result, err = redis[cmd](redis, ...)
    if not result or err then
        -- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
        -- Release the connection on the failure path as well.
        redis:close()
        return nil, err
    end

    if is_redis_null(result) then
        result = nil
    end

    self:close_redis(redis)

    return result, err
end

-- Give the module one method per entry in `commands`; each method forwards
-- its arguments to do_command together with the command's name.
for _, name in ipairs(commands) do
    _M[name] = function (self, ...)
        return do_command(self, name, ...)
    end
end

--- Create a client instance.
-- @param self  unused
-- @param opts  optional table of settings:
--   timeout            multiplied by 1000 before being stored; the stored
--                      value defaults to 1000
--   db_index           database index, default 0
--   ip                 server address, default '127.0.0.1'
--   port               server port, default 6379
--   password           optional password
--   pool_max_idle_time default 60000
--   pool_size          default 100
-- @return a table with those fields whose metatable is `mt`
function _M.new(self, opts)
    opts = opts or {}

    local instance = {
        timeout            = 1000,
        db_index           = opts.db_index or 0,
        ip                 = opts.ip or '127.0.0.1',
        port               = opts.port or 6379,
        password           = opts.password,
        pool_max_idle_time = opts.pool_max_idle_time or 60000,
        pool_size          = opts.pool_size or 100,
    }

    if opts.timeout then
        instance.timeout = opts.timeout * 1000
    end

    -- `_reqs` is left unset: the instance starts outside pipeline mode.
    return setmetatable(instance, mt)
end

--- Subscribe to `channel` and return a reader function for it.
-- @param self     instance created by _M.new
-- @param channel  channel name
-- @return a function `read(do_read)`, or nil plus an error message when
--         connecting or subscribing fails.
--         read() / read(true) reads one reply and returns it, or nil plus
--         an error message.
--         read(any other value) unsubscribes, releases the connection and
--         returns nothing.
function _M.subscribe( self, channel )
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok
    ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local res
    res, err = redis:subscribe(channel)
    if not res then
        -- Do not keep a connection whose subscription failed.
        redis:close()
        return nil, err
    end

    local function do_read_func ( do_read )
        if do_read == nil or do_read == true then
            local reply, read_err = redis:read_reply()
            if not reply then
                return nil, read_err
            end
            return reply
        end

        redis:unsubscribe(channel)
        -- The original called self.set_keepalive_mod, which this module does
        -- not define; close_redis is the function that releases connections.
        self:close_redis(redis)
        return
    end

    return do_read_func
end

return _M

--[==[
---------------------------------------
-- Usage example.
-- This belongs in the calling script, not in this module. It is kept inside
-- a block comment because Lua does not allow statements after the chunk's
-- final `return`.

local redis = require "RedisExtOps"
local opts = {
    ip = "10.11.0.215",
    port = 6379,
    password = "redis123",
    db_index = 1
}
local red = redis:new(opts)
local ok, err = red:set("dog", "an animal")
if not ok then
    ngx.say("failed to set dog: ", err)
    return
end
ngx.say("set result: ", ok)

---------------------------------------
-- Pipeline
red:init_pipeline()
red:set("cat", "Marry")
red:set("horse", "Bob")
red:get("cat")
red:get("horse")
-- commit_pipeline returns an empty table (not nil) on most failures, so the
-- error value has to be checked as well.
local results, perr = red:commit_pipeline()
if not results or perr then
    ngx.say("failed to commit the pipelined requests: ", perr)
    return
end
for i, res in ipairs(results) do
    ngx.say(res,"<br/>");
end
]==]