函数式编程 - 异步处理(CompletableFuture)
一、示例
package main;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.CompletableFuture;import java.util.concurrent.LinkedBlockingQueue;/*** @author cc* @function* @date 2021/8/2 17:18*/public class FunctionTest {private static final Logger log = LoggerFactory.getLogger(FunctionTest.class);public static void main(String[] args) throws Exception {FunctionTest main = new FunctionTest();System.out.println("\n------------------------------------ 函数式编程方式 ------------------------------------");main.run();System.out.println("\n------------------------------------ 经典的异步实现方式 ------------------------------------");main.runOld();}long value = 101;void run() throws Exception {// 定义任务内容CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {value += 101;log.info("supplyAsync, value=" + value);return String.format("%d", value);});Thread.sleep(1000);log.info("sleep done");// 执行完任务后执行匿名函数CompletableFuture<Void> next = future.thenAccept(v -> {log.info("thenAccept, value=" + v);});// 获取任务的执行结果String result = future.get();log.info("thenAccept, get=" + result);}void runOld() throws Exception {value = 101;LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();// 启动线程执行任务task(queue);// 阻塞方式等待任务执行结束String result = queue.take();log.info("thenAccept, get=" + result);}void task(LinkedBlockingQueue<String> queue) {new Thread(() -> {String result = "failed";try {value += 101;log.info("supplyAsync, value=" + value);result = String.format("%d", value);} catch (Exception e) {log.error("", e);}try {queue.put(result);} catch (InterruptedException e) {log.error("", e);}}).start();}}
二、输出
函数式编程方式 ------------------------------------08:47:51.571 [ForkJoinPool.commonPool-worker-1] INFO main.FunctionTest - supplyAsync, value=20208:47:52.575 [main] INFO main.FunctionTest - sleep done08:47:52.575 [main] INFO main.FunctionTest - thenAccept, value=20208:47:52.575 [main] INFO main.FunctionTest - thenAccept, get=202经典的异步实现方式 ------------------------------------08:47:52.576 [Thread-1] INFO main.FunctionTest - supplyAsync, value=20208:47:52.576 [main] INFO main.FunctionTest - thenAccept, get=202Process finished with exit code 0
通过日志可以了解到,定义function的时候其实已经执行里面逻辑了,get和thenAccept只是为了等待function执行结束。
三、解释
解决的是执行完一个异步函数后,得到执行结果或者根据执行结果继续执行后续的处理逻辑。
之前的方法是把异步执行的任务放到线程中执行,然后另外一个线程监听通知队列或者阻塞等待任务执行完毕,任务执行完后往队列放一个通知,通知等待线程。
现在可以采用更为简洁的方式去处理这种场景:
定义一个异步函数
使异步函数执行
获取执行结果
当调用get、thenAccept等方法后,java本身会帮助我们执行异步逻辑,并以阻塞的方式等待结果。
一个函数对象只会执行一次,执行完之后会保存这个结果,下次调用get等计算方法的时候直接使用上次的计算结果。
四、查看代码
通过get方法查看具体的执行方式:
CompletableFuture.get()
public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);}
如果函数之前执行过,则会把结果保存到result变量中,调用get方法执行返回result结果;
如果函数没有执行过,则调用waitingGet方法执行。
下面看waitingGet方法:
private Object waitingGet(boolean interruptible) {Signaller q = null;boolean queued = false;int spins = -1;Object r;while ((r = result) == null) {if (spins < 0)spins = SPINS;else if (spins > 0) {if (ThreadLocalRandom.nextSecondarySeed() >= 0)--spins;}else if (q == null)q = new Signaller(interruptible, 0L, 0L);else if (!queued)queued = tryPushStack(q);else if (interruptible && q.interruptControl < 0) {q.thread = null;cleanStack();return null;}else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}}}if (q != null) {q.thread = null;if (q.interruptControl < 0) {if (interruptible)r = null; // report interruptionelseThread.currentThread().interrupt();}}postComplete();return r;}
此方法的目的是阻塞的方式检测定义的function是否执行完毕,执行完毕或者出现异常后返回结果。
五、总结
这种编程方式是一种思维的转变,让异步结构变得更清晰,前后关系也很清楚(都是A->B->C),可以把所有的异步代码放在一起写,不需要添加额外的异步执行结果判断变量。
但是需要一段时间去习惯这种写法,否则也会被其中各种匿名函数和api绕晕。
