1. 限流 限流是保护高并发系统的三大利器之一,另外两个是缓存和降级。限流在很多场景中用来限制并发和请求量,比如说秒杀抢购,保护自身系统和下游系统不被巨型流量冲垮等。
限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务或进行流量整形。
1.1. 常用限流方法 常用的限流方式和场景有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limitconn模块,用来限制瞬时并发连接数,Java的Semaphore也可以实现)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limitreq模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
RateLimiter常用:限制方法在一段时间内平均被调用次数不超过某个数字
2. 限流算法 2.1. 漏桶算法 形象解释: 入口->水流->桶(有漏洞)->水流->出口
正常情况:入口水流与出口水流持平。桶中几乎没有积水
超量情况:入口水流远大于出口水流。桶中不断积水,最后溢出
实现方法: 使用队列实现,有处理器按照固定速率从队列头取出请求进行计算。 请求到达如队列未满则直接放入队列。如请求量大导致队列满,则抛弃新请求
2.2. 令牌桶算法 形象解释: 令牌(固定速率)->桶 请求->缓冲器->能否拿到令牌?->计算
按照固定速率往桶里添加令牌。桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝
当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌
如无法获取令牌,则选择丢弃或在缓冲区等待
3. RateLimiter Guava是Java领域优秀的开源项目,它包含了Google在Java项目中使用一些核心库。RateLimiter是其中的限流器实现。RateLimiter提供了令牌桶算法的两种具体实现。由RateLimiter的两个子类实现:
平滑突发限流 SmoothBursty :
平滑预热限流 SmoothWarmingUp :
注:本文源码分析基于:com.google.guava-18.0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static RateLimiter create (SleepingStopwatch stopwatch, double permitsPerSecond) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 ); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } static RateLimiter create ( SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor) { RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
3.1. 平滑突发限流 使用: 使用 RateLimiter的静态方法创建一个限流器,设置每秒放置的令牌数为10个。返回的RateLimiter对象可以保证1秒内不会给超过10个令牌,并且以固定速率进行放置,达到平滑输出的效果。
注意:最终使用时计算出的时间间隔不保证一定是准确的0.1s。可能存在误差,在需要精确限流时需要注意
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) { RateLimiter limiter = RateLimiter.create(10 ); for (int i = 0 ; i < 10 ; i++) { System.out.println("get 1 token, need: " + limiter.acquire() + "s" ); } }
3.2. 平滑预热限流 SmoothWarmingUp是带有预热期的平滑限流,它启动后有一段预热期,逐步将分发频率提升到配置的速率。 如下代码中分发速率是2,预热期是5秒。限流器会在前5秒内限制速度,最后再恢复到每秒2次的分发速率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) { RateLimiter limiter = RateLimiter.create(2 , 5 , TimeUnit.SECONDS); for (int i = 0 ; i < 10 ; i++) { System.out.println("get 1 token, need: " + limiter.acquire() + "s" ); } }
3.3. 使用方法
使用acquire
每次调用doSomeThing
时,程序都会在limiter.acquier()
处阻塞等待。直到limiter获取token,才会继续往后执行do something。从而达到限流效果
1 2 3 4 5 6 7 8 9 public class limiter { RateLimiter limiter = RateLimiter.create(10 ); public void doSomeThing () { limiter.acquire(); } }
使用tryAcquire
每次调用doSomeThing
时,程序都会在limiter.tryAcquier()
等待。与上面不同的时,这个阻塞式等待设置有最长等待时间。如成功在500ms内获取token,则返回true可以继续执行逻辑。如果未能在500ms内获取token,则表示获取token失败,事实上已经超过了qps限制,需要执行失败逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class limiter { RateLimiter limiter = RateLimiter.create(10 ); public void doSomething () { if (limiter.tryAcquire(500 , TimeUnit.MILLISECONDS)){ }else { } } }
4. 源码解析 4.1. 构造函数 创建stopWatch,然后调用create,传入permitsPerSecond
创建子类的SmoothBursty
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond); static RateLimiter create (SleepingStopwatch stopwatch, double permitsPerSecond) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 ); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } public final void setRate (double permitsPerSecond) { checkArgument( permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive" ); synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } }
4.2. 时间相关 先来看看代码中时间相关的基本计算方法
stopWatch源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public final class Stopwatch { private final Ticker ticker; private boolean isRunning; private long elapsedNanos; private long startTick; Stopwatch() { this (Ticker.systemTicker()); } public Stopwatch start () { checkState(!isRunning, "This stopwatch is already running." ); isRunning = true ; startTick = ticker.read(); return this ; } static final SleepingStopwatch createFromSystemTimer () { return new SleepingStopwatch() { final Stopwatch stopwatch = Stopwatch.createStarted(); @Override long readMicros () { return stopwatch.elapsed(MICROSECONDS); } @Override void sleepMicrosUninterruptibly (long micros) { if (micros > 0 ) { Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS); } } };
其中ticker.read()调用系统的ticker并记录
1 2 3 4 5 6 7 8 9 public static Ticker systemTicker () { return SYSTEM_TICKER; } private static final Ticker SYSTEM_TICKER = new Ticker() { @Override public long read () { return Platform.systemNanoTime(); } };
我们可以明白,rateLimiter的时间计算本质上基于系统的ticker计算。由于系统的tick计算是基于系统所在的计算平台的CPU频率与时钟频率,如不能精确计算CPU时钟频率则无法实现精密时间计算。因此RateLimiter在计算高精度的时间时可能出现误差。一个具体表现就是上面例子中预估0.1s获取一次时,经常出现误差,无法精确做到0.1s一次。(其实这个问题还有另一种解释,可以继续往下看)
4.3. SmoothBursty 我们继续查看SmoothBursty类的源码
1 2 3 4 5 6 7 8 9 10 11 static final class SmoothBursty extends SmoothRateLimiter { final double maxBurstSeconds; SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super (stopwatch); this .maxBurstSeconds = maxBurstSeconds; } }
doSetRate流程:
先计算double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
然后设置SmoothBursty中的maxPermits
(默认为1秒的值)为permitsPerSecond
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 doSetRate(permitsPerSecond, stopwatch.readMicros()); final void doSetRate (double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L ) / permitsPerSecond; this .stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } @Override void doSetRate (double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this .maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0 ) ? 0.0 : storedPermits * maxPermits / oldMaxPermits; } }
4.4. acquire流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public double acquire (int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L ); } final long reserve (int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } final long reserveAndGetWaitLength (int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0 ); } final long reserveEarliestAvailable (int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this .storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this .storedPermits, storedPermitsToSpend) + (long ) (freshPermits * stableIntervalMicros); this .nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; this .storedPermits -= storedPermitsToSpend; return returnValue; } void resync (long nowMicros) { if (nowMicros > nextFreeTicketMicros) { storedPermits = min(maxPermits, storedPermits+ (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros()); nextFreeTicketMicros = nowMicros; } } double coolDownIntervalMicros () { return stableIntervalMicros; }
流程:
尝试获取令牌
调用reserveEarliestAvailable
调用resync,同步时间与令牌
计算storedPermitsToSpend
计算是否需要预先支付令牌
计算waitMicros
更新nextFreeTicketMicros
减少storedPermits
让stopWatch等待nextFreeTicketMicros
返回,表示acquire成功
注意点:
RateLimiter在没有足够令牌发放时,采用滞后处理的方式。前一个请求获取令牌所需等待的时间计算waitMicros由下一次请求来承受,也就是代替前一个请求进行等待。
RateLimiter拥有预支令牌机制。当请求的令牌数量超过设置的qps上限后,可以预支后几秒的令牌放行当前动作。后果是后几秒的令牌获取受阻
5. 总结
RateLimiter可以用于单机qps限制,并且带有令牌预支付功能。使用起来简单方便。
缺点:无法实现qps的精确计算,仅能限制到一定时间内的平均qps。
仅适用于单机qps。如需用于集群的qps限制,可能需要将总qps除以集群机器数量,得到单机qps上限后再设置。