Dubbo的RPC调用为什么是同步的?
但是,由于 Dubbo 底层基于 Netty 发送网络请求,而Netty又是一个异步事件驱动的模型,所以理论上来说,请求应该是一个异步的过程。不会像上面所说的Consumer需要阻塞等待才对。
那么为什么我们使用Dubbo的时候,它的RPC调用会是同步的呢?
其实很容易想到,既然底层是异步的,可是我们使用框架的时候,又变成了同步,那肯定是框架给我们做了一些工作,使得我们的请求由异步转成了同步。
那么Dubbo做了什么,能使得异步请求变成同步等待的呢?
在回答这个问题前,小伙伴们可以自己先思考一下,如果让我们自己来完成异步转同步,我们该怎么做呢?
大体的步骤如下:
当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;
当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。
厉害的小伙伴应该会想到,这整个过程不就是可以通过Java的等待 - 通知机制来实现吗?
有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。当然这就要从 Dubbo 的相关源码进行分析(下文源码版本为Dubbo 2.6.*,高版本有所更改,不过大致想法一致)。
假如有如下Dubbo Service方法调用
String name = userService.findByName("强哥叨逼叨");
通过Debug打印调用栈,我们发现,在消费端发请求出去时,会走到DubboInvoker的doInvoker方法:
//DubboInvoker
protected Result doInvoke(final Invocation invocation) throws Throwable {
//...
if (isOneway) {//2.异步没返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//1.异步有返回值--发送者
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {//3.异步变同步
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout)//返回下面的future
.get();//进入get()方法,是当前线程阻塞。那么当有结果返回时,唤醒这个线程
}
}
public ResponseFuture request(Object request, int timeout) throws RemotingException {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
//在发送的那一刻,当前线程是得到future这个返回值
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);//通过netty发送出去
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
我们看到3处,异步转同步过程的代码,先调用了 request(inv, timeout) 方法返回DefaultFuture,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。
DefaultFuture 这个类是很关键,我把相关的代码精简之后,列到了下面:
// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() || cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC 结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC 结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}
调用线程通过调用 get() 方法等待 RPC 返回结果,这个方法里面,你看到的都是Java并发包中的类和方法:调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。
当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal() 来通知调用线程,结果已经返回,不用继续等待了。
至此,Dubbo 里面的异步转同步的源码就分析完了,有没有觉得还挺简单的?
而有了上面的经验,以后我们在遇到类似的异步转同步的问题时,仿照上面的方法,使用Lock和Condition配合来实现自己的功能。
课程:极客时间,王宝令的《Java并发编程实战》
https://time.geekbang.org/column/intro/159
博客:
https://juejin.cn/post/6844903840169345032