Dubbo异步调用源码解读和Bug分析
As dubbo is based on a non-blocking NIO network layer, the client can start parallel call to multiple remote services without explicitly starting mulithreads, which costs relatively fewer resources.
一、Dubbo调用类型
Dubbo调用类型分为同步调用、异步调用和是否返回结果配置
默认为同步调用,并且有返回结果。
异步调用配置,设置 async="true",异步调用可以提高效率。
默认是有返回结果,设置return="false"则不需要返回,可以减少等待结果时间。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
:
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
private final AtomicPositiveInteger index = new AtomicPositiveInteger();
private final String version;
private final ReentrantLock destroyLock = new ReentrantLock();
private final Set<Invoker<?>> invokers;
public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients){
this(serviceType, url, clients, null);
}
public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers){
super(serviceType, url, new String[] {Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
this.clients = clients;
// get version.
this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
this.invokers = invokers;
}
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
// 获取连接的复杂均衡算法
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
// 简易轮询算法
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 异步配置
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 是否需要返回结果
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 超时timeout
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// 不管是否异步,只要不需要返回结果,直接异步调用,设置结果为null
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
// 注意:这里设置Fature为null
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// 异步且需要返回结果,调用后设置结果future
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
// 同步且需要返回结果,调用后在此等待,直到有结果设置结果,或者超时抛出异常。
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
public boolean isAvailable() {
if (!super.isAvailable())
return false;
for (ExchangeClient client : clients){
if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){
//cannot write == not Available ?
return true ;
}
}
return false;
}
public void destroy() {
//防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭
if (super.isDestroyed()){
return ;
} else {
//dubbo check ,避免多次关闭
destroyLock.lock();
try{
if (super.isDestroyed()){
return ;
}
super.destroy();
if (invokers != null){
invokers.remove(this);
}
for (ExchangeClient client : clients) {
try {
client.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}finally {
destroyLock.unlock();
}
}
}
}
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-provider"/>
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181"/>
<dubbo:protocol name="dubbo" port="20880"/>
<!-- 和本地bean一样实现服务 -->
<bean id="helloWorldServiceImpl" class="cn.gov.zcy.dubbotest.api.HelloWorldServiceImpl"/>
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice" ref="helloWorldServiceImpl"/>
<dubbo:reference id="helloWorldService" interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice"/>
<!-- 这里必须使用scope=remote,避免走InjvmProtocol,原因是如果调用本地服务,异步调用就会失效,因为已经没有意义了 -->
<dubbo:reference id="asyncHelloWorldService" interface="cn.gov.zcy.dubbotest.api.HelloWorldSerivice" async="true" scope="remote" >
<dubbo:method name="hello2" return="true" />
</dubbo:reference>
</beans>
asyncHelloWorldService#hello2
为异步调用不需要返回值。
public class DubboTest {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application.xml");
HelloWorldSerivice asyncHelloWorldService = context.getBean("asyncHelloWorldService", HelloWorldSerivice.class);
Response<String> asyncHeturnMessage = asyncHelloWorldService.hello2(null);
System.out.println(asyncHeturnMessage);
Future<Response<String>> future = RpcContext.getContext().getFuture();
System.out.println(future.get().getResult());
LockSupport.park();
}
}
通过前文源码的阅读,我们猜测这里应该返回空RpcResult,并且不返回Future。
可以看到debug到的结果符合预期。
三、Dubbo异步调用bug
要解决这个问题并不难,可以有以下解决方案:
如不需要返回值,可采用oneway的方式(在消费者端配置dubbo:method中指定return="false")。
如确实需要返回值,可以在实际业务代码中使用线程池执行逻辑。
使用Provider端的Filter,清除attachment中的async标志。
我们来看下DubboX提供的解决方案,它采用的就是第三种解决方案,因为这种方案相对更优雅:
另外,使用Idea进行本地debug时还要注意一个经典的
tostring()
导致ReferenceBean
对象中的ref
属性被过早实例化问题。解决该问题有两种方法:
可以通过
Intellij Idea -> Prefrencence -> Build -> Debuger -> Data Views -> Java uncheck Enable 'toString' object view
禁用tostring。也可以升级下dubbo版本,官方说已经在
dubbo 2.6.6+
版本中修复了该问题。