vlambda博客
学习文章列表

Dubbo的RPC调用为什么是同步的?

强哥叨逼叨
愿你走过半生,归来仍是少年
39篇原创内容
Official Account
01
提出问题



我们在使用 Dubbo 调用远程服务时,在不设置Service属性async=true时,默认情况下,使用的是一种阻塞式调用方式,即 Consumer 端代码发起请求后,一直阻塞等待,直到 Provider 端返回为止。


但是,由于 Dubbo 底层基于 Netty 发送网络请求,而Netty又是一个异步事件驱动的模型,所以理论上来说,请求应该是一个异步的过程。不会像上面所说的Consumer需要阻塞等待才对。


那么为什么我们使用Dubbo的时候,它的RPC调用会是同步的呢?


02
问题解析



其实很容易想到,既然底层是异步的,可是我们使用框架的时候,又变成了同步,那肯定是框架给我们做了一些工作,使得我们的请求由异步转成了同步。


那么Dubbo做了什么,能使得异步请求变成同步等待的呢?


在回答这个问题前,小伙伴们可以自己先思考一下,如果让我们自己来完成异步转同步,我们该怎么做呢?


大体的步骤如下:

当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;

当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。


厉害的小伙伴应该会想到,这整个过程不就是可以通过Java的等待 - 通知机制来实现吗?


有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。当然这就要从 Dubbo 的相关源码进行分析(下文源码版本为Dubbo 2.6.*,高版本有所更改,不过大致想法一致)。


假如有如下Dubbo Service方法调用

String name = userService.findByName("强哥叨逼叨"); 

通过Debug打印调用栈,我们发现,在消费端发请求出去时,会走到DubboInvoker的doInvoker方法:

 //DubboInvokerprotected Result doInvoke(final Invocation invocation) throws Throwable { //... if (isOneway) {//2.异步没返回值 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) {//1.异步有返回值--发送者 ResponseFuture future = currentClient.request(inv, timeout); FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future); RpcContext.getContext().setFuture(futureAdapter); Result result; if (isAsyncFuture) { result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } else {//3.异步变同步 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout)//返回下面的future .get();//进入get()方法,是当前线程阻塞。那么当有结果返回时,唤醒这个线程 }}   public ResponseFuture request(Object request, int timeout) throws RemotingException { Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request);//在发送的那一刻,当前线程是得到future这个返回值 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { channel.send(req);//通过netty发送出去 } catch (RemotingException e) { future.cancel(); throw e;    } return future; }

我们看到3处,异步转同步过程的代码,先调用了 request(inv, timeout) 方法返回DefaultFuture,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。


DefaultFuture 这个类是很关键,我把相关的代码精简之后,列到了下面:

// 创建锁与条件变量private final Lock locknew ReentrantLock();private final Condition done = lock.newCondition();// 调用方通过该方法等待结果Object get(int timeout){ long start = System.nanoTime(); lock.lock(); try {   while (!isDone()) { done.await(timeout); long cur=System.nanoTime();     if (isDone() || cur-start > timeout){ break;     }   }finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException();  }  return returnFromResponse();}// RPC 结果是否已经返回boolean isDone() { return response != null;}// RPC 结果返回时调用该方法private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); }}

调用线程通过调用 get() 方法等待 RPC 返回结果,这个方法里面,你看到的都是Java并发包中的类和方法调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。


当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal() 来通知调用线程,结果已经返回,不用继续等待了。


至此,Dubbo 里面的异步转同步的源码就分析完了,有没有觉得还挺简单的?


而有了上面的经验,以后我们在遇到类似的异步转同步的问题时,仿照上面的方法,使用Lock和Condition配合来实现自己的功能。


03
知识参考



课程:极客时间,王宝令的《Java并发编程实战》

https://time.geekbang.org/column/intro/159

博客

https://juejin.cn/post/6844903840169345032


强哥叨逼叨
愿你走过半生,归来仍是少年
39篇原创内容
Official Account