vlambda博客
学习文章列表

今天来认识一个异步编程类CompletableFuture


之前项目中经常需要使用到异步编程,今天我们就来看看Java8中的CompletableFuture是如何进行使用的。使用这种异步编程的好处就是在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务,可以极大的提高程序的性能。



CompletableFuture首先是一个Future,它拥有Future所有的功能,包括获取异步执行结果,取消正在执行的任务等。除此之外,CompletableFuture还是一个CompletionStage。那么什么是CompletionStage呢?在异步程序中,如果将每次的异步执行都看成是一个stage的话,我们通常很难控制异步程序的执行顺序,在javascript中,我们需要在回调中执行回调。这就会形成传说中的回调地狱。好在在ES6中引入了promise的概念,可以将回调中的回调转写为链式调用,从而大大的提升了程序的可读性和可写性。同样的在java中,我们使用CompletionStage来实现异步调用的链式操作。CompletionStage定义了一系列的then*** 操作来实现这一功能。并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。





CompletableFuture创建


public static CompletableFuture<Void>   runAsync(Runnable runnable)public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

Asynsc表示异步,而supplyAsync与runAsync不同在于前者异步返回一个结果,后者是void。第二个函数第二个参数表示用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池。其中Supplier是一个函数式接口,代表是一个生成者的意思,返回一个结果。


01

CompletableFuture



runAsync()


如果你想异步的运行一个后台任务并且不想改任务返回任务东西,这时候可以使用 CompletableFuture.runAsync()方法,它持有一个 Runable对象,并返回 CompletableFuture<Void>。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("Right!");});


02

CompletableFuture



supplyAsync()


通过CompletableFuture.supplyAsync()可以返回一些结果,它持有supplier<T> 并且返回CompletableFuture<T>,T 是通过调用 传入的supplier取得的值的类型。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Right!";});

如果我们不传入第二个Executor 参数,那么CompletableFuture可以从全局的 ForkJoinPool.commonPool()获得一个线程中执行这些任务。但是我们也可以创建一个线程池并传给runAsync() 和supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务。这里通过编写一个配置类来实现线程的获取,后续通过注入即可。

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)@Configurationpublic class MyThreadConfig { @Bean public ThreadPoolExecutor executor(ThreadPoolConfigProperties pool) { return new ThreadPoolExecutor( pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() );    }}
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Right!";}, executor);


03

CompletableFuture



thenApply()


我们还可以使用 thenApply() 处理和改变CompletableFuture的结果。它持有一个Function<R,T>作为参数。Function<R,T>是一个简单的函数式接口,接受一个T类型的参数,产出一个R类型的结果。还可以通过附加一系列的thenApply()在回调方法 在CompletableFuture写一个连续的转换。这样的话,结果中的一个 thenApply方法就会传递给该系列的另外一个 thenApply方法。

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); }    return "hjh";}).thenApply(name -> { return "Hello " + name;}).thenApply(greeting -> { return greeting + ", Welcome";});System.out.println(welcomeText.get());


04

CompletableFuture



thenAccept()


如果我们不想从回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,我们可以使用thenAccept() 和 thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。CompletableFuture.thenAccept() 持有一个Consumer<T> ,返回一个CompletableFuture<Void>。它可以访问CompletableFuture的结果。

CompletableFuture.supplyAsync(() -> { return ProductService.getProductDetail(productId);}).thenAccept(product -> { System.out.println("Right " + product.getName())});

05

CompletableFuture



thenRun()


虽然thenAccept()可以访问CompletableFuture的结果,但thenRun()不能访Future的结果,它持有一个Runnable返回CompletableFuture<Void>。

CompletableFuture.supplyAsync(() -> { // Run some computation }).thenRun(() -> { // Computation Finished.});

06

CompletableFuture



thenCompose()


假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另外一个服务中获取他的订单信息。这时候就可以使用 thenCompose() 组合两个独立的future。

CompletableFuture<User> getUsersDetail(String userId) { return CompletableFuture.supplyAsync(() -> { UserService.getUserDetails(userId); }); }CompletableFuture<Double> getCreditRating(User user) { return CompletableFuture.supplyAsync(() -> { CreditRatingService.getOrderInfo(user); });}


07

CompletableFuture



thenCombine()


thenCombine()被用来当两个独立的Future都完成的时候,用来做一些事情。

CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 65.0;});CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 177.8;});
CompletableFuture<Double> combinedFuture = weightInKgFuture .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> { Double heightInMeter = heightInCm/100; return weightInKg/(heightInMeter*heightInMeter);});
System.out.println("Your BMI is - " + combinedFuture.get());


08

CompletableFuture



allOf()


我们使用thenCompose() 和 thenCombine()把两个CompletableFuture组合在一起。现在如果你想组合任意数量的CompletableFuture,应该怎么做?我们可以使用以下两个方法组合任意数量的CompletableFuture。

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

CompletableFuture.allOf的使用场景是当你一个列表的独立future,并且你想在它们都完成后并行的做一些事情。

List<String> webPageLinks = Arrays.asList(...)    // A list of 100 web page linksList<CompletableFuture<String>> pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink))        .collect(Collectors.toList());CompletableFuture<Void> allFutures = CompletableFuture.allOf( pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]));

使用CompletableFuture.allOf()的问题是它返回CompletableFuture<Void>。但是我们可以通过写一些额外的代码来获取所有封装的CompletableFuture结果。

CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> { return pageContentFutures.stream() .map(pageContentFuture -> pageContentFuture.join()) .collect(Collectors.toList());})


09

CompletableFuture



anyOf()

CompletableFuture.anyOf()和其名字介绍的一样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); }    return "Result 1";});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result 2";});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); }    return "Result 3";});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);


10

CompletableFuture



handle()


API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。

Integer age = -1;CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> { if(age < 0) { throw new IllegalArgumentException("Age can not be negative"); } if(age > 18) { return "Adult"; } else { return "Child"; }}).handle((res, ex) -> { if(ex != null) { System.out.println("exception - " + ex.getMessage()); return "Unknown!"; } return res;});System.out.println("Maturity : " + maturityFuture.get());

有了CompletableFuture之后,我们自己实现异步编程变得轻松很多,这个类也提供了许多方法来组合CompletableFuture。结合Lambada表达式来用,变得很轻松.。