多线程异步编程之Java8 CompletableFuture详解(保姆级教程)
背景
类结构
学习使用
创建异步任务
异步回调
联合处理
总结
参考
背景
java在多线程异步编程这一块一直缺少比较好用的api,所以像一些三方框架比如netty都有扩展封装自己的异步编程工具类等,直到java8新增了一个类CompletableFuture
,java异步编程的一些api才算相对完善起来
类结构
可以看到CompletableFuture
是实现了CompletionStage
接口和Future
接口 Future大家都非常熟悉,不作过多介绍,我们看看新增的CompletionStage
,CompletionStage
主要表示的是同步或异步任务计算的某一个阶段,CompletionStage
子任务所包装的就是几个函数式接口:Function
(有输入有输出)、Runnable
(无输入无输出)、Consumer
(有输入无输出)
CompletionStage
不是我们研究的重点,我们重点还是放在学习使用CompletableFuture
上,因为CompletableFuture
包含40多个方法,可能是jdk中最复杂的工具类了
学习使用
首先我们看看CompletableFuture
提供的一些方法可以看到是非常之多的,我们重点学习一些常用的方法来讲解
创建异步任务
首先CompletableFuture
提供了两个和线程池类似的API
supplyAsync
创建带有返回值的异步任务,类似方法ExecutorService
的 submit(Callable<T> task)
方法
runAsync
创建没有返回值的异步任务,类似ExecutorService
submit(Runnable task)
方法
使用方式如下:
@Test
public void test() throws Exception{
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture.runAsync(() -> {
System.out.println("提交异步任务");
});
System.out.println("获取异步任务的结果:" + supplyAsync.get());
}
这里需要注意的是supplyAsync
和runAsync
方法都提供了一个重载方法,即多添加一个线程池参数,如果添加线程池,就用传入的线程池去执行任务,否则就用默认的线程池ForkJoinPool.commonPool()
,源码如下:如果是单核CPU则使用ThreadPerTaskExecutor
线程池。==在实际线上使用注意一定要传入我们自定义的线程池==
使用自定义的线程池去执行我们只需要简单的改动上面代码就可以了。如下
@Test
public void test() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}, executorService);
CompletableFuture.runAsync(() -> {
System.out.println("提交异步任务");
}, executorService);
System.out.println("获取异步任务的结果:" + supplyAsync.get());
}
异步回调
有时候我们经常需要会遇到这样的需求,在执行完上面的任务后,获取任务的结果继续执行下面的任务。CompletableFuture
也提供了如下两个方法
thenApply
表示获取上一个任务的执行结果作为新任务的执行参数,有返回值thenApply
也是有三个方法重载
// 后一个任务与前一个任务在同一线程执行
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
// 后一个任务与前一个任务在不同线程中执行
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
//后一个任务使用自定义线程池执行
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
例子:
@Test
public void test() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}, executorService);
CompletableFuture<Integer> thenApply = supplyAsync.thenApply((result) -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
System.out.println("获取到上个任务结果为:" + result);
return 2;
});
System.out.println(thenApply.get());
}
可以看到异步执行完后回调使用的线程是同一个线程。如果我们将thenApply
方法换成thenApplyAsync
会发现任务回调后的执行线程不是同一个线程
thenApply
、thenApplyAsync
都是有返回值的异步回调,所以自然后没有返回值的异步回调
thenAccept
接受上一个任务的结果作为参数,但是没有返回值
也是有三个重置方法,和上面类似
// 后一个任务与前一个任务在同一线程执行
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
// 后一个任务与前一个任务在不同线程中执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action);
}
//后一个任务使用自定义线程池执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
和上面类似就不作代码演示
thenRun
就是单纯的等待上个任务执行完成,不接受参数,也没有返回值
thenRunAsync
也是和thenRun
类似,不过回调任务是使用新线程去执行
thenRun
有三个重载方法,区别如下
// 后一个任务与前一个任务在同一个线程中执行
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
// 后一个任务与前一个任务在不同线程中执行
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}
//后一个任务使用自定义线程池执行
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
例子:
@Test
public void test() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务执行完成");
return 1;
}, executorService);
CountDownLatch countDownLatch = new CountDownLatch(1);
supplyAsync.thenRun(() -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
countDownLatch.countDown();
});
countDownLatch.await();
}
执行结果:
exceptionally
exceptionally
主要是异常回调,即发生异常后将异常作为参数传入,需要注意的不管有没有发生异常都是执行异常回调的方法
例子如下
@Test
public void test() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务执行完成");
throw new RuntimeException("发送一次了");
}, executorService);
CompletableFuture<Integer> exceptionally = supplyAsync.exceptionally((throwable) -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
System.out.println("捕获到的异常:" + throwable.getMessage());
return 2;
});
System.out.println("异常回调的结果: " + exceptionally.get());
}
执行结果:
whenComplete
whenComplete
算是 exceptionally
和thenApply
的结合,即将任务执行的结果和异常作为回到方法的参数,如果没有发生异常则异常参数为null
@Test
public void test() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务执行完成");
throw new RuntimeException("发送一次了");
}, executorService);
CompletableFuture<Integer> whenComplete = supplyAsync.whenComplete((r, e) -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
System.out.println("捕获到的异常:" + e.getMessage());
System.out.println("获取到的返回结果:" + r);
});
// 这里需要注意 如果supplyAsync执行异常,就返回捕捉到的异常
// 如果supplyAsync执行没有异常whenComplete.get就返回supplyAsync的执行结果
System.out.println("异常回调的结果: " + whenComplete.get());
}
执行结果:
handle
与whenComplete
一样,不同的是handle
返回的结果是异步回调返回的结果和异常,而不是像whenComplete
那样返回的是异步任务的结果和异常
例子:
@Test
public void test() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务执行完成");
throw new RuntimeException("发送一次了");
}, executorService);
CompletableFuture<Integer> handle = supplyAsync.handle((r, e) -> {
System.out.println("当先线程:" + Thread.currentThread().getName());
System.out.println("捕获到的异常:" + e.getMessage());
System.out.println("获取到的返回结果:" + r);
return 2;
});
System.out.println("异常回调的结果: " + handle.get());
}
执行结果:
联合处理
双CompletableFuture联合处理
thenCombine
将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值,将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务。这个就可以实现多线程协作,类似比较经典的泡茶喝水那个案例。thenCombine
也是有三个重载方法
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(defaultExecutor(), other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
例子
@Test
public void test3() {
// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(() -> {
System.out.println("T1: 洗水壶...");
sleep(1);
System.out.println("T1: 烧开水...");
sleep(15);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(() -> {
System.out.println("T2: 洗茶壶...");
sleep(1);
System.out.println("T2: 洗茶杯...");
sleep(2);
System.out.println("T2: 拿茶叶...");
sleep(1);
return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 =
f1.thenCombine(f2, (__, tf) -> {
System.out.println("T1: 拿到茶叶:" + tf);
System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
});
// 等待任务 3 执行结果
System.out.println(f3.join());
}
public void sleep(long sleep) {
try {
TimeUnit.SECONDS.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
执行结果:
thenAcceptBoth
和thenCombine
类似,就是没有返回结果
例子:
public void test() throws Exception{
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()->{
System.out.println("线程:" + Thread.currentThread().getName() + "任务1开始执行");
sleep(1);
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()->{
System.out.println("线程:" + Thread.currentThread().getName() + "任务2开始执行");
sleep(1);
return 2;
});
CountDownLatch countDownLatch = new CountDownLatch(1);
cf1.thenAcceptBoth(cf2,(a,b)->{
System.out.println("线程:" + Thread.currentThread().getName() + "任务3开始执行");
sleep(1);
System.out.println("获取到的任务1和任务的结果为:" + a + "," + b);
countDownLatch.countDown();
});
countDownLatch.await();
}
runAfterBoth
runAfterBoth没有入参,也没有返回值,如果两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果,和 Future.get()方法类似
@Test
public void test5() throws Exception{
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()->{
System.out.println("线程:" + Thread.currentThread().getName() + "任务1开始执行");
sleep(1);
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()->{
System.out.println("线程:" + Thread.currentThread().getName() + "任务2开始执行");
sleep(1);
return 2;
});
CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture<Void> runAfterBoth = cf1.runAfterBoth(cf2, () -> {
System.out.println("线程:" + Thread.currentThread().getName() + "任务3开始执行");
sleep(1);
countDownLatch.countDown();
});
countDownLatch.await();
}
applyToEither
也是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务。也是将两个任务的结果作为参数,有返回值
@Test
public void test6() throws Exception{
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()->{
System.out.println("线程:" + Thread.currentThread().getName() + "任务1开始执行");
sleep(2);
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()->{
System.out.println("线程:" + Thread.currentThread().getName() + "任务2开始执行");
sleep(1);
return 2;
});
CompletableFuture<Integer> applyToEither = cf1.applyToEither(cf2, (f1) -> {
System.out.println("线程:" + Thread.currentThread().getName() + "任务3开始执行");
sleep(1);
return f1;
});
System.out.println("applyToEither:" + applyToEither.get());
}
可以看到任务一还没执行完就返回了
acceptEither
和applyToEither
类似,不同的没有返回值,这里就不写例子了
runAfterEither
和applyToEither
类似,不过是没有入参,没有返回值
thenCompose
多CompletableFuture联合处理
allOf
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
例子:
@Test
public void test11() throws Exception {
// 创建异步执行任务:
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "任务1开始执行");
sleep(1);
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "任务2开始执行");
sleep(1);
return 2;
});
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "任务3开始执行");
sleep(1);
return 3;
});
//allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null
CompletableFuture<Void> cf4 = CompletableFuture.allOf(cf, cf2, cf3).whenComplete((r, e) -> {
if (e != null) {
System.out.println("捕获异常:" + e.getMessage());
} else {
System.out.println("获取任务执行结果" + r);
}
});
System.out.println("cf4执行结果: " + cf4.get());
}
anyOf
anyOf返回的CompletableFuture是多个任务==只要有一个执行完==,就会执行。get
返回的是已经执行完成的任务的执行结果,如果该任务执行异常,则抛出异常
例子:
@Test
public void test11() throws Exception {
// 创建异步执行任务:
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "任务1开始执行");
sleep(2);
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "任务2开始执行");
sleep(1);
return 2;
});
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "任务3开始执行");
sleep(3);
return 3;
});
//allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null
CompletableFuture<Object> cf4 = CompletableFuture.anyOf(cf, cf2, cf3).whenComplete((r, e) -> {
if (e != null) {
System.out.println("捕获异常:" + e.getMessage());
} else {
System.out.println("获取任务执行结果" + r);
}
});
System.out.println("cf4执行结果: " + cf4.get());
}
执行结果:
总结
thenApply()
、thenRun()
、thenAccept()
这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>
、Runnable
、Consumer<? super T>
类型
但是,thenCompose()
方法与thenApply()
方法有本质的不同:
-
thenCompose()
的返回值是一个新的CompletionStage实例,可以持续用来进行下一轮CompletionStage任务的调度。具体来说,thenCompose()返回的是包装了普通异步方法的CompletionStage任务实例,通过该实例还可以进行下一轮CompletionStage任务的调度和执行,比如可以持续进行CompletionStage链式(或者流式)调用 -
thenApply()
的返回值则简单多了,直接就是第二个任务的普通异步方法的执行结果,它的返回类型与第二步执行的普通异步方法的返回类型相同,通过thenApply()
所返回的值不能进行下一轮CompletionStage链式(或者流式)调用
参考
博客Java高并发核心编程(卷二)