vlambda博客
学习文章列表

Dubbo异步调用源码解读和Bug分析

As dubbo is based on a non-blocking NIO network layer, the client can start parallel call to multiple remote services without explicitly starting mulithreads, which costs relatively fewer resources.



一、Dubbo调用类型

Dubbo调用类型分为同步调用、异步调用和是否返回结果配置

  1. 默认为同步调用,并且有返回结果。

  1. 异步调用配置,设置 async="true",异步调用可以提高效率。

  1. 默认是有返回结果,设置return="false"则不需要返回,可以减少等待结果时间。


我们来看调用部分核心代码com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
private final AtomicPositiveInteger index = new AtomicPositiveInteger();
private final String version; private final ReentrantLock destroyLock = new ReentrantLock(); private final Set<Invoker<?>> invokers; public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients){ this(serviceType, url, clients, null); } public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers){ super(serviceType, url, new String[] {Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY}); this.clients = clients; // get version. this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0"); this.invokers = invokers; }
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version);
// 获取连接的复杂均衡算法 ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { // 简易轮询算法 currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 异步配置 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否需要返回结果 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 超时timeout int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { // 不管是否异步,只要不需要返回结果,直接异步调用,设置结果为null boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); // 注意:这里设置Fature为null RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { // 异步且需要返回结果,调用后设置结果future ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { // 同步且需要返回结果,调用后在此等待,直到有结果设置结果,或者超时抛出异常。 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } @Override public boolean isAvailable() { if (!super.isAvailable()) return false; for (ExchangeClient client : clients){ if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){ //cannot write == not Available ? return true ; } } return false; }
public void destroy() { //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭 if (super.isDestroyed()){ return ; } else { //dubbo check ,避免多次关闭 destroyLock.lock(); try{ if (super.isDestroyed()){ return ; } super.destroy(); if (invokers != null){ invokers.remove(this); } for (ExchangeClient client : clients) { try { client.close(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }finally { destroyLock.unlock(); } } }}
注意:通过以上源码阅读。我们知道了Dubbo底层调用虽然使用Netty异步实现的,但Dubbo请求默认却是同步调用需要返回结果的。这是通过获取ResponseFuture,然后使用ReentrantLock的await使当前线程等待返回结果(即 默认请求有个异步转同步的过程 )。

二、异步调用演示
熟悉了以上代码后,我们来实际验证下。我们用上一期》中的测试代码:
<?xml version="1.0" encoding="UTF-8" standalone="no"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

<dubbo:application name="demo-provider"/>
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181"/>
<dubbo:protocol name="dubbo" port="20880"/>
<!-- 和本地bean一样实现服务 --> <bean id="helloWorldServiceImpl" class="cn.gov.zcy.dubbotest.api.HelloWorldServiceImpl"/>
<!-- 声明需要暴露的服务接口 --> <dubbo:service interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice" ref="helloWorldServiceImpl"/>
<dubbo:reference id="helloWorldService" interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice"/>     <!-- 这里必须使用scope=remote,避免走InjvmProtocol,原因是如果调用本地服务,异步调用就会失效,因为已经没有意义了 --> <dubbo:reference id="asyncHelloWorldService" interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice" async="true" scope="remote" > <dubbo:method name="hello2" return="true" /> </dubbo:reference>
</beans>
上文中 asyncHelloWorldService#hello2为异步调用不需要返回值。
public class DubboTest { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application.xml");
HelloWorldSerivice asyncHelloWorldService = context.getBean("asyncHelloWorldService", HelloWorldSerivice.class); Response<String> asyncHeturnMessage = asyncHelloWorldService.hello2(null); System.out.println(asyncHeturnMessage);
Future<Response<String>> future = RpcContext.getContext().getFuture(); System.out.println(future.get().getResult());
LockSupport.park(); } }

通过前文源码的阅读,我们猜测这里应该返回空RpcResult,并且不返回Future。

Dubbo异步调用源码解读和Bug分析

可以看到debug到的结果符合预期。



三、Dubbo异步调用bug

Dubbo异步调用设计初衷是很好的,但是实际微服务场景下,使用async配置异步调用远程服务可能出现问题,原因是异步标识会进行传递,以下链接是一个官方issue:https://github.com/apache/dubbo/issues/789

Dubbo异步调用源码解读和Bug分析

我们用个图来表示下这个过程如下:

Dubbo异步调用源码解读和Bug分析

其实,上例中我们仅希望serviceB为异步调用,如果serviceB又依赖serviceC,那么这里应该按serviceC的配置进行调用,但实际情况确实serviceA调用serviceB的异步标识被传递给下层了,从而导致serviceB异常。


要解决这个问题并不难,可以有以下解决方案:

  1. 如不需要返回值,可采用oneway的方式(在消费者端配置dubbo:method中指定return="false")。

  2. 如确实需要返回值,可以在实际业务代码中使用线程池执行逻辑。

  3. 使用Provider端的Filter,清除attachment中的async标志。


我们来看下DubboX提供的解决方案,它采用的就是第三种解决方案,因为这种方案相对更优雅:

Dubbo异步调用源码解读和Bug分析


当然了,据说以上bug已经在dubbo 2.6.x版本进行了修复(读者如有兴趣可以自行验证)。


最后说明一下,本文以dubbo2.5.3版本源码为例进行解读,其他版本可能存在细微差别。

另外,使用Idea进行本地debug时还要注意一个经典的tostring()导致ReferenceBean对象中的ref属性被过早实例化问题。解决该问题有两种方法:
  1. 可以通过Intellij Idea -> Prefrencence -> Build -> Debuger -> Data Views -> Java uncheck Enable 'toString' object view禁用tostring。

  2. 也可以升级下dubbo版本,官方说已经在dubbo 2.6.6+版本中修复了该问题