前言
本章内容主要自定义开发kong的插件,因为我们在调研过程中,发现一个问题:kong目前没有接口熔断降级机制。通常我们的断路有两层含义:一个是外部的访问我们一般会在网关层处理,一个是内部的相互访问,一般会在程序里处理或者是走个内部的网关走边车模式。
一 kong 开发环境搭建
我是java程序员,所以我这边选择了idea +lua开发
- 安装idea lua相关插件。

- 创建项目目录

二 kong 自定义插件初体验
2.1 编写插件测试代码
handler.lua
-- hello-world.handlar.lualocal BasePlugin = require "kong.plugins.base_plugin"local CustomHandler = BasePlugin:extend()local resultAns = ">>插件开始运行了\n"CustomHandler.VERSION = "1.0.0"CustomHandler.PRIORITY = 10function CustomHandler:new()CustomHandler.super.new(self, "hello-world")endfunction CustomHandler:access(config)CustomHandler.super.access(self)resultAns = resultAns .. ">>>>>>>执行:access阶段开始\n输出嵌入的内容(请求在还未到达上游服务器):\n"resultAns = resultAns .. "kong.version:\t" .. kong.version .. "\n"resultAns = resultAns .. "kong.client.get_ip():\t" .. kong.client.get_ip() .. "\n"resultAns = resultAns .. "kong.request.get_scheme():\t" .. kong.request.get_scheme() .. "\n"resultAns = resultAns .. "kong.request.get_host():\t" .. kong.request.get_host() .. "\n"resultAns = resultAns .. "kong.request.get_port()\t:" .. kong.request.get_port() .. "\n"resultAns = resultAns .. "kong.request.get_http_version():\t" .. kong.request.get_http_version() .. "\n"resultAns = resultAns .. "kong.request.get_method():\t" .. kong.request.get_method() .. "\n"resultAns = resultAns .. "kong.request.get_path():\t" .. kong.request.get_path() .. "\n"resultAns = resultAns .. "<<<<<<<执行access阶段结束 \n"return kong.response.exit(200,resultAns,{["Content-Type"] = "application/json",["WWW-Authenticate"] = "Basic"})endreturn CustomHandler
schema.lua
--
-- Created by IntelliJ IDEA.
-- User: hezhaoming
-- Date: 2020/11/27
-- Time: 1:01 下午
-- To change this template use File | Settings | File Templates.
--
-- hello-world.schema.lua
local typedefs = require "kong.db.schema.typedefs"
return {
name = "hello-world",
fields = {
{
consumer = typedefs.no_consumer
},
{
config = {
type = "record",
fields = {
-- 这里的username, 会显示在插件配置页
{
username = {
type = "array",
elements = { type = "string" },
default = {}
}
}
}
}
}
}
}
2.2 kong 插件调试验证
2.2.1 插件植入配置
这里操作分成两部:
将插件代码放入kong中:kong 存放插件的路径 /usr/local/share/lua/5.1/kong/plugins/
//拷贝插件到kong里 docker cp ./hello-word kong_kong_1:/usr/local/share/lua/5.1/kong/plugins //更新启动配置 docker cp ./kong.conf kong_kong_1:/etc/kong/修改启动配置

kong.conf定义了加载哪些插件,plugins配置项的缺省值是bundled,表示加载官方开源插件,如果要加载自定义插件,去掉注释并在后面跟上自定义插件名称。
vi /etc/kong/kong.conf
#plugins = bundled
改为
plugins = bundled,myheader
2.2.2 kong 插件调试验证
三 插件开发指南
3.1 插件开发规范
它主要分成四块: 对admin api 操作、数据库操作、插件逻辑处理、表结构生成、插件参数配置。
custom-plugin
├── api.lua # 用于扩展Admin API
├── daos.lua # 数据访问层
├── handler.lua # (必需)包含请求的生命周期, 提供接口来实现插件逻辑
├── migrations # 插件的表结构定义语句
│ ├── cassandra.lua
│ └── postgres.lua
└── schema.lua # (必需)插件配置参数定义, 可加入自定义校验函数
3.2 插件逻辑(handler.lua)解析
handler 开发样板:
-- 继承BasePlugin
local BasePlugin = require "kong.plugins.base_plugin"
local CustomHandler = BasePlugin:extend()
-- 插件构造函数
function CustomHandler:new()
CustomHandler.super.new(self, "my-custom-plugin")
end
function CustomHandler:init_worker()
CustomHandler.super.init_worker(self)
-- 在这里实现自定义的逻辑
end
function CustomHandler:certificate(config)
CustomHandler.super.certificate(self)
-- 在这里实现自定义的逻辑
end
function CustomHandler:rewrite(config)
CustomHandler.super.rewrite(self)
-- 在这里实现自定义的逻辑
end
function CustomHandler:access(config)
CustomHandler.super.access(self)
-- 在这里实现自定义的逻辑
end
function CustomHandler:header_filter(config)
CustomHandler.super.header_filter(self)
-- 在这里实现自定义的逻辑
end
function CustomHandler:body_filter(config)
CustomHandler.super.body_filter(self)
-- 在这里实现自定义的逻辑
end
function CustomHandler:log(config)
CustomHandler.super.log(self)
-- 在这里实现自定义的逻辑
end
return CustomHandler
如下表格, 为kong插件中支持重写的函数列表
| 函数名 | Lua-Nginx-Module 上下文 | 描述 |
|---|---|---|
| :init_worker() | init_worker_by_lua | 在每个Nginx Worker启动时执行 |
| :certificate() | ssl_certificate_by_lua_block | 在SSL握手的SSL证书服务阶段执行 |
| :rewrite() | rewrite_by_lua_block | 每个请求中的rewrite阶段执行 |
| :access() | access_by_lua | 在被代理至上游服务前执行 |
| :header_filter() | header_filter_by_lua | 从上游服务器接收所有Response headers后执行 |
| :body_filter() | body_filter_by_lua | 从上游服务接收的响应主体的每个块时执行。 由于响应被流回客户端,因此它可以超过缓冲区大小并按块进行流式传输。 因此,如果响应很大,则会多次调用此方法 |
| :log() | log_by_lua | 当最后一个响应字节输出完毕时执行 |
3.3 插件(schema.lua)配置
-- schema.lua
return {
no_consumer = true, -- this plugin will only be applied to Services or Routes,
fields = {
kafka_brokers = {type="array"},
kafka_topic = {type = "string"}
},
self_check = function(schema, plugin_t, dao, is_updating)
-- 自定义的验证函数
return true
end
}
- no_consumer: 如果为true, 则插件只能被应用于Service和Routes
- fields: 一个field数组, field中可定义type,required,unique,default,immutable,enum,regex 等属性
- self_check: 在安装时, 执行的自定义校验函数
3.4 PDK(Plugin Development Kit) 开发套件
kong 的插件开发套件包含了一些常用的Lua函数和变量, 如kong.client,kong.request,kong.log,kong.table 等3.5 数据库访问
Kong自身存储可选PostgreSQL和Cassandra, 在开发插件时, kong提供了一个数据库抽象层用于存储自定义的实体, 也就是dao层.
要完成数据访问, 需要两步:
- 编写Migration文件, 用于数据库DDL操作, 在kong migrations up时执行
- 编写daos.lua, 用于映射你的数据表
如下为PostgreSQL示例:
-- 步骤一:postgres.lua
return {
{
name = "2015-07-31-172400_init_keyauth",
up = [[
CREATE TABLE IF NOT EXISTS keyauth_credentials(
id uuid,
consumer_id uuid REFERENCES consumers (id) ON DELETE CASCADE,
key text UNIQUE,
created_at timestamp without time zone default (CURRENT_TIMESTAMP(0) at time zone 'utc'),
PRIMARY KEY (id)
);
]],
down = [[
DROP TABLE keyauth_credentials;
]]
},
.....
}
postgres.lua 由一个Migration数组组成, 每个Migration包含三个字段:name,up,down, 其中name须唯一, up和down分别代表升级和降级时执行的SQL语句
-- 步骤二:daos.lua
local SCHEMA = {
primary_key = {"id"},
table = "keyauth_credentials", -- 数据库表名
fields = {
id = {type = "id", dao_insert_value = true},
created_at = {type = "timestamp", immutable = true, dao_insert_value = true},
consumer_id = {type = "id", required = true, foreign = "consumers:id"},
key = {type = "string", required = false, unique = true}
}
}
return {keyauth_credentials = SCHEMA} --keyauth_credentials即为在DAO中加入的自定义schema
完成以上两步后, 即可通过如下所示代码在我们的插件中对数据库进行操作。
local singletons = require "kong.singletons"
singletons.dao.keyauth_credentials.find_all({key="ravenzz"})
singletons.dao.keyauth_credentials.insert(...)
singletons.dao.keyauth_credentials.update(...)
....
3.6 缓存机制
作为网关代理层, 缓存一定是必不可少的一环, 在kong的PDK中, 封装了lua-resty-mlcache.
kong的缓存分为两级:
- L1: Lua memory cache - 在nginx worker中共享, 可以存储任何Lua值
- L2: Shared memory cache (SHM) - 在nginx node的所有worker中共享. 可以存储任何标量值, 但它需要序列化和反序列化, 所以性能会有所下降
注: 从数据库中提取数据后,它将同时存储于上述两级缓存中。现在,如果同一个工作进程再次请求数据,它将从Lua内存缓存检索数据。 如果同一个Nginx节点中的不同工作者请求该数据,它将在SHM中找到数据,对其进行反序列化(并将其存储在自己的Lua内存缓存中),然后将其返回。
一个典型的使用方式如下:
-- 通过一个唯一值获取Cache Key
cache_key = singletons.dao.keyauth_credentials:cache_key("api_key")
value, err = singletons.cache:get(cache_key ,nil, load_from_db_func, ....)
函数: value, err = cache:get(key, opts?, cb, …) , 如果cache没有值(miss), 则会调用函数cb, cb 必须返回一个返回值, 可返回需要缓存的值或者nil, 需要注意的是返回值为nil时, get函数依旧会执行缓存, 但我们可以通过cache:get()的第二个参数控制缓存的TTL和negative TTL. 如下代码即代表有数据时, 我们缓存600s, 没有数据时, 缓存nil结果40s
value, err = singletons.cache:get(cache_key ,{
ttl = 600, --如果有数据, 则缓存的时间, 单位:秒
neg_ttl = 40 -- 如果没有数据, 单位:秒
}, load_from_db_func, ....)
四 kong circuit-breaker 实战
由于kong没有自动熔断策略,只提供了一个手动熔断,因此需要我们自行开发。
什么是自动熔断:
- 用API的”出错比例”来表示这个API的健康状态,当这个比例值高于设定的阈值,API自动抛出异常,终止服务,这个过程就叫做自动熔断
- API熔断后,可以保护数据库等基础服务不受unhealth的API的影响。
大致流程:
- 收集API的相关指标
- 判断API是否健康
- 通过API的健康状态决定API是否需要自动熔断(恢复)
4.1 限流大法之初探
4.2 滑动窗口算法
API的健康状态,一定是指在某个时间段内请求的出错次数太多,那这里就有两个问题:
- 怎么定义”某个时间段”
- 怎么定义”出错太多”
解决方案:
- 时间粒度:假设我们使用两个指标去控制: metrics_granularity:时间粒度,设置metrics_granularity=3,即 统计这3s内的总出错数。
- 出错比例:假设定义个”出错数变量”,显然不科学,因为不同的api它的请求数量不同,因此使用”出错比例”较为科学。比如我们统计3s内请求量和3s内请求出错量,但是仔细不想,还是不行。
- 时间窗口:我们假设出错率超过45%为unhealth, 在09:14:10 ~ 09:14:13,3s内是高峰期,访问量是100,出错是20,20%<45%,则 API为health,然后 09:14:13~09:14:16 访问量是10,出错量是5,50%>45% API为unhealth。但这个结果显然不科学。更准确的算法是 (20+5)/(10+100), 也就是需要考虑之前的访问状态。但是我们不能一直累计,这样的话,数据量就会很大,也不利于统计,所以我们设计另一个指标 metrics_rollingsize。
具体算法如下:
设定窗口设置5个(metrics_rollingsize=5),初始化数据
[0, 0, 0, 0, 0]
每一个数字代表在(metrics_granularity=3 )3秒内出错的次数
[0, 0, 0, 0, 11]
每当时间超过3秒( metrics_granularity=3),数据向前偏移一位
[0, 0, 0, 15, 11]
当向前偏移的位超过了滑动窗口(5),则去掉最前面的数据,然后在后面补位,保证这个数组的长度总是5位
[11, 123, 99, 11, 0]
这种方式类似于一个滑动窗口,每过metrics_granularity的时间,窗口向前滑动一格。
下面是我用Lua脚本实现的
local _M = {}
local SlidingWindow = {
--初始化方法 窗口大小: metrics_rolling_size ;时间粒度:metrics_granularity
new = function(metrics_rolling_size, metrics_granularity)
_M = setmetatable({}, SlidingWindow)
-- 初始化创建时间
_M._clock_at = os.time()
-- 初始化窗口大小
_M.rolling_size = metrics_rolling_size
-- 初始化时间粒度
_M.granularity = metrics_granularity
-- 初始化数组
_M._values = {}
-- 窗口大于1 才有意义
if (metrics_rolling_size > 1) then
SlidingWindow.init_clear_array()
end
return _M
end,
shift = function(length)
if length <= 0
then
return
end
-- 如果距离上一次的偏移的时间长度大于了rolling_size,说明 已经过了rolling_size * granularity的时间,API没有访问过了
if length > _M.rolling_size then
SlidingWindow.init_clear_array()
end
end,
shift_on_clock_changes = function()
local pass_time = os.time() - _M._clock_at
local length = math.modf(pass_time / _M.granularity)
if length > 0 then
SlidingWindow.shift(length)
_M._clock_at = os.time()
end
end,
incr = function(value)
SlidingWindow.shift_on_clock_changes()
_M._values[-1] + = value
end,
value = function()
SlidingWindow.shift_on_clock_changes()
-- 注意返回的是累加后的数据
return sum(self._values)
end,
init_clear_array = function()
for i = 1, _M.rolling_size do
table.insert(_M._values, 1, 0)
end
end
}
4.3 API or Service 健康检查算法
理论上需要关心四个参数:
- THRESHOLDREQUEST:请求量大于一个阈值请求量,请求量很少的统计不具有代表性_
- THRESHOLD_TIMEOUT:请求超时的数量
- THRESHOLD_SYS_EXC:请求成功的数量
- THRESHOLD_UNKWN_EXC:请求失败的数量
实际上这里如果配置这么多,生产意义不是很大,我们一把只关心两个参数:
- THRESHOLD_REQUEST:请求量肯定要足够的大,才具有代表意义
-
4.4 熔断算法
如果api处于非熔断状态
- 根据健康状态算法去判断是否需要熔断
# 如果需要熔断走熔断逻辑 # 记录熔断时间 # 更新熔断状态 locks[key]['locked_at'] = time_now locks[key]['locked_status'] = MODE_LOCKED
- 根据健康状态算法去判断是否需要熔断
如果api处于熔断状态
- 需要计算 现在的时间-减去熔断器开始时间,然后判断 存活期是否超过配置的熔断存活期
如果判断此时API是不健康的,或者 还没有过熔断存活期,那么不处理,继续熔断的
if time.time() - locked_at < MIN_RECOVERY_TIME or not health_ok_now: result = False否则 超过存活时间,且该api是健康的,那么熔断器变成半开状态
locks[key]['locked_status'] = MODE_RECOVER result = True3.如果是半开状态
# 此时API已经是health状态,判断最新一次API请求的结果,如果请求是正常的,则有概率的恢复API, if api_latest_state.get(key, False): locked_span = time_now - locked_at # 如果锁住的时间过长,则直接恢复 if locked_span >= MAX_RECOVERY_TIME: locks[key]['locked_at'] = 0 locks[key]['locked_status'] = MODE_UNLOCKED result = True signals.after_api_health_unlocked.send(ctx) else: if random.random() < float(locked_span) / MAX_RECOVERY_TIME: result = True else: result = False else: # 最新的一次请求失败了,说明很有可能这个API又坏了,这时在判断是一下整体API的健康状态 if not healt_ok_now: # 重新锁住 locks[key]['locked_at'] = time.time() locks[key]['locked_status'] = MODE_LOCKED4.5 Metrics 统计
五 kong server-bridge
快捷编辑命令 ```bash docker cp ./sync-eureka kong_kong_1:/usr/local/share/lua/5.1/kong/plugins
docker restart kong_kong_1 ```


