漏桶算法

漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
Java/Kotlin 高并发之限流 - 图1

可以使用Kotlin 的背压, 限制接口的请求速度。 背压多用于内部系统限流, 比如系统内部调用可以使用背压做一个缓冲

漏洞算法就是类似管道限流, 比如Webflex 背压以及Kotlin RxJava 的背压,一般多用户发送请求模块比如短信、邮件使用背压控制出口流量

令牌桶算法

  1. 对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。<br />![](https://cdn.nlark.com/yuque/0/2021/png/12388785/1613632766594-2772eb16-93b0-4fa3-b7ea-50711e9ae600.png#align=left&display=inline&height=215&margin=%5Bobject%20Object%5D&originHeight=215&originWidth=363&size=0&status=done&style=none&width=363)

Google开源工具包Guava提供了限流工具类RateLimiter 令牌桶可以用来保护自己,主要用来对调用者频率进行限流,为的是让自己不被打垮 PS: 缺点是只能放在JVM中, 无法分布式, 如果使用接口转发会降低效率

令牌桶算法在Guava里面有但是它是单机的,因此我用lua 脚本写一个redis 分布式版本

-- 令牌桶在redis中的key值
local tokens_key = KEYS[1]

-- 该令牌桶上一次刷新的时间对应的key的值
local timestamp_key = KEYS[2]

-- 令牌单位时间填充速率
local rate = tonumber(ARGV[1])
-- 令牌桶容量
local capacity = tonumber(ARGV[2])
-- 当前时间
local now = tonumber(ARGV[3])
-- 请求需要的令牌数
local requested = tonumber(ARGV[4])


-- -----------------
-- 填充所需要的时间
local fill_time = capacity / rate
-- 过期时间 需要时间 * 2
local ttl = math.floor(fill_time * 2)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
    last_tokens = capacity
end

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
    last_refreshed = 0
end

local delta = math.max(0, now - last_refreshed)
local filled_tokens = math.min(capacity, last_tokens + (delta * rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
    new_tokens = filled_tokens - requested
    allowed_num = 1
end

if ttl > 0 then
    redis.call("setex", tokens_key, ttl, new_tokens)
    redis.call("setex", timestamp_key, ttl, now)
end

return { allowed_num, new_tokens }

间隔限流

每一次请求之后必须休息指定时间才能继续请求,主要针对非常重量级的API使用。

区间限流

是一种简单的限流算法, 在特定时间内仅允许访问多少次, 这种算法简单。相比令牌算法没有均衡限流会导致在区间开始就已经将次数用完,但是可以调整时间阈值进行优化。

代码片段

@file:Suppress("unused")

package com.yuemia.xsj.core.xxs

import com.google.inject.Inject
import com.google.inject.Singleton
import com.yuemia.xsj.exception.CoreException
import io.vertx.core.Vertx
import io.vertx.ext.web.RoutingContext
import io.vertx.kotlin.coroutines.awaitEvent
import org.slf4j.LoggerFactory
import java.time.Instant

@Singleton
class RequestLimit @Inject constructor(
    private val redisCore: RedisCore,
    private val vertx: Vertx
) {

    private val log = LoggerFactory.getLogger(RequestLimit::class.java)
    private var tokenScript = ""

    /**
     * 间隔算法.
     * @param context RoutingContext 请求上下文.
     * @param id String 限制ID (用户ID).
     * @param time Int 限制间隔, 单位秒.
     * @param error String 错误消息文本.
     * @param fn Function0<Unit> 回执函数.
     */
    suspend fun limitInterval(context: RoutingContext, id: String, time: Long = 2, error: String = "稍后再试~", fn: suspend () -> Unit) {
        //如果为空直接返回
        if (id.isEmpty()) {
            context.fail(CoreException(error))
            return
        }

        //写入缓存
        val flag = redisCore.exists("REQUEST_LIMIT:$id")
        if (flag) {
            context.fail(CoreException(error))
            return
        }

        //写入
        redisCore.setex("REQUEST_LIMIT:$id", time, "1")

        //执行方法
        fn()
    }

    /**
     * 计数器算法
     * @param context RoutingContext 请求上下文.
     * @param id String 限制ID (用户ID).
     * @param index Long 限制次数.
     * @param time Long 限制间隔, 单位秒.
     * @param error String 错误消息文本.
     * @param fn SuspendFunction0<Unit> 回执函数.
     */
    suspend fun limitIndex(context: RoutingContext, id: String, index: Long = 24, time: Long = 2, error: String = "稍后再试~", fn: suspend () -> Unit) {
        //如果为空直接返回
        if (id.isEmpty()) {
            context.fail(CoreException(error))
            return
        }

        //获取次数
        val requestIndex = redisCore.getInt("REQUEST_INDEX:$id")

        //写入次数
        if (requestIndex <= 0) {
            redisCore.setex("REQUEST_INDEX:$id", time, "1")
        } else {
            redisCore.incr("REQUEST_INDEX:$id")
        }

        //如果大于则失败
        if (requestIndex > index) {
            log.error("[请求限制]: ${requestIndex}/${index} 配额不足")
            redisCore.expire("REQUEST_INDEX:$id", 1)
            context.fail(CoreException(error))
            return
        }

        //执行方法
        fn()
    }

    /**
     * 令牌桶算法.
     * @param context RoutingContext 请求上下文.
     * @param id String 限制ID (用户ID).
     * @param speed Long 生产速度.
     * @param total Long 令牌桶大小.
     * @param consumption Long 每次消耗.
     * @param error String 错误消息文本.
     * @param fn SuspendFunction0<Unit> 回执函数.
     */
    suspend fun limitTokenBucket(
        context: RoutingContext,
        id: String,
        speed: Long = 4,
        total: Long = 32,
        consumption: Long = 1,
        error: String = "稍后再试~",
        fn: suspend () -> Unit
    ) {
        //如果为空直接返回
        if (id.isEmpty()) {
            context.fail(CoreException(error))
            return
        }
        val script = if (tokenScript.isEmpty()) {
            this.tokenScript = awaitEvent {
                ThreadPool.execute {
                    it.handle(String(this.javaClass.getResourceAsStream("/script/redis_rate_limit.lua").readAllBytes()))
                }
            }
            this.tokenScript
        } else {
            this.tokenScript
        }
        val result = redisCore.eval(
            arrayListOf(
                script, "2", "LIMIT_TOKEN_$id", "LIMIT_TOKEN_" + id + "_TIME",
                speed.toString(), total.toString(), Instant.now().toEpochMilli().toString(), consumption.toString()
            )
        )
        // 判断是否可以请求
        if (result[0].toInteger() <= 0) {
            context.fail(CoreException(error))
        }
        fn()
    }

}