阿里妹导读:不同的场景下所需的流控算法不尽相同,那应该如何选择适用的流控方案呢?本文分享单机及分布式流控场景下,简单窗口、滑动窗口、漏桶、令牌桶、滑动日志等几种流控算法的思路和代码实现,并总结了各自的复杂度和适用场景。较长,同学们可收藏后再看。
public interface Throttler {
/**
* 尝试申请一个配额
*
* @param key 申请配额的key
* @return 申请成功则返回true,否则返回false
*/
boolean tryAcquire(String key);
}
简单窗口是我自己的命名,有些地方也叫做固定窗口,主要是为了跟后面的滑动窗口区分。
如果访问次数小于阈值,则代表允许访问,访问次数 +1。
如果访问次数超出阈值,则限制访问,访问次数不增。
如果超过了时间窗口,计数器清零,并重置清零后的首次成功访问时间为当前时间。这样就确保计数器统计的是最近一个窗口的访问量。
/**
* 毫秒为单位的时间窗口
*/
private final long windowInMs;
/**
* 时间窗口内最大允许的阈值
*/
private final int threshold;
/**
* 最后一次成功请求时间
*/
private long lastReqTime = System.currentTimeMillis();
/**
* 计数器
*/
private long counter;
public boolean tryAcquire(String key) {
long now = System.currentTimeMillis();
// 如果当前时间已经超过了上一次访问时间开始的时间窗口,重置计数器,以当前时间作为新窗口的起始值
if (now - lastReqTime > windowInMs) {
counter = 0;
lastReqTime = now;
}
if (counter < threshold) {
counter++;
return true;
} else {
return false;
}
}
private volatile long lastReqTime = System.currentTimeMillis();
private LongAdder counter = new LongAdder();
a)总窗口大小 intervalInMs,滑动子窗口大小 windowLengthInMs,采样数量sampleCount:
sampleCount = intervalInMs / windowLengthInMs
windowStart:滑动窗口的开始时间。
windowLength:滑动窗口的长度。
value:滑动窗口记录的内容,泛型表示,关键的一类就是 MetricBucket,里面包含了一组 LongAdder 用于记录不同类型的数据,例如请求通过数、请求阻塞数、请求异常数等等。
最大允许请求数 N:桶的大小
时间窗口大小 T:一整桶水漏完的时间
最大访问速率 V:一整桶水漏完的速度,即 N/T
请求被限流:桶注水的速度比漏水的速度快,最终导致桶内水溢出
/**
* 当前桶内剩余的水
*/
private long left;
/**
* 上次成功注水的时间戳
*/
private long lastInjectTime = System.currentTimeMillis();
/**
* 桶的容量
*/
private long capacity;
/**
* 一桶水漏完的时间
*/
private long duration;
/**
* 桶漏水的速度,即 capacity / duration
*/
private double velocity;
public boolean tryAcquire(String key) {
long now = System.currentTimeMillis();
// 当前剩余的水 = 之前的剩余水量 - 过去这段时间内漏掉的水量
// 过去这段时间内漏掉的水量 = (当前时间-上次注水时间) * 漏水速度
// 如果当前时间相比上次注水时间相隔太久(一直没有注水),桶内的剩余水量就是0(漏完了)
left = Math.max(0, left - (long)((now - lastInjectTime) * velocity));
// 往当前水量基础上注一单位水,只要没有溢出就代表可以访问
if (left + 1 <= capacity) {
lastInjectTime = now;
left++;
return true;
} else {
return false;
}
}
令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。
long now = System.currentTimeMillis();
left = Math.min(capacity, left + (long)((now - lastInjectTime) * velocity));
if (left - 1 > 0) {
lastInjectTime = now;
left--;
return true;
} else {
return false;
}
滑动日志与滑动窗口非常像,区别在于滑动日志的滑动是根据日志记录的时间做动态滑动,而滑动窗口是根据子窗口的大小,以子窗口维度滑动。
# 初始化
counter = 0
q = []
# 请求处理流程
# 1.找到队列中第一个时间戳>=t-T的请求,即以当前时间t截止的时间窗口T内的最早请求
t = now
start = findWindowStart(q, t)
# 2.截断队列,只保留最近T时间窗口内的记录和计数值
q = q[start, q.length - 1]
counter -= start
# 3.判断是否放行,如果允许放行则将这次请求加到队列 q 的末尾
if counter < threshold
push(q, t)
counter++
# 放行
else
# 限流
状态的一致性在中心系统维护,实现简单。
中心系统节点的不可用会导致流控出错,需要有额外的保护。例如,中心化流控在中心存储不可用时,往往会退化为单机流控。
相比中心化方案,去中心化方案能够降低中心化单点可靠性带来的影响,但实现上比较复杂,状态的一致性难以保证。
在 CAP 中去中心化更加倾向于 A 而中心化更倾向于 C。
去中心化方案在生产环境中没有见过,因此下文只讨论中心化流控的思路。
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=10r/s;
server {
location /login/ {
limit_req zone=mylimit;
proxy_pass http://my_upstream;
}
}
这里借用了 Sentinel 中的 TokenServer 叫法,Sentinel 集群流控的介绍可以参考官方文档:Sentinel集群流控 (https://github.com/alibaba/Sentinel/wiki/集群流控)。
Token Server 自动管理、调度(分配/选举 Token Server)
Token Server 高可用,在某个 Server 不可用时自动 failover 到其它机器
public boolean tryAcquire(String key) {
// 以秒为单位构建tair的key
String wrappedKey = wrapKey(key);
// 每次请求+1,初始值为0,key的有效期设置5s
Result<Integer> result = tairManager.incr(NAMESPACE, wrappedKey, 1, 0, 5);
return result.isSuccess() && result.getValue() <= threshold;
}
private String wrapKey(String key) {
long sec = System.currentTimeMillis() / 1000L;
return key + ":" + sec;
}
incr
Result incr(int namespace, Serializable key, int value, int defaultValue, int expireTime)
描述
增加计数。注意:incr 前不要 put!!
参数
namespace - 申请时分配的 namespace key - key 列表,不超过 1k value - 增加量 defaultValue - 第一次调用 incr 时的 key 的 count 初始值,第一次返回的值为 defaultValue + value。 expireTime - 数据过期时间,单位为秒,可设相对时间或绝对时间(Unix 时间戳)。expireTime = 0,表示数据永不过期。expireTime > 0,表示设置过期时间。若 expireTime > 当前时间的时间戳,则表示使用绝对时间,否则使用相对时间。expireTime < 0,表示不关注过期时间,若之前设过过期时间,则已之前的过期时间为准,若没有,则作为永不过期处理,但当前 mdb 统一当做永不过期来处理。
返回值
Result 对象,返回值可为负值。当 key 不存在时,第一次返回 defaultValue+ value。后续的 incr 基于该值增加 value。
简单窗口的临界突变问题。
Tair 的可靠性问题,需要有降级方案。上面其实也说了,中心化的流控一般都需要搭配降级的单机流控。
集群机器的时间同步问题。由于生成 key 会用到集群机器的本地时间,因此要求机器时间必须是一致的。
Redis INCR key(https://redis.io/commands/incr)
FUNCTION LIMIT_API_CALL(ip)
ts = CURRENT_UNIX_TIME()
keyname = ip+":"+ts
current = GET(keyname)
IF current != NULL AND current > 10 THEN
ERROR "too many requests per second"
ELSE
MULTI
INCR(keyname,1)
EXPIRE(keyname,10)
EXEC
PERFORM_API_CALL()
END
FUNCTION LIMIT_API_CALL(ip):
current = GET(ip)
IF current != NULL AND current > 10 THEN
ERROR "too many requests per second"
ELSE
value = INCR(ip)
IF value == 1 THEN
EXPIRE(ip,1)
END
PERFORM_API_CALL()
END
local current
current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
redis.call("expire",KEYS[1],1)
end
FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
ERROR "too many requests per second"
ELSE
IF EXISTS(ip) == FALSE #1
MULTI
RPUSH(ip,ip)
EXPIRE(ip,1)
EXEC
ELSE
RPUSHX(ip,ip)
END
PERFORM_API_CALL()
END
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
if allowed then
new_tokens = filled_tokens - requested
end
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
return { allowed, new_tokens }
其中每个元素的 key 和 value 可以是相同的,即请求的时间戳。
Sorted Set 可以根据时间窗口大小设置有效期,比如时间窗口为 1s 时设置过期时间 5s,在请求量不大时可以节省 Redis 服务器内存。
请求时间戳 t < 当前时间戳 now - 时间窗口大小 interval
long now = System.currentTimeMillis();
long maxScoreMs = now - windowInSecond * 1000;
Transaction redis = jedisPool.getResource().multi();
redis.zremrangeByScore(key, 0, maxScoreMs);
redis.zadd(key, now, now + "-" + Math.random()); // 加入一个随机值使得member不重复
redis.expire(key, windowInSecond);
redis.exec();
使用 Redis 事务 MULTI/EXEC。
使用 RedLock(https://redis.io/topics/distlock) 等分布式锁,要求每个客户端操作前先获取对应 key 的分布式锁。
Lua 脚本。
将可用配额的一部分,按一定比例(例如 50%),先预分配给集群内的机器。一般是平均分配,如果预先就已经知道每台机器的流量权重,可以加权分配。每台机器消耗配额的速率不同,中间也可能有机器宕机,可能有扩缩容,因此预分配的比例不宜太大,当然也不宜太小。
每台机器在配额耗尽时,向中心系统请求配额,这里的一个优化点是每台机器会记录自身配额消耗的速率(等同于承受的流量速率),按照速率大小申请不同大小的配额,消耗速率大则一次性申请更多。
在整体可用配额不足一定比例时(例如 10%),限制每台机器一次可申请的配额数,按剩余窗口大小计算发放配额的大小,并且每次发放量不超过剩余配额的一定比例(例如 50%),使得剩余的流量能够平滑地过渡。
Sentinel 是阿里巴巴开源的、面向分布式服务架构的流量控制组件。在发布的 Sentinel Go 0.3.0 版本中,支持熔断降级,可以针对 Go 服务中的不稳定调用进行自动熔断,避免出现级联错误/雪崩,是保障服务高可用重要的一环。
点击“阅读原文”了解流控降级最佳实践~