分享

探索常见的几种限流策略和实现

 xkl135 2018-07-29

高并发访问时,缓存、限流、降级往往是系统的利剑,在互联网蓬勃发展的时期,经常会面临因用户暴涨导致的请求不可用的情况,甚至引发连锁反映导致整个系统崩溃。这个时候常见的解决方案之一就是限流了,当请求达到一定的并发数或速率,就进行等待、排队、降级、拒绝服务等...

限流算法

常见的算法有 漏桶(leaky bucket)、 令牌桶(TokenBucket)、 计数器,本章通过最简单的代码和最直白的文字描述三种的实现方式(基于本地而不是分布式)...

令牌桶算法

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。 当桶满时,新添加的令牌被丢弃或拒绝。

如图所示

在 GoogleGuava中提供了一个RateLimiter 工具类,就是基于 令牌桶算法实现平滑突发的限流策略,令牌桶的好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如1000毫秒)往桶中增加一定数量的令牌, 有些变种算法则可以实时的计算应该增加的令牌的数量

在示例代码中为每秒中产生 2 个令牌,意味着每500毫秒会产生一个令牌。

limiter.acquire(num) 表示消费多少个令牌。当桶中有足够的令牌时,则直接返回0,否则阻塞,直到有可用的令牌数才返回,返回的值为阻塞的时间。

示例代码

  1. package com.battcn.limiting;

  2. import com.google.common.util.concurrent.RateLimiter;

  3. import org.junit.Test;

  4. import java.time.LocalDateTime;

  5. import java.util.concurrent.ExecutorService;

  6. import java.util.concurrent.Executors;

  7. import java.util.concurrent.TimeUnit;

  8. /**

  9. * @author Levin

  10. * @since 2018/7/24 0024

  11. */

  12. public class RateLimiterTest {

  13.    /**

  14.     * 令牌桶算法

  15.     * 每秒生成 2 個令牌

  16.     */

  17.    private static final RateLimiter limiter = RateLimiter.create(2);

  18.    private void rateLimiter() {

  19.        // 默认就是 1

  20.        final double acquire = limiter.acquire(1);

  21.        System.out.println('当前时间 - ' LocalDateTime.now() ' - ' Thread.currentThread().getName() ' - 阻塞 - ' acquire ' 通过...');

  22.    }

  23.    @Test

  24.    public void testDemo1() throws InterruptedException {

  25.        final ExecutorService service = Executors.newFixedThreadPool(5);

  26.        for (int i = 0; i < 5; i ) {

  27.            service.execute(this::rateLimiter);

  28.        }

  29.        TimeUnit.SECONDS.sleep(5);

  30.    }

  31. }

运行日志

通过日志可以很直观的看出每个令牌产生时间间隔大约在 500 毫秒左右

  1. 当前时间 - 2018-07-24T15:10:25.091 - pool-1-thread-2 - 阻塞 - 0.0 通过...

  2. 当前时间 - 2018-07-24T15:10:25.531 - pool-1-thread-5 - 阻塞 - 0.453466 通过...

  3. 当前时间 - 2018-07-24T15:10:26.031 - pool-1-thread-3 - 阻塞 - 0.953454 通过...

  4. 当前时间 - 2018-07-24T15:10:26.531 - pool-1-thread-4 - 阻塞 - 1.453354 通过...

  5. 当前时间 - 2018-07-24T15:10:27.031 - pool-1-thread-1 - 阻塞 - 1.952553 通过...

漏桶算法

其主要目的是控制数据注入到网络的速率,平滑网络上的突发流量,数据可以以任意速度流入到漏桶中。 漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。 漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶为空,则不需要流出水滴,如果漏桶(包缓存)溢出,那么水滴会被溢出丢弃。

如图所示

漏铜算法可以通过 信号量(Semaphore 的方式实现,很好的达到消峰的目的,如果下文中的代码,队列中任务存活个数就如同是水桶最多能盛装的水量,当超出这个阀值就会丢弃任务....

Semaphore 是 JDK1.5 提供用于限制获取某种资源的线程数量,拥有有 公平、非公平 两种模式。公平则是顺序获取信号,遵循(FIFO)先进先出,而非公平模式则是凭本事抢资源,想先进先出?不存在的。默认是非公平的

示例代码

  1. package com.battcn.limiting;

  2. import org.junit.Test;

  3. import java.time.LocalDateTime;

  4. import java.util.concurrent.ExecutorService;

  5. import java.util.concurrent.Executors;

  6. import java.util.concurrent.Semaphore;

  7. import java.util.concurrent.TimeUnit;

  8. /**

  9. * @author Levin

  10. * @since 2018/7/23 0023

  11. */

  12. public class SemaphoreLimiterTest {

  13.    /**

  14.     * 计数器限流算法(允许将任务放入到缓冲队列)

  15.     * 信号量,用来达到削峰的目的

  16.     */

  17.    private static final Semaphore semaphore = new Semaphore(3);

  18.    private void semaphoreLimiter() {

  19.        // 队列中允许存活的任务个数不能超过 5 个

  20.        if (semaphore.getQueueLength() > 5) {

  21.            System.out.println(LocalDateTime.now() ' - ' Thread.currentThread().getName() ' - 拒絕...');

  22.        } else {

  23.            try {

  24.                semaphore.acquire();

  25.                System.out.println(LocalDateTime.now() ' - ' Thread.currentThread().getName() ' - 通过...');

  26.                //处理核心逻辑

  27.                TimeUnit.SECONDS.sleep(2);

  28.            } catch (InterruptedException e) {

  29.                e.printStackTrace();

  30.            } finally {

  31.                semaphore.release();

  32.            }

  33.        }

  34.    }

  35.    @Test

  36.    public void testSemaphore() throws InterruptedException {

  37.        final ExecutorService service = Executors.newFixedThreadPool(10);

  38.        for (int i = 0; i < 10; i ) {

  39.            service.execute(this::semaphoreLimiter);

  40.        }

  41.        TimeUnit.SECONDS.sleep(5);

  42.    }

  43. }

运行日志

一共 10 个线程同时请求,初始信号量为3,表示最多可以同时处理 3 个任务,超出进入缓冲区排队等待,当缓冲区满了后则拒绝接收新的请求...

  1. 2018-07-24T15:17:00.236 - pool-1-thread-3 - 通过...

  2. 2018-07-24T15:17:00.237 - pool-1-thread-2 - 通过...

  3. 2018-07-24T15:17:00.236 - pool-1-thread-10 - 拒絕...

  4. 2018-07-24T15:17:00.237 - pool-1-thread-1 - 通过...

  5. 2018-07-24T15:17:02.237 - pool-1-thread-4 - 通过...

  6. 2018-07-24T15:17:02.237 - pool-1-thread-5 - 通过...

  7. 2018-07-24T15:17:02.237 - pool-1-thread-6 - 通过...

  8. 2018-07-24T15:17:04.238 - pool-1-thread-9 - 通过...

  9. 2018-07-24T15:17:04.238 - pool-1-thread-7 - 通过...

  10. 2018-07-24T15:17:04.238 - pool-1-thread-8 - 通过...

计数器限流

计数器限流算法是比较常用一种的限流方案也是最为粗暴直接的,主要用来限制总并发数,比如数据库连接池大小、线程池大小、接口访问并发数等都是使用计数器算法...

使用 AomicInteger 来进行统计当前正在并发执行的次数,如果超过域值就直接拒绝请求,提示系统繁忙

AtomicInteger 是 JDK1.5 提供的拥有原子特性的计数功能,都知道在多线程环境下 num 是非原子的,但是有了 AtomicInteger 后这个问题可以非常简单的解决,它就像是 redis incr

示例代码

  1. package com.battcn.limiting;

  2. import org.junit.Test;

  3. import java.time.LocalDateTime;

  4. import java.util.concurrent.ExecutorService;

  5. import java.util.concurrent.Executors;

  6. import java.util.concurrent.TimeUnit;

  7. import java.util.concurrent.atomic.AtomicInteger;

  8. /**

  9. * @author Levin

  10. * @since 2018/7/24 0024

  11. */

  12. public class AtomicLimiterTest {

  13.    /**

  14.     * 计数器限流算法(比较暴力/超出直接拒绝)

  15.     * Atomic,限制总数

  16.     */

  17.    private static final AtomicInteger atomic = new AtomicInteger(0);

  18.    private void atomicLimiter() {

  19.        // 最大支持 3 個

  20.        if (atomic.get() >= 3) {

  21.            System.out.println(LocalDateTime.now() ' - ' Thread.currentThread().getName() ' - ' '拒絕...');

  22.        } else {

  23.            try {

  24.                atomic.incrementAndGet();

  25.                //处理核心逻辑

  26.                System.out.println(LocalDateTime.now() ' - ' Thread.currentThread().getName() ' - ' '通过...');

  27.                TimeUnit.SECONDS.sleep(1);

  28.            } catch (InterruptedException e) {

  29.                e.printStackTrace();

  30.            } finally {

  31.                atomic.decrementAndGet();

  32.            }

  33.        }

  34.    }

  35.    @Test

  36.    public void testAtomic() throws InterruptedException {

  37.        final ExecutorService service = Executors.newFixedThreadPool(5);

  38.        for (int i = 0; i < 5; i ) {

  39.            service.execute(this::atomicLimiter);

  40.        }

  41.        TimeUnit.SECONDS.sleep(5);

  42.    }

  43. }

运行日志

  1. 2018-07-24T15:31:40.270 - pool-1-thread-2 - 通过...

  2. 2018-07-24T15:31:40.271 - pool-1-thread-4 - 拒絕...

  3. 2018-07-24T15:31:40.271 - pool-1-thread-1 - 通过...

  4. 2018-07-24T15:31:40.271 - pool-1-thread-3 - 通过...

  5. 2018-07-24T15:31:40.271 - pool-1-thread-5 - 拒絕...

总结

本篇文章简短的介绍了几种限流方案,虽然是针对本地限流的方案而不是分布式的,但学习本来就是循环渐进的,由浅入深....

说点什么

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多