Dubbo的RPC调用为什么是同步的?
但是,由于 Dubbo 底层基于 Netty 发送网络请求,而Netty又是一个异步事件驱动的模型,所以理论上来说,请求应该是一个异步的过程。不会像上面所说的Consumer需要阻塞等待才对。
那么为什么我们使用Dubbo的时候,它的RPC调用会是同步的呢?
其实很容易想到,既然底层是异步的,可是我们使用框架的时候,又变成了同步,那肯定是框架给我们做了一些工作,使得我们的请求由异步转成了同步。
那么Dubbo做了什么,能使得异步请求变成同步等待的呢?
在回答这个问题前,小伙伴们可以自己先思考一下,如果让我们自己来完成异步转同步,我们该怎么做呢?
大体的步骤如下:
当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;
当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。
厉害的小伙伴应该会想到,这整个过程不就是可以通过Java的等待 - 通知机制来实现吗?
有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。当然这就要从 Dubbo 的相关源码进行分析(下文源码版本为Dubbo 2.6.*,高版本有所更改,不过大致想法一致)。
假如有如下Dubbo Service方法调用
String name = userService.findByName("强哥叨逼叨");
通过Debug打印调用栈,我们发现,在消费端发请求出去时,会走到DubboInvoker的doInvoker方法:
//DubboInvokerprotected Result doInvoke(final Invocation invocation) throws Throwable {//...if (isOneway) {//2.异步没返回值boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);RpcContext.getContext().setFuture(null);return new RpcResult();} else if (isAsync) {//1.异步有返回值--发送者ResponseFuture future = currentClient.request(inv, timeout);FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);RpcContext.getContext().setFuture(futureAdapter);Result result;if (isAsyncFuture) {result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);} else {result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);}return result;} else {//3.异步变同步RpcContext.getContext().setFuture(null);return (Result) currentClient.request(inv, timeout)//返回下面的future.get();//进入get()方法,是当前线程阻塞。那么当有结果返回时,唤醒这个线程}}public ResponseFuture request(Object request, int timeout) throws RemotingException {Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);//在发送的那一刻,当前线程是得到future这个返回值DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);try {channel.send(req);//通过netty发送出去} catch (RemotingException e) {future.cancel();throw e;}return future;}
我们看到3处,异步转同步过程的代码,先调用了 request(inv, timeout) 方法返回DefaultFuture,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。
DefaultFuture 这个类是很关键,我把相关的代码精简之后,列到了下面:
// 创建锁与条件变量private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();// 调用方通过该方法等待结果Object get(int timeout){long start = System.nanoTime();lock.lock();try {while (!isDone()) {done.await(timeout);long cur=System.nanoTime();if (isDone() || cur-start > timeout){break;}}} finally {lock.unlock();}if (!isDone()) {throw new TimeoutException();}return returnFromResponse();}// RPC 结果是否已经返回boolean isDone() {return response != null;}// RPC 结果返回时调用该方法private void doReceived(Response res) {lock.lock();try {response = res;if (done != null) {done.signal();}} finally {lock.unlock();}}
调用线程通过调用 get() 方法等待 RPC 返回结果,这个方法里面,你看到的都是Java并发包中的类和方法:调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。
当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal() 来通知调用线程,结果已经返回,不用继续等待了。
至此,Dubbo 里面的异步转同步的源码就分析完了,有没有觉得还挺简单的?
而有了上面的经验,以后我们在遇到类似的异步转同步的问题时,仿照上面的方法,使用Lock和Condition配合来实现自己的功能。
课程:极客时间,王宝令的《Java并发编程实战》
https://time.geekbang.org/column/intro/159
博客:
https://juejin.cn/post/6844903840169345032
