今天来认识一个异步编程类CompletableFuture
之前项目中经常需要使用到异步编程,今天我们就来看看Java8中的CompletableFuture是如何进行使用的。使用这种异步编程的好处就是在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务,可以极大的提高程序的性能。
CompletableFuture首先是一个Future,它拥有Future所有的功能,包括获取异步执行结果,取消正在执行的任务等。除此之外,CompletableFuture还是一个CompletionStage。那么什么是CompletionStage呢?在异步程序中,如果将每次的异步执行都看成是一个stage的话,我们通常很难控制异步程序的执行顺序,在javascript中,我们需要在回调中执行回调。这就会形成传说中的回调地狱。好在在ES6中引入了promise的概念,可以将回调中的回调转写为链式调用,从而大大的提升了程序的可读性和可写性。同样的在java中,我们使用CompletionStage来实现异步调用的链式操作。CompletionStage定义了一系列的then*** 操作来实现这一功能。并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。
CompletableFuture创建
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Asynsc表示异步,而supplyAsync与runAsync不同在于前者异步返回一个结果,后者是void。第二个函数第二个参数表示用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池。其中Supplier是一个函数式接口,代表是一个生成者的意思,返回一个结果。
01
CompletableFuture
runAsync()
如果你想异步的运行一个后台任务并且不想改任务返回任务东西,这时候可以使用 CompletableFuture.runAsync()方法,它持有一个 Runable对象,并返回 CompletableFuture<Void>。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("Right!");
});
02
CompletableFuture
supplyAsync()
通过CompletableFuture.supplyAsync()可以返回一些结果,它持有supplier<T> 并且返回CompletableFuture<T>,T 是通过调用 传入的supplier取得的值的类型。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Right!";
});
如果我们不传入第二个Executor 参数,那么CompletableFuture可以从全局的 ForkJoinPool.commonPool()获得一个线程中执行这些任务。但是我们也可以创建一个线程池并传给runAsync() 和supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务。这里通过编写一个配置类来实现线程的获取,后续通过注入即可。
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor executor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Right!";
}, executor);
03
CompletableFuture
thenApply()
我们还可以使用 thenApply() 处理和改变CompletableFuture的结果。它持有一个Function<R,T>作为参数。Function<R,T>是一个简单的函数式接口,接受一个T类型的参数,产出一个R类型的结果。还可以通过附加一系列的thenApply()在回调方法 在CompletableFuture写一个连续的转换。这样的话,结果中的一个 thenApply方法就会传递给该系列的另外一个 thenApply方法。
CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "hjh";
}).thenApply(name -> {
return "Hello " + name;
}).thenApply(greeting -> {
return greeting + ", Welcome";
});
System.out.println(welcomeText.get());
04
CompletableFuture
thenAccept()
如果我们不想从回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,我们可以使用thenAccept() 和 thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。CompletableFuture.thenAccept() 持有一个Consumer<T> ,返回一个CompletableFuture<Void>。它可以访问CompletableFuture的结果。
CompletableFuture.supplyAsync(() -> {
return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
System.out.println("Right " + product.getName())
});
05
CompletableFuture
thenRun()
虽然thenAccept()可以访问CompletableFuture的结果,但thenRun()不能访Future的结果,它持有一个Runnable返回CompletableFuture<Void>。
CompletableFuture.supplyAsync(() -> {
// Run some computation
}).thenRun(() -> {
// Computation Finished.
});
06
CompletableFuture
thenCompose()
假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另外一个服务中获取他的订单信息。这时候就可以使用 thenCompose() 组合两个独立的future。
CompletableFuture<User> getUsersDetail(String userId) {
return CompletableFuture.supplyAsync(() -> {
UserService.getUserDetails(userId);
});
}
CompletableFuture<Double> getCreditRating(User user) {
return CompletableFuture.supplyAsync(() -> {
CreditRatingService.getOrderInfo(user);
});
}
07
CompletableFuture
thenCombine()
thenCombine()被用来当两个独立的Future都完成的时候,用来做一些事情。
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return 65.0;
});
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return 177.8;
});
CompletableFuture<Double> combinedFuture = weightInKgFuture
.thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
Double heightInMeter = heightInCm/100;
return weightInKg/(heightInMeter*heightInMeter);
});
System.out.println("Your BMI is - " + combinedFuture.get());
08
CompletableFuture
allOf()
我们使用thenCompose() 和 thenCombine()把两个CompletableFuture组合在一起。现在如果你想组合任意数量的CompletableFuture,应该怎么做?我们可以使用以下两个方法组合任意数量的CompletableFuture。
static CompletableFuture<Void> allOf(CompletableFuture >... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture >... cfs)
CompletableFuture.allOf的使用场景是当你一个列表的独立future,并且你想在它们都完成后并行的做一些事情。
List<String> webPageLinks = Arrays.asList(...) // A list of 100 web page links
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
.map(webPageLink -> downloadWebPage(webPageLink))
.collect(Collectors.toList());
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);
使用CompletableFuture.allOf()的问题是它返回CompletableFuture<Void>。但是我们可以通过写一些额外的代码来获取所有封装的CompletableFuture结果。
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
return pageContentFutures.stream()
.map(pageContentFuture -> pageContentFuture.join())
.collect(Collectors.toList());
})
09
CompletableFuture
anyOf()
CompletableFuture.anyOf()和其名字介绍的一样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result 2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result 3";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
10
CompletableFuture
handle()
API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
if(age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if(age > 18) {
return "Adult";
} else {
return "Child";
}
}).handle((res, ex) -> {
if(ex != null) {
System.out.println("exception - " + ex.getMessage());
return "Unknown!";
}
return res;
});
System.out.println("Maturity : " + maturityFuture.get());
有了CompletableFuture之后,我们自己实现异步编程变得轻松很多,这个类也提供了许多方法来组合CompletableFuture。结合Lambada表达式来用,变得很轻松.。