Dubbo异步调用源码解读和Bug分析
As dubbo is based on a non-blocking NIO network layer, the client can start parallel call to multiple remote services without explicitly starting mulithreads, which costs relatively fewer resources.
一、Dubbo调用类型
Dubbo调用类型分为同步调用、异步调用和是否返回结果配置
默认为同步调用,并且有返回结果。
异步调用配置,设置 async="true",异步调用可以提高效率。
默认是有返回结果,设置return="false"则不需要返回,可以减少等待结果时间。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke:
public class DubboInvoker<T> extends AbstractInvoker<T> {private final ExchangeClient[] clients;private final AtomicPositiveInteger index = new AtomicPositiveInteger();private final String version;private final ReentrantLock destroyLock = new ReentrantLock();private final Set<Invoker<?>> invokers;public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients){this(serviceType, url, clients, null);}public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers){super(serviceType, url, new String[] {Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});this.clients = clients;// get version.this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");this.invokers = invokers;}protected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());inv.setAttachment(Constants.VERSION_KEY, version);// 获取连接的复杂均衡算法ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {// 简易轮询算法currentClient = clients[index.getAndIncrement() % clients.length];}try {// 异步配置boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);// 是否需要返回结果boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);// 超时timeoutint timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);if (isOneway) {// 不管是否异步,只要不需要返回结果,直接异步调用,设置结果为nullboolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);// 注意:这里设置Fature为nullRpcContext.getContext().setFuture(null);return new RpcResult();} else if (isAsync) {// 异步且需要返回结果,调用后设置结果futureResponseFuture future = currentClient.request(inv, timeout);RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));return new RpcResult();} else {// 同步且需要返回结果,调用后在此等待,直到有结果设置结果,或者超时抛出异常。RpcContext.getContext().setFuture(null);return (Result) currentClient.request(inv, timeout).get();}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}public boolean isAvailable() {if (!super.isAvailable())return false;for (ExchangeClient client : clients){if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){//cannot write == not Available ?return true ;}}return false;}public void destroy() {//防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭if (super.isDestroyed()){return ;} else {//dubbo check ,避免多次关闭destroyLock.lock();try{if (super.isDestroyed()){return ;}super.destroy();if (invokers != null){invokers.remove(this);}for (ExchangeClient client : clients) {try {client.close();} catch (Throwable t) {logger.warn(t.getMessage(), t);}}}finally {destroyLock.unlock();}}}}
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://code.alibabatech.com/schema/dubbohttp://code.alibabatech.com/schema/dubbo/dubbo.xsd"><dubbo:application name="demo-provider"/><dubbo:registry protocol="zookeeper" address="127.0.0.1:2181"/><dubbo:protocol name="dubbo" port="20880"/><!-- 和本地bean一样实现服务 --><bean id="helloWorldServiceImpl" class="cn.gov.zcy.dubbotest.api.HelloWorldServiceImpl"/><!-- 声明需要暴露的服务接口 --><dubbo:service interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice" ref="helloWorldServiceImpl"/><dubbo:reference id="helloWorldService" interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice"/><!-- 这里必须使用scope=remote,避免走InjvmProtocol,原因是如果调用本地服务,异步调用就会失效,因为已经没有意义了 --><dubbo:reference id="asyncHelloWorldService" interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice" async="true" scope="remote" ><dubbo:method name="hello2" return="true" /></dubbo:reference></beans>
asyncHelloWorldService#hello2为异步调用不需要返回值。
public class DubboTest {public static void main(String[] args) throws Exception {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application.xml");HelloWorldSerivice asyncHelloWorldService = context.getBean("asyncHelloWorldService", HelloWorldSerivice.class);Response<String> asyncHeturnMessage = asyncHelloWorldService.hello2(null);System.out.println(asyncHeturnMessage);Future<Response<String>> future = RpcContext.getContext().getFuture();System.out.println(future.get().getResult());LockSupport.park();}}
通过前文源码的阅读,我们猜测这里应该返回空RpcResult,并且不返回Future。
可以看到debug到的结果符合预期。
三、Dubbo异步调用bug
要解决这个问题并不难,可以有以下解决方案:
如不需要返回值,可采用oneway的方式(在消费者端配置dubbo:method中指定return="false")。
如确实需要返回值,可以在实际业务代码中使用线程池执行逻辑。
使用Provider端的Filter,清除attachment中的async标志。
我们来看下DubboX提供的解决方案,它采用的就是第三种解决方案,因为这种方案相对更优雅:
另外,使用Idea进行本地debug时还要注意一个经典的
tostring()导致ReferenceBean对象中的ref属性被过早实例化问题。解决该问题有两种方法:
可以通过
Intellij Idea -> Prefrencence -> Build -> Debuger -> Data Views -> Java uncheck Enable 'toString' object view禁用tostring。也可以升级下dubbo版本,官方说已经在
dubbo 2.6.6+版本中修复了该问题。
