函数式编程 - 异步处理(CompletableFuture)
一、示例
package main;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author cc
* @function
* @date 2021/8/2 17:18
*/
public class FunctionTest {
private static final Logger log = LoggerFactory.getLogger(FunctionTest.class);
public static void main(String[] args) throws Exception {
FunctionTest main = new FunctionTest();
System.out.println("\n------------------------------------ 函数式编程方式 ------------------------------------");
main.run();
System.out.println("\n------------------------------------ 经典的异步实现方式 ------------------------------------");
main.runOld();
}
long value = 101;
void run() throws Exception {
// 定义任务内容
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
value += 101;
log.info("supplyAsync, value=" + value);
return String.format("%d", value);
});
Thread.sleep(1000);
log.info("sleep done");
// 执行完任务后执行匿名函数
CompletableFuture<Void> next = future.thenAccept(v -> {
log.info("thenAccept, value=" + v);
});
// 获取任务的执行结果
String result = future.get();
log.info("thenAccept, get=" + result);
}
void runOld() throws Exception {
value = 101;
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 启动线程执行任务
task(queue);
// 阻塞方式等待任务执行结束
String result = queue.take();
log.info("thenAccept, get=" + result);
}
void task(LinkedBlockingQueue<String> queue) {
new Thread(() -> {
String result = "failed";
try {
value += 101;
log.info("supplyAsync, value=" + value);
result = String.format("%d", value);
} catch (Exception e) {
log.error("", e);
}
try {
queue.put(result);
} catch (InterruptedException e) {
log.error("", e);
}
}).start();
}
}
二、输出
函数式编程方式 ------------------------------------
08:47:51.571 [ForkJoinPool.commonPool-worker-1] INFO main.FunctionTest - supplyAsync, value=202
08:47:52.575 [main] INFO main.FunctionTest - sleep done
08:47:52.575 [main] INFO main.FunctionTest - thenAccept, value=202
08:47:52.575 [main] INFO main.FunctionTest - thenAccept, get=202
经典的异步实现方式 ------------------------------------
08:47:52.576 [Thread-1] INFO main.FunctionTest - supplyAsync, value=202
08:47:52.576 [main] INFO main.FunctionTest - thenAccept, get=202
Process finished with exit code 0
通过日志可以了解到,定义function的时候其实已经执行里面逻辑了,get和thenAccept只是为了等待function执行结束。
三、解释
解决的是执行完一个异步函数后,得到执行结果或者根据执行结果继续执行后续的处理逻辑。
之前的方法是把异步执行的任务放到线程中执行,然后另外一个线程监听通知队列或者阻塞等待任务执行完毕,任务执行完后往队列放一个通知,通知等待线程。
现在可以采用更为简洁的方式去处理这种场景:
定义一个异步函数
使异步函数执行
获取执行结果
当调用get、thenAccept等方法后,java本身会帮助我们执行异步逻辑,并以阻塞的方式等待结果。
一个函数对象只会执行一次,执行完之后会保存这个结果,下次调用get等计算方法的时候直接使用上次的计算结果。
四、查看代码
通过get方法查看具体的执行方式:
CompletableFuture.get()
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
如果函数之前执行过,则会把结果保存到result变量中,调用get方法执行返回result结果;
如果函数没有执行过,则调用waitingGet方法执行。
下面看waitingGet方法:
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = SPINS;
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
此方法的目的是阻塞的方式检测定义的function是否执行完毕,执行完毕或者出现异常后返回结果。
五、总结
这种编程方式是一种思维的转变,让异步结构变得更清晰,前后关系也很清楚(都是A->B->C),可以把所有的异步代码放在一起写,不需要添加额外的异步执行结果判断变量。
但是需要一段时间去习惯这种写法,否则也会被其中各种匿名函数和api绕晕。