本文内容

  1. 介绍常见的限流算法
  2. 通过控制最大并发数来进行限流
  3. 通过漏桶算法来进行限流
  4. 通过令牌桶算法来进行限流
  5. 限流工具类 RateLimiter

    常见的限流的场景

  6. 秒杀活动,数量有限,访问量巨大,为了防止系统宕机,需要做限流处理

  7. 国庆期间,一般的旅游景点人口太多,采用排队方式做限流处理
  8. 医院看病通过发放排队号的方式来做限流处理。

    常见的限流算法

  9. 通过控制最大并发数来进行限流

  10. 使用漏桶算法来进行限流
  11. 使用令牌桶算法来进行限流

    通过控制最大并发数来进行限流

    以秒杀业务为例,10 个 iphone,100 万人抢购,100 万人同时发起请求,最终能够抢到的人也就是前面几个人,后面的基本上都没有希望了,那么我们可以通过控制并发数来实现,比如并发数控制在 10 个,其他超过并发数的请求全部拒绝,提示:秒杀失败,请稍后重试。
    并发控制的,通俗解释:一大波人去商场购物,必须经过一个门口,门口有个门卫,兜里面有指定数量的门禁卡,来的人先去门卫那边拿取门禁卡,拿到卡的人才可以刷卡进入商场,拿不到的可以继续等待。进去的人出来之后会把卡归还给门卫,门卫可以把归还来的卡继续发放给其他排队的顾客使用。
    JUC 中提供了这样的工具类:Semaphore,示例代码:
    package com.itsoku.chat29;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo1** {

  1. **static** Semaphore semaphore = **new** Semaphore(5);
  2. **public** **static** **void** **main**(String[] args) {<br /> **for** (**int** i = 0; i < 20; i++) {<br /> **new** Thread(() -> {<br /> **boolean** flag = **false**;<br /> **try** {<br /> flag = semaphore.tryAcquire(100, TimeUnit.MICROSECONDS);<br /> **if** (flag) {<br /> //休眠2秒,模拟下单操作<br /> System.out.println(Thread.currentThread() + ",尝试下单中。。。。。");<br /> TimeUnit.SECONDS.sleep(2);<br /> } **else** {<br /> System.out.println(Thread.currentThread() + ",秒杀失败,请稍微重试!");<br /> }<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> } **finally** {<br /> **if** (flag) {<br /> semaphore.release();<br /> }<br /> }<br /> }).start();<br /> }<br /> }

}

输出:
Thread[Thread-10,5,main],尝试下单中。。。。。
Thread[Thread-8,5,main],尝试下单中。。。。。
Thread[Thread-9,5,main],尝试下单中。。。。。
Thread[Thread-12,5,main],尝试下单中。。。。。
Thread[Thread-11,5,main],尝试下单中。。。。。
Thread[Thread-2,5,main],秒杀失败,请稍微重试!
Thread[Thread-1,5,main],秒杀失败,请稍微重试!
Thread[Thread-18,5,main],秒杀失败,请稍微重试!
Thread[Thread-16,5,main],秒杀失败,请稍微重试!
Thread[Thread-0,5,main],秒杀失败,请稍微重试!
Thread[Thread-3,5,main],秒杀失败,请稍微重试!
Thread[Thread-14,5,main],秒杀失败,请稍微重试!
Thread[Thread-6,5,main],秒杀失败,请稍微重试!
Thread[Thread-13,5,main],秒杀失败,请稍微重试!
Thread[Thread-17,5,main],秒杀失败,请稍微重试!
Thread[Thread-7,5,main],秒杀失败,请稍微重试!
Thread[Thread-19,5,main],秒杀失败,请稍微重试!
Thread[Thread-15,5,main],秒杀失败,请稍微重试!
Thread[Thread-4,5,main],秒杀失败,请稍微重试!
Thread[Thread-5,5,main],秒杀失败,请稍微重试!

关于Semaphore的使用,可以移步:Semaphore(信号量)

使用漏桶算法来进行限流

国庆期间比较火爆的景点,人流量巨大,一般入口处会有限流的弯道,让游客进去进行排队,排在前面的人,每隔一段时间会放一拨进入景区。排队人数超过了指定的限制,后面再来的人会被告知今天已经游客量已经达到峰值,会被拒绝排队,让其明天或者以后再来,这种玩法采用漏桶限流的方式。
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
漏桶算法示意图:
image.png
简陋版的实现,代码如下:
package com.itsoku.chat29;

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo2** {

  1. **public** **static** **class** **BucketLimit** {<br /> **static** AtomicInteger threadNum = **new** AtomicInteger(1);<br /> //容量<br /> **private** **int** capcity;<br /> //流速<br /> **private** **int** flowRate;<br /> //流速时间单位<br /> **private** TimeUnit flowRateUnit;<br /> **private** BlockingQueue<Node> queue;<br /> //漏桶流出的任务时间间隔(纳秒)<br /> **private** **long** flowRateNanosTime;
  2. **public** **BucketLimit**(**int** capcity, **int** flowRate, TimeUnit flowRateUnit) {<br /> **this**.capcity = capcity;<br /> **this**.flowRate = flowRate;<br /> **this**.flowRateUnit = flowRateUnit;<br /> **this**.bucketThreadWork();<br /> }
  3. //漏桶线程<br /> **public** **void** **bucketThreadWork**() {<br /> **this**.queue = **new** ArrayBlockingQueue<Node>(capcity);<br /> //漏桶流出的任务时间间隔(纳秒)<br /> **this**.flowRateNanosTime = flowRateUnit.toNanos(1) / flowRate;<br /> Thread thread = **new** Thread(**this**::bucketWork);<br /> thread.setName("漏桶线程-" + threadNum.getAndIncrement());<br /> thread.start();<br /> }
  4. //漏桶线程开始工作<br /> **public** **void** **bucketWork**() {<br /> **while** (**true**) {<br /> Node node = **this**.queue.poll();<br /> **if** (Objects.nonNull(node)) {<br /> //唤醒任务线程<br /> LockSupport.unpark(node.thread);<br /> }<br /> //休眠flowRateNanosTime<br /> LockSupport.parkNanos(**this**.flowRateNanosTime);<br /> }<br /> }
  5. //返回一个漏桶<br /> **public** **static** BucketLimit **build**(**int** capcity, **int** flowRate, TimeUnit flowRateUnit) {<br /> **if** (capcity < 0 || flowRate < 0) {<br /> **throw** **new** IllegalArgumentException("capcity、flowRate必须大于0!");<br /> }<br /> **return** **new** BucketLimit(capcity, flowRate, flowRateUnit);<br /> }
  6. //当前线程加入漏桶,返回false,表示漏桶已满;true:表示被漏桶限流成功,可以继续处理任务<br /> **public** **boolean** **acquire**() {<br /> Thread thread = Thread.currentThread();<br /> Node node = **new** Node(thread);<br /> **if** (**this**.queue.offer(node)) {<br /> LockSupport.park();<br /> **return** **true**;<br /> }<br /> **return** **false**;<br /> }
  7. //漏桶中存放的元素<br /> **class** **Node** {<br /> **private** Thread thread;
  8. **public** **Node**(Thread thread) {<br /> **this**.thread = thread;<br /> }<br /> }<br /> }
  9. **public** **static** **void** **main**(String[] args) {<br /> BucketLimit bucketLimit = BucketLimit.build(10, 60, TimeUnit.MINUTES);<br /> **for** (**int** i = 0; i < 15; i++) {<br /> **new** Thread(() -> {<br /> **boolean** acquire = bucketLimit.acquire();<br /> System.out.println(System.currentTimeMillis() + " " + acquire);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(1);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }).start();<br /> }<br /> }

}

代码中BucketLimit.build(10, 60, TimeUnit.MINUTES);创建了一个容量为 10,流水为 60/分钟的漏桶。
代码中用到的技术有:

  1. 详解各种阻塞队列
  2. LockSupport 工具类

    使用令牌桶算法来进行限流

    令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。这种算法可以应对突发程度的请求,因此比漏桶算法好。
    令牌桶算法示意图:
    image.png
    有兴趣的可以自己去实现一个。

    限流工具类 RateLimiter

    Google 开源工具包 Guava 提供了限流工具类 RateLimiter,可以非常方便的控制系统每秒吞吐量,示例代码如下:
    package com.itsoku.chat29;

import com.google.common.util.concurrent.RateLimiter;

import java.util.Calendar;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo3** {

  1. **public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> RateLimiter rateLimiter = RateLimiter.create(5);//设置QPS为5<br /> **for** (**int** i = 0; i < 10; i++) {<br /> rateLimiter.acquire();<br /> System.out.println(System.currentTimeMillis());<br /> }<br /> System.out.println("----------");<br /> //可以随时调整速率,我们将qps调整为10<br /> rateLimiter.setRate(10);<br /> **for** (**int** i = 0; i < 10; i++) {<br /> rateLimiter.acquire();<br /> System.out.println(System.currentTimeMillis());<br /> }<br /> }<br />}

输出:
1566284028725
1566284028922
1566284029121
1566284029322
1566284029522
1566284029721
1566284029921
1566284030122
1566284030322
1566284030522
—————
1566284030722
1566284030822
1566284030921
1566284031022
1566284031121
1566284031221
1566284031321
1566284031422
1566284031522
1566284031622

代码中RateLimiter.create(5)创建 QPS 为 5 的限流对象,后面又调用rateLimiter.setRate(10);将速率设为 10,输出中分 2 段,第一段每次输出相隔 200 毫秒,第二段每次输出相隔 100 毫秒,可以非常精准的控制系统的 QPS。
上面介绍的这些,业务中可能会用到,也可以用来应对面试。

漏桶和令牌桶的区别

  • 漏桶算法是控制流出速度的
  • 令牌桶算法是控制流入速度的