漏桶算法
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
可以使用Kotlin 的背压, 限制接口的请求速度。 背压多用于内部系统限流, 比如系统内部调用可以使用背压做一个缓冲
漏洞算法就是类似管道限流, 比如Webflex 背压以及Kotlin RxJava 的背压,一般多用户发送请求模块比如短信、邮件使用背压控制出口流量
令牌桶算法
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。<br />![](https://cdn.nlark.com/yuque/0/2021/png/12388785/1613632766594-2772eb16-93b0-4fa3-b7ea-50711e9ae600.png#align=left&display=inline&height=215&margin=%5Bobject%20Object%5D&originHeight=215&originWidth=363&size=0&status=done&style=none&width=363)
Google开源工具包Guava提供了限流工具类RateLimiter 令牌桶可以用来保护自己,主要用来对调用者频率进行限流,为的是让自己不被打垮 PS: 缺点是只能放在JVM中, 无法分布式, 如果使用接口转发会降低效率
令牌桶算法在Guava里面有但是它是单机的,因此我用lua 脚本写一个redis 分布式版本
-- 令牌桶在redis中的key值
local tokens_key = KEYS[1]
-- 该令牌桶上一次刷新的时间对应的key的值
local timestamp_key = KEYS[2]
-- 令牌单位时间填充速率
local rate = tonumber(ARGV[1])
-- 令牌桶容量
local capacity = tonumber(ARGV[2])
-- 当前时间
local now = tonumber(ARGV[3])
-- 请求需要的令牌数
local requested = tonumber(ARGV[4])
-- -----------------
-- 填充所需要的时间
local fill_time = capacity / rate
-- 过期时间 需要时间 * 2
local ttl = math.floor(fill_time * 2)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
local delta = math.max(0, now - last_refreshed)
local filled_tokens = math.min(capacity, last_tokens + (delta * rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
return { allowed_num, new_tokens }
间隔限流
每一次请求之后必须休息指定时间才能继续请求,主要针对非常重量级的API使用。
区间限流
是一种简单的限流算法, 在特定时间内仅允许访问多少次, 这种算法简单。相比令牌算法没有均衡限流会导致在区间开始就已经将次数用完,但是可以调整时间阈值进行优化。
代码片段
@file:Suppress("unused")
package com.yuemia.xsj.core.xxs
import com.google.inject.Inject
import com.google.inject.Singleton
import com.yuemia.xsj.exception.CoreException
import io.vertx.core.Vertx
import io.vertx.ext.web.RoutingContext
import io.vertx.kotlin.coroutines.awaitEvent
import org.slf4j.LoggerFactory
import java.time.Instant
@Singleton
class RequestLimit @Inject constructor(
private val redisCore: RedisCore,
private val vertx: Vertx
) {
private val log = LoggerFactory.getLogger(RequestLimit::class.java)
private var tokenScript = ""
/**
* 间隔算法.
* @param context RoutingContext 请求上下文.
* @param id String 限制ID (用户ID).
* @param time Int 限制间隔, 单位秒.
* @param error String 错误消息文本.
* @param fn Function0<Unit> 回执函数.
*/
suspend fun limitInterval(context: RoutingContext, id: String, time: Long = 2, error: String = "稍后再试~", fn: suspend () -> Unit) {
//如果为空直接返回
if (id.isEmpty()) {
context.fail(CoreException(error))
return
}
//写入缓存
val flag = redisCore.exists("REQUEST_LIMIT:$id")
if (flag) {
context.fail(CoreException(error))
return
}
//写入
redisCore.setex("REQUEST_LIMIT:$id", time, "1")
//执行方法
fn()
}
/**
* 计数器算法
* @param context RoutingContext 请求上下文.
* @param id String 限制ID (用户ID).
* @param index Long 限制次数.
* @param time Long 限制间隔, 单位秒.
* @param error String 错误消息文本.
* @param fn SuspendFunction0<Unit> 回执函数.
*/
suspend fun limitIndex(context: RoutingContext, id: String, index: Long = 24, time: Long = 2, error: String = "稍后再试~", fn: suspend () -> Unit) {
//如果为空直接返回
if (id.isEmpty()) {
context.fail(CoreException(error))
return
}
//获取次数
val requestIndex = redisCore.getInt("REQUEST_INDEX:$id")
//写入次数
if (requestIndex <= 0) {
redisCore.setex("REQUEST_INDEX:$id", time, "1")
} else {
redisCore.incr("REQUEST_INDEX:$id")
}
//如果大于则失败
if (requestIndex > index) {
log.error("[请求限制]: ${requestIndex}/${index} 配额不足")
redisCore.expire("REQUEST_INDEX:$id", 1)
context.fail(CoreException(error))
return
}
//执行方法
fn()
}
/**
* 令牌桶算法.
* @param context RoutingContext 请求上下文.
* @param id String 限制ID (用户ID).
* @param speed Long 生产速度.
* @param total Long 令牌桶大小.
* @param consumption Long 每次消耗.
* @param error String 错误消息文本.
* @param fn SuspendFunction0<Unit> 回执函数.
*/
suspend fun limitTokenBucket(
context: RoutingContext,
id: String,
speed: Long = 4,
total: Long = 32,
consumption: Long = 1,
error: String = "稍后再试~",
fn: suspend () -> Unit
) {
//如果为空直接返回
if (id.isEmpty()) {
context.fail(CoreException(error))
return
}
val script = if (tokenScript.isEmpty()) {
this.tokenScript = awaitEvent {
ThreadPool.execute {
it.handle(String(this.javaClass.getResourceAsStream("/script/redis_rate_limit.lua").readAllBytes()))
}
}
this.tokenScript
} else {
this.tokenScript
}
val result = redisCore.eval(
arrayListOf(
script, "2", "LIMIT_TOKEN_$id", "LIMIT_TOKEN_" + id + "_TIME",
speed.toString(), total.toString(), Instant.now().toEpochMilli().toString(), consumption.toString()
)
)
// 判断是否可以请求
if (result[0].toInteger() <= 0) {
context.fail(CoreException(error))
}
fn()
}
}