vlambda博客
学习文章列表

Java8异步利器CompletableFuture的骚操作

码农在囧途

那天,我漫步在城墙下,戴着耳机,感受这春天带来的迷人气息,我看着这人来人往的人群,四处充满人间烟火,不过,也处处充满悲欢离合,我看着古老的城墙,经历了无数风雨的洗刷,战争的摧残,依然显得那么坚韧,路过城墙下的路人是一次美好的历史感受,心灵体验,而对于城墙来说,路人不过只是踏起了薄薄的尘埃,有人来过,也有人去过,时间记不住他们之间的联系,历史也也无法重演那一刻,一切都在静悄悄的发生,静悄悄的消失,我们对于,城墙,对于历史,那也是尘埃的记忆!

前言

这篇关于CompletableFuture的文章在前一个月就写了一部分,后面没有时间去写,今天周末,所以就抽时间把它写完,因为CompletableFuture中的函数确实很多,也没必要一个一个的去写完,只是抽出大致的函数来说,因为CompletableFuture很像ES6中的Promise()函数,所以我们在学习的时候可以带着Promise()的思想去学习,异步编程不但能够提升我们的相应速度,也能使我们的代码更加简洁,但是我们是在用异步编程的时候也要充分考虑业务和方法是否合适异步操作,不然将会带来一些问题。

ES6回调函数

用过ES6语法的同学都知道,ES6提供了Promise()函数实现异步调用,它提供了then(),可以实现回调,可以写多个,catch()是发生异常调用的,finally()是最后调用的,无论正常执行还是异常,都会执行finally()CompletableFuture也是这样,所以如果有ES6回调的基础,理解CompletableFuture就简单。


call(params) {
return new Promise((resolve) => {
resolve(params)
}).then(res => {
console.log("task1 " + JSON.stringify(res))
return res;
}).then(res => {
console.log("task2 " + JSON.stringify(res))
return res;
}).catch(err => {
console.log("err " + err)
}).finally(() => {
console.log("finally~~~ ")
})
}

call(123);


输出


task1 123
task2 123
finally~~~

CompletableFuture创建异步任务的方法

CompletableFuture可以直接操作如下方法,supplyAsync创建异步任务,它有两个方法,一个带线程池参数Executor,一个不带,不带的默认使用线程池,带的就用自己创建的线程池,supplyAsync创建的异步任务有返回值,runAsync同样也有两个方法,一个带线程池参数Executor,一个不带,runAsync创建的异步任务无返回值,completedFuture返回一个给定值的CompletableFuture,anyOf包含多个CompletableFuture任务,只要任何一个任务完成,anyOf就返回一个新的CompletableFuture,allOf也是包含多个CompletableFuture任务,不过需要全部任务完成allOf才会返回一个新的CompletableFuture,anyOf和allOf里面还有一些内容,下面会详细说。

supplyAsync创建有返回值的异步任务

创建有返回值的异步任务有两个方法,supplyAsync(Supplier supplier)和supplyAsync(Supplier supplier, Executor executor),从参数我们可以看出,一个参数有线程池,一个没有线程池,如果不传递线程池参数,CompletableFuture会使用它默认的ForkJoinPool.commonPool()线程池,如果传递线程池参与,那么就使用我们定义的线程池,使用supplyAsync创建有任务有返回值,可通过get()方法获取异步执行结果。


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}


supplyAsync不传递线程池参数,则使用默认的ForkJoinPool.commonPool()。


private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}


示例

通过示例来演示使用默认线程池和不使用默认线程池,

使用自己创建的线程池

//使用自己创建的线程池
public class supplyAsyncUseExecutors {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newCachedThreadPool();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务");
return "我是小四哥";
}, threadPool);
System.out.println(future.get());
threadPool.shutdown();
}
}


输出


开始执行有返回值的异步任务
我是小四哥


使用默认线程池

//使用默认的线程池
public class supplyAsyncNoUseExecutors {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务");
return "我是小四哥";
});
System.out.println(future.get());
}
}


输出


开始执行有返回值的异步任务
我是小四哥

runAsync创建无返回值的异步任务

创建无返回值的异步任务有两个方法,分别带线程池参数和不带线程池参数,和supplyAsync一样,就不赘述,可知runAsync传入Runnable接口,Runnable接口是无返回值的。


public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}


示例


public class runAsyncTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("我是小四哥");
});
System.out.println(future.get());
}
}


输出


我是小四哥
null


从输出看出使用CompletableFuture的get()获取到的值为null

completedFuture返回已完成的任务

completedFuture返回的是已经完成的任务,其实就是返回值,不用运算,从源码可知值是一个泛型参数。


public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}


示例


/**
* @author 刘牌
* @date 2022/3/2716:47
*/

public class CompleteFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<R> future = CompletableFuture.completedFuture(getUserInfo());
System.out.println(future.get());
}
public static R getUserInfo(){
return new R(200,"获取用户信息成功",new User("小四哥","123456","13657746155"));
}
}


输出


R(code=200, msg=获取用户信息成功, data=User(username=小四哥, pwd=123456, phone=13657746155))

anyOf包含多个异步任务的方法

anyOf包含了多个CompletableFuture异步任务,只要有其中一个任务完成就返回,其他任务没完成不管,不过如果使用get()获取异步返回的结果,如果异步任务发生异常,那么就会抛出异常,如果不使用get()获取异步结果,那么异步异步任务有异常也不会抛出。


public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}


可以看出anyOf的参数是可变参数,可以接受多个CompletableFuture异步任务。

示例

1.只要有一个任务完成,就返回,下面的例子模拟异步任务超时,包含三个异步任务,task1task2task3,其中task1task2休眠3stask3正常。

/**
* @author 刘牌
* @date 2022/3/2713:31
*/

public class AnyOfTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("开始执行有返回值的异步任务 task1");
return "task1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("开始执行有返回值的异步任务 task2");
return "task2";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task3");
return "task3";
});
CompletableFuture<Object> future = CompletableFuture.anyOf(task1, task2, task3);
System.out.println("future "+apply.get());
}
}


输出


开始执行有返回值的异步任务 task3
future task3


从输出结果看,task1task2并没有输出,这是因为task3执行完以后anyOf就直接返回了,异步任务就结束了,所以task1task2没有执行完成。

2.任意一个异步任务发生异常,使用get()会抛出异常,我们在task3中制造异常。

/**
* @author 刘牌
* @date 2022/3/2713:31
*/

public class AnyOfTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task1");
return "task1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task2");
return "task2";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task3");
int i = 1/0;
return "task3";
});
CompletableFuture<Object> future = CompletableFuture.anyOf(task1, task2, task3).thenApplyAsync(s -> {
String res = "";
try {
res = task1.get() + task2.get() + task3.get();
}catch (Exception e){
e.printStackTrace();
}
return res;
});
System.out.println("future "+future.get());
}
}


输出


开始执行有返回值的异步任务 task1
开始执行有返回值的异步任务 task2
开始执行有返回值的异步任务 task3
future
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.steak.concurrent.CompletableFuture.api.anyOf.AnyOfTest.lambda$main$3(AnyOfTest.java:29)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.ArithmeticException: / by zero
at com.steak.concurrent.CompletableFuture.api.anyOf.AnyOfTest.lambda$main$2(AnyOfTest.java:23)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
... 4 more


从输出可以使用future.get()获取异步结果抛出了异常。

如果不使用future.get()获取异步结果,即使异步任务中发生异常,也不会抛出

allOf包含多个异步任务的方法

allOf从字面意思可以看出是所有,表示只有所有的异步任务都完成了,allOf才会返回CompletableFuture,只要一种一个没有完成或者出现异常,都不会返回CompletableFuture。


public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}


示例

1.所有异步任务都完成才返回,我们造task1处休眠了2s

/**
* @author 刘牌
* @date 2022/3/2714:11
*/

public class AllOfTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("开始执行有返回值的异步任务 task1");
return "task1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task2");
return "task2";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task3");
return "task3";
});
CompletableFuture<String> future = CompletableFuture.allOf(task1, task2, task3)
.thenApplyAsync(s -> {
String res = "";
try {
res = task1.get() + task2.get() + task3.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return res;
});
System.out.println("future " + future.get());
}
}


输出


开始执行有返回值的异步任务 task2
开始执行有返回值的异步任务 task3
开始执行有返回值的异步任务 task1
future task1task2task3


从输出可以看出任务完成

2.异步任务发生异常,我们在task3处制造异常

/**
* @author 刘牌
* @date 2022/3/2714:11
*/

public class AllOfTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task1");
return "task1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task2");
return "task2";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行有返回值的异步任务 task3");
int i = 1 / 0;
return "task3";
});
CompletableFuture<String> future = CompletableFuture.allOf(task1, task2, task3)
.thenApplyAsync(s -> {
String res = "";
try {
res = task1.get() + task2.get() + task3.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return res;
});
System.out.println("future " + future.get());
}
}


输出


开始执行有返回值的异步任务 task1
开始执行有返回值的异步任务 task2
开始执行有返回值的异步任务 task3
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.steak.concurrent.CompletableFuture.api.allOf.AllOfTest.main(AllOfTest.java:36)
Caused by: java.lang.ArithmeticException: / by zero
at com.steak.concurrent.CompletableFuture.api.allOf.AllOfTest.lambda$main$2(AllOfTest.java:23)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)


从输出可以看出使用future.get()获取异步结果失败。

回调函数

当我们使用CompletableFuture创建了异步任务后,异步任务成功或者失败后,我们通常需要回调函数,就和ES的回调函数其实是一样的,回调函数能够很好的帮我们解决一些问题,不用我们再去编写代码控制。

thenApplyAsync和thenRunAsync

thenApplyAsync是有返回值的异步回调函数,它的参数是一个Function函数式接口,且它会将异步任务的执行结果作为参数传递到自己这里来,然后做相应的运算并返回,thenApplyAsync回调函数可以有很多个,可以一直往下回调,只是我们一般没那种必要去回调那么多次,thenRunAsync是无返回值的异步回调函数,它的参数是一个Runnable接口,而Runnable是无返回值的。


public class CallTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//有返回值的异步回调函数
CompletableFuture<Integer> applyResult = CompletableFuture.supplyAsync(() -> 100).thenApplyAsync(s -> s);
System.out.println(applyResult.get());
//无返回值的异步回调半数
CompletableFuture<Void> runResult = CompletableFuture.supplyAsync(() -> 100).thenRunAsync(() -> {
System.out.println("我是无返回值的异步回调函数");
});
System.out.println(runResult.get());
}
}


输出


100  
我是无返回值的异步回调函数
null

类似于thenApplyAsync之类的回调函数还有好多个,就不一一阐述。

thenCombineAsync和thenAcceptBothAsync

执行两个异步任务,两个任务都执行成功才返回,如果其中一个任务没有执行完成,另外一个就会等待,直到另外一个完成才返回,thenCombineAsync有返回值,thenAcceptBothAsync无返回值。

如下在task任务中休眠3s,那么thenCombineAsync会在3s后才完成

public class And {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});

CompletableFuture<String> combine = CompletableFuture.supplyAsync(() -> 100)
.thenCombineAsync(task, (s, th) -> {
try {
System.out.println("task " + task.get());
System.out.println(s);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return "thenCombine";
});
System.out.println(combine.get());
}
}


同理thenAcceptBothAsync也是在3s后才执行完毕,但是他没有返回值


public class And {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});

CompletableFuture<Void> bothAsync = CompletableFuture.supplyAsync(() -> 100)
.thenAcceptBothAsync(task, (s, th) -> {
try {
System.out.println(task.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
System.out.println("bothAsync " + bothAsync.get());
}
}

applyToEitherAsync和acceptEitherAsync

执行两个异步任务,只要其中一个完成,就返回,不用等待另外一个是否完成,其中applyToEitherAsync有返回值,而acceptEitherAsync无返回值


public class Or {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completedFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
//applyToEitherAsync有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100)
.applyToEitherAsync(completedFuture, s -> {
System.out.println("123");
return s;
});
System.out.println(future.get());

//acceptEitherAsync无返回值
CompletableFuture<Void> eitherAsync = CompletableFuture.supplyAsync(() -> 100).acceptEitherAsync(completedFuture, System.out::println);
}
}

whenCompleteAsync

whenCompleteAsync回调函数会在任务执行完成后回调,无返回值,它很像异常处理中的finally,无论成功与否,都会经过finally块,如下我们执行异步任务,在其中一个任务中制造异常,whenCompleteAsync依然执行了。


public class Final {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completedFuture = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
});

CompletableFuture<Integer> completeAsync = CompletableFuture.supplyAsync(() -> 100)
.thenCombineAsync(completedFuture, (s, th) -> {
System.out.println("start ");
return s;
}).whenCompleteAsync((s, th) -> {
System.out.println("finally do it " + s);
});
System.out.println(completeAsync.get());
}


输出


finally do it null
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.steak.concurrent.CompletableFuture.api.call.Final.main(Final.java:26)
Caused by: java.lang.ArithmeticException: / by zero
at com.steak.concurrent.CompletableFuture.api.call.Final.lambda$main$0(Final.java:15)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)


关于更多的回调函数,大家可以去一探究竟,就不一一列举了,我们在使用CompletableFuture的时候其实就是在写ES规范中的回调函数,只不过CompletableFuture提供了更加丰富的函数,以满足我们不同的场景使用,不过使用CompletableFuture也需要我们有一定的经验,不能乱使用。

我是小四,今天的分享就到这里,感谢你的观看,我们下期见,如果文中有描述不正确,或者错误的地方,希望得到你的指点。