为什么实际开发中多线程编程很重要?
淘宝,大家都用过吧,点击我的淘宝,页面如下。
淘宝这种大型C端互联网产品,用户规模几个亿,所有功能模块都是基于微服务开发的。以我在阿里淘特技术部实习开发的经验看,就目前看到的这个页面,至少会直接或者间接调用几十个(可能更多)基于HSF(阿里的RPC框架)的微服务接口,背后至少有几百号人的技术团队的支持。
这个页面的呈现,需要在几个亿的用户数据中,把包括商品信息,卡卷消息,用户现金权益等各种信息进行数据的组织召回,且不说微服务的内部调用链路很长且耗时复杂,单单从数据库层面,在各个分库分表中,在以亿为级别的数据中找到找到一条对应用户的数据,已经相当耗时了。
因此,实际开发中,面对上述业务业务场景。在架构层面,缓存,数据库的分库分表都是常规操作,走HTTP也是不够的,直接走RPC。在代码层面,一个重要的方面就是使用多线程编程。
如下模拟演示,使用多线程编程,如何在复杂的数据召回场景中,大幅度提升响应效率。
一、UserServcie接口
没啥讲的,很常规
1**
2 * 用户服务
3 *
4 * @author kobe24
5 * @date 2022/04/04
6 */
7
8public interface UserService {
9 /**
10 * 通过用户id获取粉丝数
11 *
12 * @param userId 用户id
13 * @return long
14 */
15 long countFansCountByUserId(Long userId);
16
17 /**
18 * 通过用户id获取消息数量
19 *
20 * @param userId 用户id
21 * @return long
22 */
23 long countMsgCountByUserId(Long userId);
24
25 /**
26 * 通过用户获取分组数
27 *
28 * @param userId 用户id
29 * @return long
30 */
31 long countCollectCountByUserId(Long userId);
32
33 /**
34 * 通过用户id获取点赞总数
35 *
36 * @param userId 用户id
37 * @return long
38 */
39 long countFollowCountByUserId(Long userId);
40
41
42 /**
43 * 通过用户id获取红包总数
44 *
45 * @param userId 用户id
46 * @return long
47 */
48 long countRedBonusCountByUserId(Long userId);
49
50 /**
51 * 通过用户id获取优惠价数
52 *
53 * @param userId 用户id
54 * @return long
55 */
56 long countCouponCountByUserId(Long userId);
57
58}
二、UserServiceImpl
这里面使用线程休眠,模拟各个方法的执行耗时。单线程串行执行,总共耗时:10+10+10+10+8+4=52s
1/**
2 * @program: spring-boot-demo-plus
3 * @author: David Qin
4 * @create: 2022-04-04 23:47
5 * <p>
6 * 以下方法通过线程池休眠模拟数据的召回所需的时间
7 */
8@Slf4j
9@Service
10public class UserServiceImpl implements UserService {
11 /**
12 * 通过用户id获取粉丝数
13 *
14 * @param userId 用户id
15 * @return long
16 */
17 @Override
18 public long countFansCountByUserId(Long userId) {
19 log.info("UserService获取FansCount的线程: {} ", Thread.currentThread().getName());
20 try {
21 Thread.sleep(10000);
22 log.info("获取FansCount===睡眠:10s");
23 } catch (InterruptedException e) {
24 e.printStackTrace();
25 }
26 System.out.println();
27 return 520;
28 }
29
30 /**
31 * 通过用户id获取消息数量
32 *
33 * @param userId 用户id
34 * @return long
35 */
36 @Override
37 public long countMsgCountByUserId(Long userId) {
38 log.info("UserService获取MsgCount的线程: {} ", Thread.currentThread().getName());
39 try {
40 Thread.sleep(10000);
41 log.info("获取MsgCount===睡眠:10s");
42 } catch (InterruptedException e) {
43 e.printStackTrace();
44 }
45 return 618;
46 }
47
48 /**
49 * 通过用户获取分组数
50 *
51 * @param userId 用户id
52 * @return long
53 */
54 @Override
55 public long countCollectCountByUserId(Long userId) {
56 log.info("UserService获取CollectCount的线程:{}", Thread.currentThread().getName());
57 try {
58 Thread.sleep(10000);
59 log.info("获取CollectCount==睡眠:10s");
60 } catch (InterruptedException e) {
61 e.printStackTrace();
62 }
63 return 6664;
64 }
65
66 /**
67 * 通过用户id获取点赞总数
68 *
69 * @param userId 用户id
70 * @return long
71 */
72 @Override
73 public long countFollowCountByUserId(Long userId) {
74 log.info("UserService获取FollowCount的线程:{}", Thread.currentThread().getName());
75 try {
76 Thread.sleep(10000);
77 log.info("获取FollowCount===睡眠:10s");
78 } catch (InterruptedException e) {
79 e.printStackTrace();
80 }
81 return 7788;
82 }
83
84 /**
85 * 通过用户id获取红包总数
86 *
87 * @param userId 用户id
88 * @return long
89 */
90 @Override
91 public long countRedBonusCountByUserId(Long userId) {
92 log.info("UserService获取RedBagCount的线程: {} ", Thread.currentThread().getName());
93 try {
94 TimeUnit.SECONDS.sleep(4);
95 log.info("获取RedBagCount===睡眠:4s");
96 } catch (InterruptedException e) {
97 e.printStackTrace();
98 }
99 return 99;
100 }
101
102 /**
103 * 通过用户id获取优惠价数
104 *
105 * @param userId 用户id
106 * @return long
107 */
108 @Override
109 public long countCouponCountByUserId(Long userId) {
110 log.info("UserService获取CouponCount的线程:{} ", Thread.currentThread().getName());
111 try {
112 TimeUnit.SECONDS.sleep(8);
113 log.info("获取CouponCount===睡眠:8s");
114 } catch (InterruptedException e) {
115 e.printStackTrace();
116 }
117 return 66;
118 }
119}
三、基于线程池的FutureTask
线程池核心参数:核心线程数8 最大线程数20 保活时间30s 阻塞队列长度10 拒绝策略为交给提交任务的线程执行。
ps:jdk默认提供了四种拒绝策略:
1、CallerRunsPolicy - 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。谁提交,负责。
2、AbortPolicy - 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
3、DiscardPolicy - 直接丢弃,其他啥都没有
4、DiscardOldestPolicy - 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
1/**
2 * @program: spring-boot-demo-plus
3 * @author: David Qin
4 * @create: 2022-04-04 23:54
5 * 演示使用基于线程池的FutureTask 执行多线程任务
6 */
7@Component
8@Slf4j
9public class BestPracticeFutureTask {
10
11 @Autowired
12 private UserService userService;
13
14 /**
15 * 核心线程数:8 最大线程数:20 保活时间:30秒 阻塞队列长度 :10 拒绝策略:交给提交任务的线程执行
16 */
17 private static ExecutorService executorService = new ThreadPoolExecutor(8,
18 20, 30L, TimeUnit.SECONDS,
19 new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
20
21 /**
22 * 批量数据检索用户信息
23 *
24 *
25 * @param userId 用户id
26 * @param closeFutureTask 是否关闭多线程执行
27 * @return {@link UserBehaviorsDto}
28 */
29 public UserBehaviorsDto batchDataRetrievingUserInfo(final Long userId,Boolean closeFutureTask) {
30 log.info("最佳实践:FutureTask执行任务的线程名:{}", Thread.currentThread().getName());
31 long fansCount = 0, msgCount = 0, collectCount = 0,
32 followCount = 0, redBagCount = 0, couponCount = 0;
33
34 // 模拟不开启多线程召回的时间
35 if(closeFutureTask){
36 fansCount = userService.countFansCountByUserId(userId);
37 msgCount = userService.countMsgCountByUserId(userId);
38 collectCount = userService.countCollectCountByUserId(userId);
39 followCount = userService.countFollowCountByUserId(userId);
40 redBagCount = userService.countRedBonusCountByUserId(userId);
41 couponCount = userService.countCouponCountByUserId(userId);
42
43 UserBehaviorsDto userBehaviorsDto = UserBehaviorsDto.builder()
44 .collectCount(collectCount)
45 .couponCount(couponCount)
46 .msgCount(msgCount)
47 .fansCount(fansCount)
48 .followCount(followCount)
49 .redBonusCount(redBagCount)
50 .note("没有开启多线程的召回")
51 .build();
52
53 return Optional.ofNullable(userBehaviorsDto).orElse(UserBehaviorsDto.builder().note("默认模板").build());
54 }
55
56
57 try {
58 Future<Long> fansCountFT = executorService.submit(() -> userService.countFansCountByUserId(userId));
59 Future<Long> msgCountFT = executorService.submit(() -> userService.countMsgCountByUserId(userId));
60 Future<Long> followCountFT = executorService.submit(() -> userService.countFollowCountByUserId(userId));
61 Future<Long> collectCountFT = executorService.submit(() -> userService.countCollectCountByUserId(userId));
62 Future<Long> redBonusCountFT = executorService.submit(() -> userService.countRedBonusCountByUserId(userId));
63 Future<Long> couponCountFT = executorService.submit(() -> userService.countCouponCountByUserId(userId));
64
65 // 阻塞获取结果
66 fansCount = fansCountFT.get();
67 msgCount = msgCountFT.get();
68 followCount = followCountFT.get();
69 collectCount = collectCountFT.get();
70 redBagCount = redBonusCountFT.get();
71 collectCount = collectCountFT.get();
72 couponCount = couponCountFT.get();
73
74 } catch (InterruptedException | ExecutionException e) {
75 log.error("用户页数据召回失败:com.xkcoding.demofuturetask.task.BestPracticeFutureTask ->{}", e.getMessage());
76 }
77 UserBehaviorsDto userBehaviorsDto = UserBehaviorsDto.builder()
78 .collectCount(collectCount)
79 .couponCount(couponCount)
80 .msgCount(msgCount)
81 .fansCount(fansCount)
82 .followCount(followCount)
83 .redBonusCount(redBagCount)
84 .note("正常召回")
85 .build();
86 return Optional.ofNullable(userBehaviorsDto).orElse(UserBehaviorsDto.builder().note("默认模板").build());
87 }
88
89}
四、UserController
统计在程序开启多线程召回,不开启多线程召回时的下的时间对比
1/**
2 * @program: spring-boot-demo-plus
3 * @author: David Qin
4 * @create: 2022-04-05 10:43
5 */
6@RestController
7@RequestMapping("user")
8@Slf4j
9public class UserController {
10 @Autowired
11 private BestPracticeFutureTask futureTask;
12
13
14 @GetMapping(path = "userBehaviors/{userId}/{close}")
15 public UserBehaviorsDto getUserBehaviorDto(@PathVariable Long userId,@PathVariable Boolean close){
16 log.info("UserController的工作线程:{}",Thread.currentThread().getName());
17 Long begin=System.currentTimeMillis();
18 UserBehaviorsDto userBehaviorsDto=futureTask.batchDataRetrievingUserInfo(userId,close);
19 Long end=System.currentTimeMillis();
20 if(close){
21 log.info("未开启多线程数据召回总耗时:{} s",(end-begin)/1000);
22 }
23 else {
24 log.info("开启多线程数据召回总耗时:{} s",(end-begin)/1000);
25 }
26 return userBehaviorsDto
29}
五、测试
使用IDEA自带的Rest Client测试:
开启多线程召回日志显示,接口响应时间10s。这个10s其实就是UserServcie接口中,所有被调用方法这的耗费时间最长的一个方法的所用时间。线程池中的Future<T>的get()方法会阻塞当工作线程,直到获取任务的返回结果。
不开启多线程召回,串行处理,接口响应时间为所有方法的叠加:10+10+10+10+8+4=52s。
END
实际项目中,线程池+FutureTask是实现多线程编程的常用组合。但FutureTask的局限性在于:
1、当多个线程任务之间存在结果依赖时,只能通过While轮询的调用isDone()方法去判断。
2、另外,FutureTask也无法对多个任务进行复合处理(也可以,但是不方便)或者方便的通过链式调用对结果进行进一步处理。
以上的这些问问题,都可以通过CompletableFuture进行解决。下期再分享。
往 期 精 选:
"微 信 关 注 、 查 看 往 期 文 章 "
"加 我 聊 聊 "