前言

本章内容主要自定义开发kong的插件,因为我们在调研过程中,发现一个问题:kong目前没有接口熔断降级机制。通常我们的断路有两层含义:一个是外部的访问我们一般会在网关层处理,一个是内部的相互访问,一般会在程序里处理或者是走个内部的网关走边车模式。

一 kong 开发环境搭建

我是java程序员,所以我这边选择了idea +lua开发

  1. 安装idea lua相关插件。

image.png

  1. 创建项目目录

image.png

二 kong 自定义插件初体验

2.1 编写插件测试代码

handler.lua

  1. -- hello-world.handlar.lua
  2. local BasePlugin = require "kong.plugins.base_plugin"
  3. local CustomHandler = BasePlugin:extend()
  4. local resultAns = ">>插件开始运行了\n"
  5. CustomHandler.VERSION = "1.0.0"
  6. CustomHandler.PRIORITY = 10
  7. function CustomHandler:new()
  8. CustomHandler.super.new(self, "hello-world")
  9. end
  10. function CustomHandler:access(config)
  11. CustomHandler.super.access(self)
  12. resultAns = resultAns .. ">>>>>>>执行:access阶段开始\n输出嵌入的内容(请求在还未到达上游服务器):\n"
  13. resultAns = resultAns .. "kong.version:\t" .. kong.version .. "\n"
  14. resultAns = resultAns .. "kong.client.get_ip():\t" .. kong.client.get_ip() .. "\n"
  15. resultAns = resultAns .. "kong.request.get_scheme():\t" .. kong.request.get_scheme() .. "\n"
  16. resultAns = resultAns .. "kong.request.get_host():\t" .. kong.request.get_host() .. "\n"
  17. resultAns = resultAns .. "kong.request.get_port()\t:" .. kong.request.get_port() .. "\n"
  18. resultAns = resultAns .. "kong.request.get_http_version():\t" .. kong.request.get_http_version() .. "\n"
  19. resultAns = resultAns .. "kong.request.get_method():\t" .. kong.request.get_method() .. "\n"
  20. resultAns = resultAns .. "kong.request.get_path():\t" .. kong.request.get_path() .. "\n"
  21. resultAns = resultAns .. "<<<<<<<执行access阶段结束 \n"
  22. return kong.response.exit(
  23. 200,
  24. resultAns,
  25. {
  26. ["Content-Type"] = "application/json",
  27. ["WWW-Authenticate"] = "Basic"
  28. }
  29. )
  30. end
  31. return CustomHandler

schema.lua

--
-- Created by IntelliJ IDEA.
-- User: hezhaoming
-- Date: 2020/11/27
-- Time: 1:01 下午
-- To change this template use File | Settings | File Templates.
--
--  hello-world.schema.lua
local typedefs = require "kong.db.schema.typedefs"
return {
    name = "hello-world",
    fields = {
        {
            consumer = typedefs.no_consumer
        },
        {
            config = {
                type = "record",
                fields = {
                    -- 这里的username, 会显示在插件配置页
                    {
                        username = {
                            type = "array",
                            elements = { type = "string" },
                            default = {}
                        }
                    }
                }
            }
        }
    }
}

2.2 kong 插件调试验证

2.2.1 插件植入配置

这里操作分成两部:

  1. 将插件代码放入kong中:kong 存放插件的路径 /usr/local/share/lua/5.1/kong/plugins/

    //拷贝插件到kong里
    docker cp ./hello-word  kong_kong_1:/usr/local/share/lua/5.1/kong/plugins 
    //更新启动配置
    docker cp ./kong.conf  kong_kong_1:/etc/kong/
    
  2. 修改启动配置

image.png
kong.conf定义了加载哪些插件,plugins配置项的缺省值是bundled,表示加载官方开源插件,如果要加载自定义插件,去掉注释并在后面跟上自定义插件名称。

vi /etc/kong/kong.conf
#plugins = bundled               
改为
plugins = bundled,myheader

如图
image.png

2.2.2 kong 插件调试验证

我们可以查看到插件,并直接添加到服务上
image.png
如图
image.png
测试:
image.png

三 插件开发指南

3.1 插件开发规范

它主要分成四块: 对admin api 操作、数据库操作、插件逻辑处理、表结构生成、插件参数配置。

custom-plugin
├── api.lua            # 用于扩展Admin API 
├── daos.lua           # 数据访问层
├── handler.lua        # (必需)包含请求的生命周期, 提供接口来实现插件逻辑
├── migrations         # 插件的表结构定义语句
│   ├── cassandra.lua  
│   └── postgres.lua
└── schema.lua         # (必需)插件配置参数定义, 可加入自定义校验函数

3.2 插件逻辑(handler.lua)解析

handler 开发样板:

-- 继承BasePlugin
local BasePlugin = require "kong.plugins.base_plugin"
local CustomHandler = BasePlugin:extend()

-- 插件构造函数
function CustomHandler:new()
  CustomHandler.super.new(self, "my-custom-plugin")
end

function CustomHandler:init_worker()
  CustomHandler.super.init_worker(self)
  -- 在这里实现自定义的逻辑
end

function CustomHandler:certificate(config)
  CustomHandler.super.certificate(self)
  -- 在这里实现自定义的逻辑
end

function CustomHandler:rewrite(config)
  CustomHandler.super.rewrite(self)
  -- 在这里实现自定义的逻辑
end

function CustomHandler:access(config)
  CustomHandler.super.access(self)
  -- 在这里实现自定义的逻辑
end

function CustomHandler:header_filter(config)
  CustomHandler.super.header_filter(self)
  -- 在这里实现自定义的逻辑
end

function CustomHandler:body_filter(config)
  CustomHandler.super.body_filter(self)
  -- 在这里实现自定义的逻辑
end

function CustomHandler:log(config)
  CustomHandler.super.log(self)
  -- 在这里实现自定义的逻辑
end

return CustomHandler

如下表格, 为kong插件中支持重写的函数列表

函数名 Lua-Nginx-Module 上下文 描述
:init_worker() init_worker_by_lua 在每个Nginx Worker启动时执行
:certificate() ssl_certificate_by_lua_block 在SSL握手的SSL证书服务阶段执行
:rewrite() rewrite_by_lua_block 每个请求中的rewrite阶段执行
:access() access_by_lua 在被代理至上游服务前执行
:header_filter() header_filter_by_lua 从上游服务器接收所有Response headers后执行
:body_filter() body_filter_by_lua 从上游服务接收的响应主体的每个块时执行。 由于响应被流回客户端,因此它可以超过缓冲区大小并按块进行流式传输。 因此,如果响应很大,则会多次调用此方法
:log() log_by_lua 当最后一个响应字节输出完毕时执行

3.3 插件(schema.lua)配置

-- schema.lua
return {
  no_consumer = true, -- this plugin will only be applied to Services or Routes,
  fields = {
    kafka_brokers = {type="array"},
    kafka_topic = {type = "string"}
  },
  self_check = function(schema, plugin_t, dao, is_updating)
    -- 自定义的验证函数
    return true
  end
}
  • no_consumer: 如果为true, 则插件只能被应用于Service和Routes
  • fields: 一个field数组, field中可定义type,required,unique,default,immutable,enum,regex 等属性
  • self_check: 在安装时, 执行的自定义校验函数

    3.4 PDK(Plugin Development Kit) 开发套件

    kong 的插件开发套件包含了一些常用的Lua函数和变量, 如kong.client,kong.request,kong.log,kong.table 等

    3.5 数据库访问

    Kong自身存储可选PostgreSQL和Cassandra, 在开发插件时, kong提供了一个数据库抽象层用于存储自定义的实体, 也就是dao层.
    要完成数据访问, 需要两步:
  1. 编写Migration文件, 用于数据库DDL操作, 在kong migrations up时执行
  2. 编写daos.lua, 用于映射你的数据表

如下为PostgreSQL示例:

-- 步骤一:postgres.lua
return {
  {
    name = "2015-07-31-172400_init_keyauth",
    up = [[
      CREATE TABLE IF NOT EXISTS keyauth_credentials(
        id uuid,
        consumer_id uuid REFERENCES consumers (id) ON DELETE CASCADE,
        key text UNIQUE,
        created_at timestamp without time zone default (CURRENT_TIMESTAMP(0) at time zone 'utc'),
        PRIMARY KEY (id)
      );
    ]],
    down = [[
      DROP TABLE keyauth_credentials;
    ]]
  },
  .....
}

postgres.lua 由一个Migration数组组成, 每个Migration包含三个字段:name,up,down, 其中name须唯一, up和down分别代表升级和降级时执行的SQL语句

-- 步骤二:daos.lua
local SCHEMA = {
  primary_key = {"id"},
  table = "keyauth_credentials", -- 数据库表名
  fields = {
    id = {type = "id", dao_insert_value = true}, 
    created_at = {type = "timestamp", immutable = true, dao_insert_value = true}, 
    consumer_id = {type = "id", required = true, foreign = "consumers:id"}, 
    key = {type = "string", required = false, unique = true} 
  }
}

return {keyauth_credentials = SCHEMA}  --keyauth_credentials即为在DAO中加入的自定义schema

完成以上两步后, 即可通过如下所示代码在我们的插件中对数据库进行操作。

local singletons = require "kong.singletons"
singletons.dao.keyauth_credentials.find_all({key="ravenzz"})
singletons.dao.keyauth_credentials.insert(...)
singletons.dao.keyauth_credentials.update(...)
....

3.6 缓存机制

作为网关代理层, 缓存一定是必不可少的一环, 在kong的PDK中, 封装了lua-resty-mlcache.
kong的缓存分为两级:

  • L1: Lua memory cache - 在nginx worker中共享, 可以存储任何Lua值
  • L2: Shared memory cache (SHM) - 在nginx node的所有worker中共享. 可以存储任何标量值, 但它需要序列化和反序列化, 所以性能会有所下降

    注: 从数据库中提取数据后,它将同时存储于上述两级缓存中。现在,如果同一个工作进程再次请求数据,它将从Lua内存缓存检索数据。 如果同一个Nginx节点中的不同工作者请求该数据,它将在SHM中找到数据,对其进行反序列化(并将其存储在自己的Lua内存缓存中),然后将其返回。

一个典型的使用方式如下:

-- 通过一个唯一值获取Cache Key
cache_key = singletons.dao.keyauth_credentials:cache_key("api_key")
value, err = singletons.cache:get(cache_key ,nil, load_from_db_func, ....)

函数: value, err = cache:get(key, opts?, cb, …) , 如果cache没有值(miss), 则会调用函数cb, cb 必须返回一个返回值, 可返回需要缓存的值或者nil, 需要注意的是返回值为nil时, get函数依旧会执行缓存, 但我们可以通过cache:get()的第二个参数控制缓存的TTL和negative TTL. 如下代码即代表有数据时, 我们缓存600s, 没有数据时, 缓存nil结果40s

value, err = singletons.cache:get(cache_key ,{
            ttl = 600, --如果有数据, 则缓存的时间, 单位:秒
            neg_ttl = 40 -- 如果没有数据, 单位:秒
        }, load_from_db_func, ....)

通过上述API开发者可轻松实现缓存懒加载功能

四 kong circuit-breaker 实战

由于kong没有自动熔断策略,只提供了一个手动熔断,因此需要我们自行开发。
什么是自动熔断:

  1. 用API的”出错比例”来表示这个API的健康状态,当这个比例值高于设定的阈值,API自动抛出异常,终止服务,这个过程就叫做自动熔断
  2. API熔断后,可以保护数据库等基础服务不受unhealth的API的影响。

大致流程:

  1. 收集API的相关指标
  2. 判断API是否健康
  3. 通过API的健康状态决定API是否需要自动熔断(恢复)

4.1 限流大法之初探

4.2 滑动窗口算法

API的健康状态,一定是指在某个时间段内请求的出错次数太多,那这里就有两个问题:

  1. 怎么定义”某个时间段”
  2. 怎么定义”出错太多”

解决方案:

  1. 时间粒度:假设我们使用两个指标去控制: metrics_granularity:时间粒度,设置metrics_granularity=3,即 统计这3s内的总出错数。
  2. 出错比例:假设定义个”出错数变量”,显然不科学,因为不同的api它的请求数量不同,因此使用”出错比例”较为科学。比如我们统计3s内请求量和3s内请求出错量,但是仔细不想,还是不行。
  3. 时间窗口:我们假设出错率超过45%为unhealth, 在09:14:10 ~ 09:14:13,3s内是高峰期,访问量是100,出错是20,20%<45%,则 API为health,然后 09:14:13~09:14:16 访问量是10,出错量是5,50%>45% API为unhealth。但这个结果显然不科学。更准确的算法是 (20+5)/(10+100), 也就是需要考虑之前的访问状态。但是我们不能一直累计,这样的话,数据量就会很大,也不利于统计,所以我们设计另一个指标 metrics_rollingsize。

具体算法如下:

设定窗口设置5个(metrics_rollingsize=5),初始化数据
[0, 0, 0, 0, 0]
每一个数字代表在(metrics_granularity=3 )3秒内出错的次数
[0, 0, 0, 0, 11]
每当时间超过3秒( metrics_granularity=3),数据向前偏移一位
[0, 0, 0, 15, 11]
当向前偏移的位超过了滑动窗口(5),则去掉最前面的数据,然后在后面补位,保证这个数组的长度总是5位
[11, 123, 99, 11, 0]

这种方式类似于一个滑动窗口,每过metrics_granularity的时间,窗口向前滑动一格。
下面是我用Lua脚本实现的

local _M = {}
local SlidingWindow = {
    --初始化方法 窗口大小: metrics_rolling_size  ;时间粒度:metrics_granularity
    new = function(metrics_rolling_size, metrics_granularity)
        _M = setmetatable({}, SlidingWindow)
        -- 初始化创建时间
        _M._clock_at = os.time()
        -- 初始化窗口大小
        _M.rolling_size = metrics_rolling_size
        -- 初始化时间粒度
        _M.granularity = metrics_granularity
        -- 初始化数组
        _M._values = {}
        -- 窗口大于1 才有意义
        if (metrics_rolling_size > 1) then
            SlidingWindow.init_clear_array()
        end
        return _M
    end,
    shift = function(length)
        if length <= 0
        then
            return
        end
        -- 如果距离上一次的偏移的时间长度大于了rolling_size,说明 已经过了rolling_size * granularity的时间,API没有访问过了
        if length > _M.rolling_size then
            SlidingWindow.init_clear_array()
        end
    end,
    shift_on_clock_changes = function()
        local pass_time = os.time() - _M._clock_at
        local length = math.modf(pass_time / _M.granularity)
        if length > 0 then
            SlidingWindow.shift(length)
            _M._clock_at = os.time()
        end
    end,
    incr = function(value)
        SlidingWindow.shift_on_clock_changes()
        _M._values[-1] + = value
    end,
    value = function()
        SlidingWindow.shift_on_clock_changes()
        -- 注意返回的是累加后的数据
        return sum(self._values)
    end,
    init_clear_array = function()
        for i = 1, _M.rolling_size do
            table.insert(_M._values, 1, 0)
        end
    end
}

4.3 API or Service 健康检查算法

理论上需要关心四个参数:

  1. THRESHOLDREQUEST:请求量大于一个阈值请求量,请求量很少的统计不具有代表性_
  2. THRESHOLD_TIMEOUT:请求超时的数量
  3. THRESHOLD_SYS_EXC:请求成功的数量
  4. THRESHOLD_UNKWN_EXC:请求失败的数量

实际上这里如果配置这么多,生产意义不是很大,我们一把只关心两个参数:

  1. THRESHOLD_REQUEST:请求量肯定要足够的大,才具有代表意义
  2. 失败比例:这里就不区分 超时次数,失败次数

    4.4 熔断算法

  3. 如果api处于非熔断状态

    1. 根据健康状态算法去判断是否需要熔断
      # 如果需要熔断走熔断逻辑
      # 记录熔断时间
      # 更新熔断状态
      locks[key]['locked_at'] = time_now
      locks[key]['locked_status'] = MODE_LOCKED
      
  4. 如果api处于熔断状态

    1. 需要计算 现在的时间-减去熔断器开始时间,然后判断 存活期是否超过配置的熔断存活期
    2. 如果判断此时API是不健康的,或者 还没有过熔断存活期,那么不处理,继续熔断的

      if time.time() - locked_at < MIN_RECOVERY_TIME or not health_ok_now:
        result = False
      

      否则 超过存活时间,且该api是健康的,那么熔断器变成半开状态

      locks[key]['locked_status'] = MODE_RECOVER
      result = True
      

      3.如果是半开状态

      # 此时API已经是health状态,判断最新一次API请求的结果,如果请求是正常的,则有概率的恢复API,
      if api_latest_state.get(key, False):
      locked_span = time_now - locked_at
      # 如果锁住的时间过长,则直接恢复
      if locked_span >= MAX_RECOVERY_TIME:
        locks[key]['locked_at'] = 0
        locks[key]['locked_status'] = MODE_UNLOCKED
        result = True
        signals.after_api_health_unlocked.send(ctx)
      else:
        if random.random() < float(locked_span) / MAX_RECOVERY_TIME:
            result = True
        else:
            result = False
      
      else:
      # 最新的一次请求失败了,说明很有可能这个API又坏了,这时在判断是一下整体API的健康状态
      if not healt_ok_now:
       # 重新锁住
       locks[key]['locked_at'] = time.time()
       locks[key]['locked_status'] = MODE_LOCKED
      

      4.5 Metrics 统计

      五 kong server-bridge

      快捷编辑命令 ```bash docker cp ./sync-eureka kong_kong_1:/usr/local/share/lua/5.1/kong/plugins

docker restart kong_kong_1 ```

5.1 kong admin api

5.2 eureka

5.3 zookeeper

5.4 simple

5.5 测试结果