vlambda博客
学习文章列表

为什么实际开发中多线程编程很重要?




淘宝,大家都用过吧,点击我的淘宝,页面如下。



淘宝这种大型C端互联网产品,用户规模几个亿,所有功能模块都是基于微服务开发的。以我在阿里淘特技术部实习开发的经验看,就目前看到的这个页面,至少会直接或者间接调用几十个(可能更多)基于HSF(阿里的RPC框架)的微服务接口,背后至少有几百号人的技术团队的支持。


这个页面的呈现,需要在几个亿的用户数据中,把包括商品信息,卡卷消息,用户现金权益等各种信息进行数据的组织召回,且不说微服务的内部调用链路很长且耗时复杂,单单从数据库层面,在各个分库分表中,在以亿为级别的数据中找到找到一条对应用户的数据,已经相当耗时了。


因此,实际开发中,面对上述业务业务场景。在架构层面,缓存,数据库的分库分表都是常规操作,走HTTP也是不够的,直接走RPC。在代码层面,一个重要的方面就是使用多线程编程。


如下模拟演示,使用多线程编程,如何在复杂的数据召回场景中,大幅度提升响应效率。


一、UserServcie接口

没啥讲的,很常规


 1**
2 * 用户服务
3 *
4 * @author kobe24
5 * @date 2022/04/04
6 */
7
8public interface UserService {
9    /**
10     * 通过用户id获取粉丝数
11     *
12     * @param userId 用户id
13     * @return long
14     */

15    long countFansCountByUserId(Long userId);
16
17    /**
18     * 通过用户id获取消息数量
19     *
20     * @param userId 用户id
21     * @return long
22     */

23    long countMsgCountByUserId(Long userId);
24
25    /**
26     * 通过用户获取分组数
27     *
28     * @param userId 用户id
29     * @return long
30     */

31    long countCollectCountByUserId(Long userId);
32
33    /**
34     * 通过用户id获取点赞总数
35     *
36     * @param userId 用户id
37     * @return long
38     */

39    long countFollowCountByUserId(Long userId);
40
41
42    /**
43     * 通过用户id获取红包总数
44     *
45     * @param userId 用户id
46     * @return long
47     */

48    long countRedBonusCountByUserId(Long userId);
49
50    /**
51     * 通过用户id获取优惠价数
52     *
53     * @param userId 用户id
54     * @return long
55     */

56    long countCouponCountByUserId(Long userId);
57
58}



二、UserServiceImpl


这里面使用线程休眠,模拟各个方法的执行耗时。单线程串行执行,总共耗时:10+10+10+10+8+4=52s


 1/**
2 * @program: spring-boot-demo-plus
3 * @author: David Qin
4 * @create: 2022-04-04 23:47
5 * <p>
6 * 以下方法通过线程池休眠模拟数据的召回所需的时间
7 */

8@Slf4j
9@Service
10public class UserServiceImpl implements UserService {
11    /**
12     * 通过用户id获取粉丝数
13     *
14     * @param userId 用户id
15     * @return long
16     */

17    @Override
18    public long countFansCountByUserId(Long userId) {
19        log.info("UserService获取FansCount的线程: {} ", Thread.currentThread().getName());
20        try {
21            Thread.sleep(10000);
22            log.info("获取FansCount===睡眠:10s");
23        } catch (InterruptedException e) {
24            e.printStackTrace();
25        }
26        System.out.println();
27        return 520;
28    }
29
30    /**
31     * 通过用户id获取消息数量
32     *
33     * @param userId 用户id
34     * @return long
35     */

36    @Override
37    public long countMsgCountByUserId(Long userId) {
38        log.info("UserService获取MsgCount的线程: {} ", Thread.currentThread().getName());
39        try {
40            Thread.sleep(10000);
41            log.info("获取MsgCount===睡眠:10s");
42        } catch (InterruptedException e) {
43            e.printStackTrace();
44        }
45        return 618;
46    }
47
48    /**
49     * 通过用户获取分组数
50     *
51     * @param userId 用户id
52     * @return long
53     */

54    @Override
55    public long countCollectCountByUserId(Long userId) {
56        log.info("UserService获取CollectCount的线程:{}", Thread.currentThread().getName());
57        try {
58            Thread.sleep(10000);
59            log.info("获取CollectCount==睡眠:10s");
60        } catch (InterruptedException e) {
61            e.printStackTrace();
62        }
63        return 6664;
64    }
65
66    /**
67     * 通过用户id获取点赞总数
68     *
69     * @param userId 用户id
70     * @return long
71     */

72    @Override
73    public long countFollowCountByUserId(Long userId) {
74        log.info("UserService获取FollowCount的线程:{}", Thread.currentThread().getName());
75        try {
76            Thread.sleep(10000);
77            log.info("获取FollowCount===睡眠:10s");
78        } catch (InterruptedException e) {
79            e.printStackTrace();
80        }
81        return 7788;
82    }
83
84    /**
85     * 通过用户id获取红包总数
86     *
87     * @param userId 用户id
88     * @return long
89     */

90    @Override
91    public long countRedBonusCountByUserId(Long userId) {
92        log.info("UserService获取RedBagCount的线程: {} ", Thread.currentThread().getName());
93        try {
94            TimeUnit.SECONDS.sleep(4);
95            log.info("获取RedBagCount===睡眠:4s");
96        } catch (InterruptedException e) {
97            e.printStackTrace();
98        }
99        return 99;
100    }
101
102    /**
103     * 通过用户id获取优惠价数
104     *
105     * @param userId 用户id
106     * @return long
107     */

108    @Override
109    public long countCouponCountByUserId(Long userId) {
110        log.info("UserService获取CouponCount的线程:{}  ", Thread.currentThread().getName());
111        try {
112            TimeUnit.SECONDS.sleep(8);
113            log.info("获取CouponCount===睡眠:8s");
114        } catch (InterruptedException e) {
115            e.printStackTrace();
116        }
117        return 66;
118    }
119}


三、基于线程池的FutureTask


线程池核心参数:核心线程数8 最大线程数20 保活时间30s 阻塞队列长度10 拒绝策略为交给提交任务的线程执行。


ps:jdk默认提供了四种拒绝策略:

1、CallerRunsPolicy - 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。谁提交,负责。

2、AbortPolicy - 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。

3、DiscardPolicy - 直接丢弃,其他啥都没有

4、DiscardOldestPolicy - 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入


 1/**
2 * @program: spring-boot-demo-plus
3 * @author: David Qin
4 * @create: 2022-04-04 23:54
5 * 演示使用基于线程池的FutureTask 执行多线程任务
6 */

7@Component
8@Slf4j
9public class BestPracticeFutureTask {
10
11    @Autowired
12    private UserService userService;
13
14    /**
15     * 核心线程数:8 最大线程数:20 保活时间:30秒 阻塞队列长度 :10 拒绝策略:交给提交任务的线程执行
16     */

17    private static ExecutorService executorService = new ThreadPoolExecutor(8,
18        2030L, TimeUnit.SECONDS,
19        new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
20
21    /**
22     * 批量数据检索用户信息
23     *
24     *
25     * @param userId          用户id
26     * @param closeFutureTask 是否关闭多线程执行
27     * @return {@link UserBehaviorsDto}
28     */

29    public UserBehaviorsDto batchDataRetrievingUserInfo(final Long userId,Boolean closeFutureTask) {
30        log.info("最佳实践:FutureTask执行任务的线程名:{}", Thread.currentThread().getName());
31        long fansCount = 0, msgCount = 0, collectCount = 0,
32            followCount = 0, redBagCount = 0, couponCount = 0;
33
34        // 模拟不开启多线程召回的时间
35        if(closeFutureTask){
36         fansCount = userService.countFansCountByUserId(userId);
37         msgCount = userService.countMsgCountByUserId(userId);
38         collectCount = userService.countCollectCountByUserId(userId);
39         followCount = userService.countFollowCountByUserId(userId);
40         redBagCount = userService.countRedBonusCountByUserId(userId);
41         couponCount = userService.countCouponCountByUserId(userId);
42
43            UserBehaviorsDto userBehaviorsDto = UserBehaviorsDto.builder()
44                .collectCount(collectCount)
45                .couponCount(couponCount)
46                .msgCount(msgCount)
47                .fansCount(fansCount)
48                .followCount(followCount)
49                .redBonusCount(redBagCount)
50                .note("没有开启多线程的召回")
51                .build();
52
53            return Optional.ofNullable(userBehaviorsDto).orElse(UserBehaviorsDto.builder().note("默认模板").build());
54        }
55
56
57        try {
58            Future<Long> fansCountFT = executorService.submit(() -> userService.countFansCountByUserId(userId));
59            Future<Long> msgCountFT = executorService.submit(() -> userService.countMsgCountByUserId(userId));
60            Future<Long> followCountFT = executorService.submit(() -> userService.countFollowCountByUserId(userId));
61            Future<Long> collectCountFT = executorService.submit(() -> userService.countCollectCountByUserId(userId));
62            Future<Long> redBonusCountFT = executorService.submit(() -> userService.countRedBonusCountByUserId(userId));
63            Future<Long> couponCountFT = executorService.submit(() -> userService.countCouponCountByUserId(userId));
64
65            // 阻塞获取结果
66            fansCount = fansCountFT.get();
67            msgCount = msgCountFT.get();
68            followCount = followCountFT.get();
69            collectCount = collectCountFT.get();
70            redBagCount = redBonusCountFT.get();
71            collectCount = collectCountFT.get();
72            couponCount = couponCountFT.get();
73
74        } catch (InterruptedException | ExecutionException e) {
75            log.error("用户页数据召回失败:com.xkcoding.demofuturetask.task.BestPracticeFutureTask ->{}", e.getMessage());
76        }
77        UserBehaviorsDto userBehaviorsDto = UserBehaviorsDto.builder()
78            .collectCount(collectCount)
79            .couponCount(couponCount)
80            .msgCount(msgCount)
81            .fansCount(fansCount)
82            .followCount(followCount)
83            .redBonusCount(redBagCount)
84            .note("正常召回")
85            .build();
86        return Optional.ofNullable(userBehaviorsDto).orElse(UserBehaviorsDto.builder().note("默认模板").build());
87    }
88
89}


四、UserController


统计在程序开启多线程召回,不开启多线程召回时的下的时间对比


 1/**
2 * @program: spring-boot-demo-plus
3 * @author: David Qin
4 * @create: 2022-04-05 10:43
5 */

6@RestController
7@RequestMapping("user")
8@Slf4j
9public class UserController {
10    @Autowired
11    private BestPracticeFutureTask futureTask;
12
13
14    @GetMapping(path = "userBehaviors/{userId}/{close}")
15    public UserBehaviorsDto getUserBehaviorDto(@PathVariable Long userId,@PathVariable Boolean close){
16        log.info("UserController的工作线程:{}",Thread.currentThread().getName());
17        Long begin=System.currentTimeMillis();
18        UserBehaviorsDto userBehaviorsDto=futureTask.batchDataRetrievingUserInfo(userId,close);
19        Long end=System.currentTimeMillis();
20        if(close){
21            log.info("未开启多线程数据召回总耗时:{} s",(end-begin)/1000);
22        }
23        else {
24            log.info("开启多线程数据召回总耗时:{} s",(end-begin)/1000);
25        }
26        return userBehaviorsDto
29}



五、测试


使用IDEA自带的Rest Client测试:


为什么实际开发中多线程编程很重要?

开启多线程召回日志显示,接口响应时间10s。这个10s其实就是UserServcie接口中,所有被调用方法这的耗费时间最长的一个方法的所用时间。线程池中的Future<T>的get()方法会阻塞当工作线程,直到获取任务的返回结果。


为什么实际开发中多线程编程很重要?


不开启多线程召回,串行处理,接口响应时间为所有方法的叠加:10+10+10+10+8+4=52s。


为什么实际开发中多线程编程很重要?



END


实际项目中,线程池+FutureTask是实现多线程编程的常用组合。但FutureTask的局限性在于:


1、当多个线程任务之间存在结果依赖时,只能通过While轮询的调用isDone()方法去判断。


2、另外,FutureTask也无法对多个任务进行复合处理(也可以,但是不方便)或者方便的通过链式调用对结果进行进一步处理。


以上的这些问问题,都可以通过CompletableFuture进行解决。下期再分享。

为什么实际开发中多线程编程很重要?

往  期  精  选



"微  信  关  注 、 查  看  往  期  文  章 "



"加  我  聊  聊 "