Redis 接口的二次封装

先看一下官方的调用示例代码:

  1. local redis = require("resty.redis")
  2. local red = redis:new()
  3. red:set_timeout(1000) -- 1 sec
  4. local ok, err = red:connect("127.0.0.1", 6379)
  5. if not ok then
  6. ngx.say("failed to connect: ", err)
  7. return
  8. end
  9. ok, err = red:set("dog", "an animal")
  10. if not ok then
  11. ngx.say("failed to set dog: ", err)
  12. return
  13. end
  14. ngx.say("set result: ", ok)
  15. -- put it into the connection pool of size 100,
  16. -- with 10 seconds max idle time
  17. local ok, err = red:set_keepalive(10000, 100)
  18. if not ok then
  19. ngx.say("failed to set keepalive: ", err)
  20. return
  21. end

这是一个标准的 Redis 接口调用,如果你的代码中 Redis 被调用频率不高,那么这段代码不会有任何问题。

但如果你的项目重度依赖 Redis,工程中有大量的程序在重复 创建连接—>数据操作—>关闭连接(或放到连接池) 这条完整的链路。甚至在调用完毕还要考虑不同的 return 情况做不同处理,你很快就会发现代码有大量的重复。

Lua 本身并没有内建的面向对象机制,很多人用尽各种招数使用元表来模拟。可是,Lua 的发明者似乎不想看到这样的情形:在 Lua 5.1(也就是 OpenResty 所使用的 LuaJIT 兼容的版本)中,table 上取长度的 __len 元方法以及析构函数 __gc 都只留给了 C API(userdata),纯 Lua 只能望洋兴叹。

我们期望的代码应该是这样的:

  1. local red = redis:new()
  2. local ok, err = red:set("dog", "an animal")
  3. if not ok then
  4. ngx.say("failed to set dog: ", err)
  5. return
  6. end
  7. ngx.say("set result: ", ok)
  8. local res, err = red:get("dog")
  9. if not res then
  10. ngx.say("failed to get dog: ", err)
  11. return
  12. end
  13. if res == ngx.null then
  14. ngx.say("dog not found.")
  15. return
  16. end
  17. ngx.say("dog: ", res)

期望它自身具备以下几个特征:

  • new、connect 函数合体,使用时只负责申请,尽量少关心什么时候具体连接、释放;
  • 默认 Redis 数据库连接地址,但是允许自定义;
  • 每次 Redis 使用完毕,自动释放 Redis 连接到连接池供其他请求复用;
  • 要支持 Redis 的重要优化手段 pipeline;

不卖关子,只要干货,我们最后是这样干的,可以在这里看到 gist 代码:

  1. -- file name: resty/redis_iresty.lua
  2. local redis_c = require("resty.redis")
  -- Try to load the optional "table.new" module (a table pre-sizing
  -- constructor); when it is missing or not a function, substitute a
  -- stand-in that ignores the size hints and returns an empty table.
  local has_table_new, new_tab = pcall(require, "table.new")
  if not (has_table_new and type(new_tab) == "function") then
      new_tab = function (narr, nrec)
          return {}
      end
  end

  -- Module table, created with a hint of 155 hash slots.
  local _M = new_tab(0, 155)
  _M._VERSION = '0.01'
  -- Names of the Redis commands that get a generated wrapper method.
  -- "punsubscribe", "subscribe" and "unsubscribe" are deliberately left out
  -- (they are kept below as comments at their original positions).
  local commands = {
      "append",           "auth",             "bgrewriteaof",     "bgsave",
      "bitcount",         "bitop",            "blpop",            "brpop",
      "brpoplpush",       "client",           "config",           "dbsize",
      "debug",            "decr",             "decrby",           "del",
      "discard",          "dump",             "echo",             "eval",
      "exec",             "exists",           "expire",           "expireat",
      "flushall",         "flushdb",          "get",              "getbit",
      "getrange",         "getset",           "hdel",             "hexists",
      "hget",             "hgetall",          "hincrby",          "hincrbyfloat",
      "hkeys",            "hlen",             "hmget",            "hmset",
      "hscan",            "hset",             "hsetnx",           "hvals",
      "incr",             "incrby",           "incrbyfloat",      "info",
      "keys",             "lastsave",         "lindex",           "linsert",
      "llen",             "lpop",             "lpush",            "lpushx",
      "lrange",           "lrem",             "lset",             "ltrim",
      "mget",             "migrate",          "monitor",          "move",
      "mset",             "msetnx",           "multi",            "object",
      "persist",          "pexpire",          "pexpireat",        "ping",
      "psetex",           "psubscribe",       "pttl",             "publish",
      -- "punsubscribe",
      "pubsub",           "quit",             "randomkey",        "rename",
      "renamenx",         "restore",          "rpop",             "rpoplpush",
      "rpush",            "rpushx",           "sadd",             "save",
      "scan",             "scard",            "script",           "sdiff",
      "sdiffstore",       "select",           "set",              "setbit",
      "setex",            "setnx",            "setrange",         "shutdown",
      "sinter",           "sinterstore",      "sismember",        "slaveof",
      "slowlog",          "smembers",         "smove",            "sort",
      "spop",             "srandmember",      "srem",             "sscan",
      "strlen",
      -- "subscribe",
      "sunion",           "sunionstore",      "sync",             "time",
      "ttl",              "type",
      -- "unsubscribe",
      "unwatch",          "watch",            "zadd",             "zcard",
      "zcount",           "zincrby",          "zinterstore",      "zrange",
      "zrangebyscore",    "zrank",            "zrem",             "zremrangebyrank",
      "zremrangebyscore", "zrevrange",        "zrevrangebyscore", "zrevrank",
      "zscan",            "zscore",           "zunionstore",      "evalsha",
  }
  -- Metatable for instances: fields missing on an instance resolve to _M.
  local mt = {}
  mt.__index = _M
  --- Tell whether a reply carries no usable data.
  -- A non-table value is "null" when it is ngx.null or nil. A table is
  -- "null" when every value stored in it is ngx.null, which makes an empty
  -- table null as well.
  -- @param res  reply value to inspect
  -- @return true when the reply is considered null, false otherwise
  local function is_redis_null( res )
      if type(res) ~= "table" then
          return res == ngx.null or res == nil
      end

      for _, item in pairs(res) do
          if item ~= ngx.null then
              return false
          end
      end

      return true
  end
  --- Apply the instance timeout to a client object and connect it.
  -- Change the address below as you need.
  -- NOTE(review): self.db_index is not read here, so no database is
  -- selected after connecting -- confirm whether that is intended.
  -- @param self   wrapper instance; its `timeout` field is passed to set_timeout
  -- @param redis  client object exposing set_timeout() and connect()
  -- @return whatever redis:connect() returns
  function _M.connect_mod( self, redis )
      local host, port = "127.0.0.1", 6379

      redis:set_timeout(self.timeout)

      return redis:connect(host, port)
  end
  --- Hand a client object to set_keepalive().
  -- Arguments passed: 60000 (max idle time, i.e. 60 seconds if the unit is
  -- milliseconds) and 1000 (pool size).
  -- Called as a plain function (no self): `self.set_keepalive_mod(redis)`.
  -- @param redis  client object exposing set_keepalive()
  -- @return whatever redis:set_keepalive() returns
  function _M.set_keepalive_mod( redis )
      local max_idle_time, pool_size = 60000, 1000
      return redis:set_keepalive(max_idle_time, pool_size)
  end
  --- Enter pipeline mode by giving the instance a fresh, empty request queue.
  -- While `_reqs` is set, command methods queue their arguments instead of
  -- talking to the server.
  function _M:init_pipeline()
      self._reqs = {}
  end
  --- Send every command queued since init_pipeline() in a single pipeline.
  --
  -- Pipeline mode is always left when this function is called, including
  -- when the queue is empty; otherwise an init_pipeline() followed by an
  -- empty commit would leave the instance queuing every later command.
  --
  -- @param self  wrapper instance
  -- @return results table plus error value:
  --   * `{}, "no pipeline"` when nothing was queued;
  --   * `nil, err` when the client object cannot be created;
  --   * `{}, err` when connecting or committing fails;
  --   * otherwise the reply array, in which every "null" reply (see
  --     is_redis_null) has been replaced by nil. The array may therefore
  --     contain holes: index it by position and do not rely on `#results`.
  function _M.commit_pipeline( self )
      local reqs = self._reqs

      -- Leave pipeline mode unconditionally (see header comment).
      self._reqs = nil

      if nil == reqs or 0 == #reqs then
          return {}, "no pipeline"
      end

      local redis, err = redis_c:new()
      if not redis then
          return nil, err
      end

      local ok, err = self:connect_mod(redis)
      if not ok then
          return {}, err
      end

      redis:init_pipeline()
      for _, vals in ipairs(reqs) do
          -- vals is {cmd, arg1, arg2, ...}; forward the arguments without
          -- mutating the queued request.
          local fun = redis[vals[1]]
          fun(redis, unpack(vals, 2))
      end

      local results, err = redis:commit_pipeline()
      if not results or err then
          return {}, err
      end

      if is_redis_null(results) then
          results = {}
          ngx.log(ngx.WARN, "is null")
      end

      self.set_keepalive_mod(redis)

      -- Capture the length once: the loop below punches holes into the
      -- array, after which `#results` is no longer well defined.
      local count = #results
      for i = 1, count do
          if is_redis_null(results[i]) then
              results[i] = nil
          end
      end

      return results, err
  end
  --- Subscribe to a channel, wait for one reply, then unsubscribe.
  --
  -- A fresh client object is created and connected for the call. On
  -- success the connection is handed to set_keepalive_mod(). When
  -- subscribing or reading the reply fails, the connection is closed
  -- explicitly so that it is not left open in a half-used state.
  --
  -- @param self     wrapper instance
  -- @param channel  channel name passed to subscribe/unsubscribe
  -- @return the value returned by read_reply(), or `nil, err` on failure
  function _M.subscribe( self, channel )
      local redis, err = redis_c:new()
      if not redis then
          return nil, err
      end

      local ok, err = self:connect_mod(redis)
      if not ok or err then
          return nil, err
      end

      local res, err = redis:subscribe(channel)
      if not res then
          -- Do not leave the connection dangling after a failed subscribe.
          redis:close()
          return nil, err
      end

      res, err = redis:read_reply()
      if not res then
          -- Typically a read timeout: the connection must not be pooled in
          -- this state, so close it.
          redis:close()
          return nil, err
      end

      redis:unsubscribe(channel)
      self.set_keepalive_mod(redis)

      return res, err
  end
  --- Run one command on a fresh connection, or queue it in pipeline mode.
  --
  -- In pipeline mode (`self._reqs` is set) the command and its arguments
  -- are appended to the queue and nothing is returned.
  --
  -- Otherwise a client object is created, connected, and the command is
  -- invoked on it. The connection is always released afterwards:
  --   * on success it goes through set_keepalive_mod();
  --   * when the command yields `false` (lua-resty-redis reports a server
  --     error reply this way) it also goes through set_keepalive_mod();
  --   * on any other failure it is closed.
  --
  -- @param self  wrapper instance
  -- @param cmd   command name, used as the method name on the client object
  -- @param ...   command arguments
  -- @return the reply (with "null" replies, see is_redis_null, turned into
  --         nil) and the error value; `nil, err` on failure
  local function do_command(self, cmd, ... )
      if self._reqs then
          table.insert(self._reqs, {cmd, ...})
          return
      end

      local redis, err = redis_c:new()
      if not redis then
          return nil, err
      end

      local ok, err = self:connect_mod(redis)
      if not ok or err then
          return nil, err
      end

      local fun = redis[cmd]
      local result, err = fun(redis, ...)
      if not result or err then
          -- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
          if result == false then
              -- The reply was read completely, so the connection can be
              -- reused.
              self.set_keepalive_mod(redis)
          else
              redis:close()
          end
          return nil, err
      end

      if is_redis_null(result) then
          result = nil
      end

      self.set_keepalive_mod(redis)

      return result, err
  end
  -- Generate one method per entry of `commands`; each method forwards its
  -- arguments to do_command together with the command name.
  for _, cmd in ipairs(commands) do
      _M[cmd] = function (self, ...)
          return do_command(self, cmd, ...)
      end
  end
  --- Create a wrapper instance.
  -- @param self  the module table (call as `redis:new(opts)`)
  -- @param opts  optional table:
  --   * `timeout`  -- multiplied by 1000 and stored as `timeout`;
  --                   the stored value is 1000 when omitted
  --   * `db_index` -- stored as `db_index`; 0 when omitted
  -- @return a new table whose metatable is `mt`, with `_reqs` unset
  function _M.new(self, opts)
      local options = opts or {}

      local timeout = 1000
      if options.timeout then
          timeout = options.timeout * 1000
      end

      local instance = {
          timeout  = timeout,
          db_index = options.db_index or 0,
          _reqs    = nil,
      }

      return setmetatable(instance, mt)
  end
  195. return _M

调用示例代码:

local redis = require("resty.redis_iresty")
local red   = redis:new()

local ok, err = red:set("dog", "an animal")
if not ok then
    ngx.say("failed to set dog: ", err)
    return
end

ngx.say("set result: ", ok)

在最终的示例代码中看到,所有的连接创建、连接销毁、以及连接池部分,都被完美隐藏了,我们只需要关注业务就可以了。妈妈再也不用担心我把 Redis 搞垮了。