分享

Springboot应用中线程池配置教程(2021版)...

 极风狼 2022-01-17

前言:日常开发中我们常用ThreadPoolExecutor提供的线程池服务帮我们管理线程,在Springboot中更是提供了@Async注解来简化业务逻辑提交到线程池中执行的过程。由于Springboot中默认设置的corePoolSize=1和queyeCapacity=Integer.MAX_VALUE,相当于采用单线程处理所有任务,这就与多线程的目的背道而驰,所以这就要求我们在使用@Async注解时要配置线程池。本文就讲述下Springboot应用下的线程池配置。

背景知识:Springboot中通过使用ThreadPoolTaskExecutor这个JavaBean对象的corePoolSize(核心线程数)、maxPoolSize(最大线程数)、keepAliveSeconds(线程空闲时长)和queueCapacity(任务队列容量)属性来配置ThreadPoolExecutor,以上四个属性的作用大致如下:

新提交一个任务时的处理流程很明显:

  1. 如果当前线程池的线程数还没有达到基本大小(poolSize < corePoolSize),无论是否有空闲的线程新增一个线程处理新提交的任务;
  2. 如果当前线程池的线程数大于或等于基本大小(poolSize >= corePoolSize) 且任务队列未满时,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);
  3. 如果当前线程池的线程数大于或等于基本大小(poolSize >= corePoolSize) 且任务队列满时
    1. 当前poolSize<maximumPoolSize,那么就新增线程来处理任务;
    2. 当前poolSize=maximumPoolSize,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。

好了,回到正文。目前配置Springboot线程池主要有两种方式:配置默认线程池和提供自定义线程池;毫无疑问,两种配置方式并无优劣。从使用角度来讲,由于自定义线程池是自定义即没有被Springboot默认使用的线程池,那么就需要通过@Async("自定义线程池bean对象名")的方式去使用,其它地方同默认线程池使用方式一致;下面通过一个简单的Springboot应用结合实际来展示:

1、新建一个Springboot项目,项目结构和pom.xml内容如下:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven./POM/4.0.0 https://maven./xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.1.17.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.hugesoft</groupId>
  12. <artifactId>springboot-async</artifactId>
  13. <version>0.0.1</version>
  14. <name>springboot-async</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.projectlombok</groupId>
  26. <artifactId>lombok</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-test</artifactId>
  31. <scope>test</scope>
  32. </dependency>
  33. </dependencies>

  34. <build>
  35. <plugins>
  36. <plugin>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-maven-plugin</artifactId>
  39. </plugin>
  40. </plugins>
  41. </build>

  42. </project>

二、application.yml中,自定义了线程池需要配置的四个属性,内容如下:

  1. task:
  2. pool:
  3. corePoolSize: 10
  4. maxPoolSize: 20
  5. keepAliveSeconds: 300
  6. queueCapacity: 50

三、在com.hugesoft.config包中有三个类:TaskThreadPoolConfig类用来简化封装application.yml配置的属性,OverrideDefaultThreadPoolConfig类提供了配置默认线程池的方式,CustomizeThreadPoolConfig类则实现了自定义线程池,具体实现如下:

  1. package com.hugesoft.config.dto;

  2. import lombok.Data;
  3. import org.springframework.boot.context.properties.ConfigurationProperties;
  4. import org.springframework.stereotype.Component;

  5. /**
  6. * 线程池配置属性类
  7. * @author YuXD
  8. */
  9. @Data
  10. @Component
  11. @ConfigurationProperties(prefix = "task.pool")
  12. public class TaskThreadPoolConfig {

  13. /**
  14. * 核心线程数
  15. */
  16. private int corePoolSize;

  17. /**
  18. * 最大线程数
  19. */
  20. private int maxPoolSize;

  21. /**
  22. * 线程空闲时间
  23. */
  24. private int keepAliveSeconds;

  25. /**
  26. * 任务队列容量(阻塞队列)
  27. */
  28. private int queueCapacity;
  29. }
  1. package com.hugesoft.config;

  2. import com.hugesoft.config.dto.TaskThreadPoolConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.scheduling.annotation.AsyncConfigurer;
  8. import org.springframework.scheduling.annotation.EnableAsync;
  9. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

  10. import java.util.concurrent.Executor;
  11. import java.util.concurrent.ThreadPoolExecutor;

  12. /**
  13. * 重写默认线程池配置
  14. * @author YuXD
  15. */
  16. @Slf4j
  17. @Configuration
  18. @EnableAsync
  19. public class OverrideDefaultThreadPoolConfig implements AsyncConfigurer {

  20. @Autowired
  21. private TaskThreadPoolConfig config;

  22. @Override
  23. public Executor getAsyncExecutor() {
  24. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  25. //核心线程池大小
  26. executor.setCorePoolSize(config.getCorePoolSize());
  27. //最大线程数
  28. executor.setMaxPoolSize(config.getMaxPoolSize());
  29. //队列容量
  30. executor.setQueueCapacity(config.getQueueCapacity());
  31. //活跃时间
  32. executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
  33. //线程名字前缀
  34. executor.setThreadNamePrefix("default-thread-");
  35. /*
  36. 当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)
  37. CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
  38. */
  39. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  40. executor.initialize();
  41. return executor;
  42. }

  43. /**
  44. * 异步任务中异常处理
  45. *
  46. * @return
  47. */
  48. @Override
  49. public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
  50. return (ex, method, params) -> {
  51. log.error("==========================" + ex.getMessage() + "=======================", ex);
  52. log.error("exception method:" + method.getName());
  53. };
  54. }
  55. }

  1. package com.hugesoft.config;

  2. import com.hugesoft.config.dto.TaskThreadPoolConfig;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.scheduling.annotation.EnableAsync;
  7. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

  8. import java.util.concurrent.Executor;
  9. import java.util.concurrent.ThreadPoolExecutor;

  10. /**
  11. *
  12. * 自定义下城
  13. * @author : YuXD
  14. */
  15. @Configuration
  16. @EnableAsync
  17. public class CustomizeThreadPoolConfig {

  18. @Autowired
  19. private TaskThreadPoolConfig config;

  20. @Bean("customizeThreadPool")
  21. public Executor doConfigCustomizeThreadPool() {
  22. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  23. //核心线程池大小
  24. executor.setCorePoolSize(config.getCorePoolSize());
  25. //最大线程数
  26. executor.setMaxPoolSize(config.getMaxPoolSize());
  27. //队列容量
  28. executor.setQueueCapacity(config.getQueueCapacity());
  29. //活跃时间
  30. executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
  31. //线程名字前缀
  32. executor.setThreadNamePrefix("customize-thread-");
  33. /*
  34. 当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)
  35. CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
  36. */
  37. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  38. executor.initialize();
  39. return executor;
  40. }

  41. }

四、com.hugesoft.service包下的内容是从真实项目环境中提取出来的,IStatusAnalyseService定义了设备状态分析基础Service,JJDeviceDataAnalyseManager,ZHUDeviceDataAnalyseManager,ZZDeviceDataAnalyseManager三个子类分别提供了默认实现,AbstractDeviceDataAnalyseManager提取了三个子类用到的公共方法,代码没难度,理解即可;需要尤其注意AbstractDeviceDataAnalyseManager的两个重载方法,分别采用默认线程池和自定义线程池的方式,注意使用的异同点,这点也就是默认线程池和自定义线程池适用上的唯一不同点。具体试下如下:

  1. package com.hugesoft.service;

  2. /**
  3. * 参数分析基础Service,所有需要进行参数分析的都需要实现该接口
  4. *
  5. * @author YuXD
  6. */
  7. public interface IStatusAnalyseService {

  8. /**
  9. * 设备状态解析处理
  10. *
  11. * @param start 开始时间
  12. * @param end 截止时间
  13. */
  14. void doStatusAnalyseHandle(String start, String end);

  15. /**
  16. * 设备状态解析处理
  17. *
  18. * @param end 截止时间
  19. */
  20. void doStatusAnalyseHandle(String end);

  21. /**
  22. * 获取数据类别
  23. *
  24. * @return
  25. */
  26. String getDataType();

  27. }
  1. package com.hugesoft.service.impl;


  2. import com.hugesoft.service.IStatusAnalyseService;
  3. import org.springframework.scheduling.annotation.Async;

  4. import java.util.Random;

  5. /**
  6. * 抽象的设备数据分析Manager
  7. *
  8. * @author YuXD
  9. * @since 2020-06-18 22:47
  10. */
  11. public abstract class AbstractDeviceDataAnalyseManager implements IStatusAnalyseService {

  12. @Async("customizeThreadPool")
  13. @Override
  14. public void doStatusAnalyseHandle(String start, String end) {
  15. int sleepSeconds = new Random().nextInt(3) + 1;
  16. try {
  17. Thread.sleep(sleepSeconds * 1000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. System.out.println(getDataType() + "在自定义线程" + Thread.currentThread().getName() + "执行了" + sleepSeconds + "秒");
  22. }

  23. @Async
  24. @Override
  25. public void doStatusAnalyseHandle(String end) {
  26. int sleepSeconds = new Random().nextInt(3) + 1;
  27. try {
  28. Thread.sleep(sleepSeconds * 1000);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. System.out.println(getDataType() + "在默认线程" + Thread.currentThread().getName() + "执行了" + sleepSeconds + "秒");
  33. }
  34. }
  1. package com.hugesoft.service.impl;

  2. import org.springframework.stereotype.Service;

  3. /**
  4. * @description: 机加设备数据分析Service实现类
  5. * @author: YuXD
  6. * @create: 2021-03-15 20:17
  7. **/
  8. @Service("JJ")
  9. public class JJDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

  10. @Override
  11. public String getDataType() {
  12. return "机加";
  13. }

  14. }
  1. package com.hugesoft.service.impl;

  2. import org.springframework.stereotype.Service;

  3. /**
  4. * @description: 铸造设备数据分析Service实现类
  5. * @author: YuXD
  6. * @create: 2020-06-18 22:56
  7. **/
  8. @Service("ZHU")
  9. public class ZHUDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

  10. @Override
  11. public String getDataType() {
  12. return "铸造";
  13. }

  14. }
  1. package com.hugesoft.service.impl;

  2. import org.springframework.stereotype.Service;

  3. /**
  4. * @description: 总装设备数据分析Service实现类
  5. * @author: YuXD
  6. * @create: 2020-06-18 22:56
  7. **/
  8. @Service("ZZ")
  9. public class ZZDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

  10. @Override
  11. public String getDataType() {
  12. return "总装";
  13. }

  14. }

五、最后看一下Springboot启动类实现;该类既是启动类也是Controller类,没什么特别要说明的。

  1. package com.hugesoft;

  2. import com.hugesoft.service.IStatusAnalyseService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.scheduling.annotation.EnableAsync;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RestController;

  9. import java.util.List;

  10. @RestController
  11. @EnableAsync
  12. @SpringBootApplication
  13. public class SpringbootAsyncApplication {

  14. @Autowired
  15. private List<IStatusAnalyseService> statusAnalyseServiceList;

  16. public static void main(String[] args) {
  17. SpringApplication.run(SpringbootAsyncApplication.class, args);
  18. }

  19. @GetMapping("/sayHelloAsync")
  20. public String sayHelloAsync() {
  21. for (IStatusAnalyseService statusAnalyseService : statusAnalyseServiceList) {
  22. // 采用自定义线程池
  23. statusAnalyseService.doStatusAnalyseHandle(null, null);
  24. // 采用默认线程池
  25. statusAnalyseService.doStatusAnalyseHandle(null);
  26. }
  27. return "Hello, Async!";
  28. }

  29. }

六、最后启动main方法,通过浏览器地址栏访问 http://localhost:8080/sayHelloAsync,发现秒出现如下页面,且控制台会出现如下内容,说明我们配置的默认线程池和自定义线程池都起作用了,到此,配置成功

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多