vlambda博客
学习文章列表

函数式编程 - 异步处理(CompletableFuture)

一、示例

 
package main;
import org.slf4j.Logger;import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;import java.util.concurrent.LinkedBlockingQueue;
/** * @author cc * @function * @date 2021/8/2 17:18 */public class FunctionTest { private static final Logger log = LoggerFactory.getLogger(FunctionTest.class);
public static void main(String[] args) throws Exception { FunctionTest main = new FunctionTest(); System.out.println("\n------------------------------------ 函数式编程方式 ------------------------------------"); main.run(); System.out.println("\n------------------------------------ 经典的异步实现方式 ------------------------------------"); main.runOld(); }
long value = 101;
void run() throws Exception { // 定义任务内容 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { value += 101; log.info("supplyAsync, value=" + value); return String.format("%d", value); });
Thread.sleep(1000); log.info("sleep done");
// 执行完任务后执行匿名函数 CompletableFuture<Void> next = future.thenAccept(v -> { log.info("thenAccept, value=" + v); });
// 获取任务的执行结果 String result = future.get(); log.info("thenAccept, get=" + result); }
void runOld() throws Exception { value = 101; LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 启动线程执行任务 task(queue);
// 阻塞方式等待任务执行结束 String result = queue.take(); log.info("thenAccept, get=" + result); }
void task(LinkedBlockingQueue<String> queue) { new Thread(() -> { String result = "failed"; try { value += 101; log.info("supplyAsync, value=" + value); result = String.format("%d", value); } catch (Exception e) { log.error("", e); }
try { queue.put(result); } catch (InterruptedException e) { log.error("", e); } }).start(); }}

二、输出

------------------------------------ 函数式编程方式 ------------------------------------08:47:51.571 [ForkJoinPool.commonPool-worker-1] INFO main.FunctionTest - supplyAsync, value=20208:47:52.575 [main] INFO main.FunctionTest - sleep done08:47:52.575 [main] INFO main.FunctionTest - thenAccept, value=20208:47:52.575 [main] INFO main.FunctionTest - thenAccept, get=202
------------------------------------ 经典的异步实现方式 ------------------------------------08:47:52.576 [Thread-1] INFO main.FunctionTest - supplyAsync, value=20208:47:52.576 [main] INFO main.FunctionTest - thenAccept, get=202
Process finished with exit code 0
通过日志可以了解到,定义function的时候其实已经执行里面逻辑了,get和thenAccept只是为了等待function执行结束。


三、解释

解决的是执行完一个异步函数后,得到执行结果或者根据执行结果继续执行后续的处理逻辑。

之前的方法是把异步执行的任务放到线程中执行,然后另外一个线程监听通知队列或者阻塞等待任务执行完毕,任务执行完后往队列放一个通知,通知等待线程。


现在可以采用更为简洁的方式去处理这种场景:

  1. 定义一个异步函数

  2. 使异步函数执行

  3. 获取执行结果


当调用get、thenAccept等方法后,java本身会帮助我们执行异步逻辑,并以阻塞的方式等待结果。

一个函数对象只会执行一次,执行完之后会保存这个结果,下次调用get等计算方法的时候直接使用上次的计算结果。


四、查看代码

通过get方法查看具体的执行方式:

CompletableFuture.get()

public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r);}

如果函数之前执行过,则会把结果保存到result变量中,调用get方法执行返回result结果;

如果函数没有执行过,则调用waitingGet方法执行。


下面看waitingGet方法:

 
private Object waitingGet(boolean interruptible) { Signaller q = null; boolean queued = false; int spins = -1; Object r; while ((r = result) == null) { if (spins < 0) spins = SPINS; else if (spins > 0) { if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } else if (q == null) q = new Signaller(interruptible, 0L, 0L); else if (!queued) queued = tryPushStack(q); else if (interruptible && q.interruptControl < 0) { q.thread = null; cleanStack(); return null; } else if (q.thread != null && result == null) { try { ForkJoinPool.managedBlock(q); } catch (InterruptedException ie) { q.interruptControl = -1; } } } if (q != null) { q.thread = null; if (q.interruptControl < 0) { if (interruptible) r = null; // report interruption else Thread.currentThread().interrupt(); } } postComplete(); return r;}

此方法的目的是阻塞的方式检测定义的function是否执行完毕,执行完毕或者出现异常后返回结果。


五、总结

这种编程方式是一种思维的转变,让异步结构变得更清晰,前后关系也很清楚(都是A->B->C),可以把所有的异步代码放在一起写,不需要添加额外的异步执行结果判断变量。


但是需要一段时间去习惯这种写法,否则也会被其中各种匿名函数和api绕晕。