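Dubbo Source Code: Network Communication
This post walks through Dubbo's communication flow, traced by stepping through the source in a debugger. If you spot a mistake, please point it out.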
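What happens during service export?
- Register with ZooKeeper and listen on the dynamic configuration node
- Start the server
- Create the proxy service
- Exporter -> Invoker -> proxyService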
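What happens during service reference?
- Register with ZooKeeper and listen on the dynamic configuration, provider, and router nodes
- Start the client
- Create the proxy service
- proxyService -> Invoker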
Client request
ConsumerProxyService -> Invoker [DubboInvoker] -> Exchanger [HeaderExchangeClient] -> Transporter [NettyClient] -> encode -> SEND-TO-SERVER (a DefaultFuture is created, and the Request carries a unique ID)
Server response
decode -> Transporter [NettyServer] -> handler chain -> thread pool -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback
Exchanger
Exchangers
A facade class offering convenience methods. It first obtains the Exchanger through SPI, then calls that Exchanger to create an ExchangeServer or ExchangeClient.
Exchanger
An SPI interface whose default implementation is HeaderExchanger. It exposes two methods, bind and connect, which create an ExchangeServer and an ExchangeClient respectively.
    @SPI(HeaderExchanger.NAME)
    public interface Exchanger {
        @Adaptive({Constants.EXCHANGER_KEY})
        ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

        @Adaptive({Constants.EXCHANGER_KEY})
        ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
    }

    public class HeaderExchanger implements Exchanger {
        public static final String NAME = "header";

        @Override
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }

        @Override
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    }
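To make the "facade looks up an extension by name, then delegates" idea concrete, here is a minimal sketch in plain Java. It does not use Dubbo's ExtensionLoader. The registry map, the "custom" extension, and the use of a plain Map for URL parameters are all assumptions made for illustration; only the "header" default name and the "exchanger" parameter key mirror the excerpt above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class ExchangerLookupSketch {
    // Simplified stand-in for the Exchanger SPI interface (hypothetical signature).
    interface Exchanger {
        String bind(String address);
    }

    static final String DEFAULT_NAME = "header";

    // Hypothetical registry: extension name -> factory. Dubbo reads this from SPI config files.
    static final Map<String, Supplier<Exchanger>> EXTENSIONS = new ConcurrentHashMap<>();

    static {
        EXTENSIONS.put("header", () -> address -> "HeaderExchangeServer@" + address);
        EXTENSIONS.put("custom", () -> address -> "CustomExchangeServer@" + address);
    }

    // Pick the extension named by the "exchanger" parameter, falling back to the default.
    static Exchanger getExchanger(Map<String, String> urlParams) {
        String name = urlParams.getOrDefault("exchanger", DEFAULT_NAME);
        Supplier<Exchanger> factory = EXTENSIONS.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return factory.get();
    }

    // Facade method: resolve the extension, then delegate to it.
    static String bind(Map<String, String> urlParams, String address) {
        return getExchanger(urlParams).bind(address);
    }

    public static void main(String[] args) {
        System.out.println(bind(Collections.emptyMap(), "0.0.0.0:20880"));
        System.out.println(bind(Collections.singletonMap("exchanger", "custom"), "0.0.0.0:20880"));
    }
}
```

The caller only talks to the facade, and the parameter decides which implementation is used.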
ExchangeServer
Used on the server side. The default implementation is HeaderExchangeServer, which delegates to the Transporter to start the server.
    public interface ExchangeServer extends Server {
        Collection<ExchangeChannel> getExchangeChannels();

        ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
    }
ExchangeClient
Used on the client side. The default implementation is HeaderExchangeClient. Its core method is request, which delegates to the Transporter to send the request.
    public interface ExchangeClient extends Client, ExchangeChannel {
    }
ExchangeChannel
The default implementation is HeaderExchangeChannel, which HeaderExchangeClient holds as a field.
Transporter
Transporters
A facade class offering convenience methods. It first obtains the Transporter through SPI, then calls that Transporter to create a Server or Client.
Transporter
An SPI interface whose default implementation is NettyTransporter. It exposes two methods, bind and connect, which create a Server and a Client respectively.
    @SPI("netty")
    public interface Transporter {
        @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
        Server bind(URL url, ChannelHandler handler) throws RemotingException;

        @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
        Client connect(URL url, ChannelHandler handler) throws RemotingException;
    }

    public class NettyTransporter implements Transporter {
        public static final String NAME = "netty";

        @Override
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }

        @Override
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    }
Server
Used on the server side. The default implementation is NettyServer, which starts the server. Its core method is doOpen.
    public class NettyServer extends AbstractServer implements Server {
    }
Client
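Used on the client side. The default implementation is NettyClient. Its send method (inherited from AbstractClient) sends messages, and doOpen sets up the connection to the server.
    public class NettyClient extends AbstractClient {
    }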
Server startup
DubboProtocol#export => DubboProtocol#openServer => DubboProtocol#createServer => Exchangers#bind => NettyServer#doOpen
In the end, NettyServer#doOpen starts a server through Netty.
    DubboProtocol#createServer
    => Exchangers#bind(url, requestHandler)
    => HeaderExchanger#bind(url, requestHandler)
    => return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))

    // The Transporters#bind call expands to
    Transporters#bind
    => NettyTransporter#bind(url, handler)
    => return new NettyServer(url, handler)
    => NettyServer#doOpen [doOpen is invoked while NettyServer is being constructed]
So the handler passed to NettyServer is new DecodeHandler(new HeaderExchangeHandler(handler)), and the server side ultimately returns a HeaderExchangeServer. Inside the NettyServer constructor the handler is wrapped further:
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    public class ChannelHandlers {
        private static ChannelHandlers INSTANCE = new ChannelHandlers();

        protected ChannelHandlers() {
        }

        public static ChannelHandler wrap(ChannelHandler handler, URL url) {
            return ChannelHandlers.getInstance().wrapInternal(handler, url);
        }

        protected static ChannelHandlers getInstance() {
            return INSTANCE;
        }

        static void setTestingChannelHandlers(ChannelHandlers instance) {
            INSTANCE = instance;
        }

        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
        }
    }
So the handler field of NettyServer ends up as MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler (created by AllDispatcher) -> DecodeHandler -> HeaderExchangeHandler.
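The nesting described above is a plain decorator chain: each handler holds the next one and passes the message inward. Below is a minimal sketch that reproduces only the ordering. The ChannelHandler interface here is a one-method stand-in, and the named helper and TRACE list are hypothetical. None of the real handlers' logic is modelled.

```java
import java.util.ArrayList;
import java.util.List;

class HandlerChainSketch {
    // One-method stand-in for Dubbo's ChannelHandler (assumption for illustration).
    interface ChannelHandler {
        void received(Object message);
    }

    // Records the order in which handlers see a message.
    static final List<String> TRACE = new ArrayList<>();

    // Builds a handler that logs its name and then delegates to the next handler, if any.
    static ChannelHandler named(String name, ChannelHandler next) {
        return message -> {
            TRACE.add(name);
            if (next != null) {
                next.received(message);
            }
        };
    }

    // Mirrors the role of ChannelHandlers.wrap: add the outer three layers.
    static ChannelHandler wrap(ChannelHandler inner) {
        return named("MultiMessageHandler", named("HeartbeatHandler", named("AllChannelHandler", inner)));
    }

    // Mirrors HeaderExchanger#bind followed by the NettyServer constructor.
    static ChannelHandler build() {
        ChannelHandler requestHandler = named("requestHandler", null);
        ChannelHandler exchange = named("DecodeHandler", named("HeaderExchangeHandler", requestHandler));
        return wrap(exchange);
    }

    public static void main(String[] args) {
        build().received("ping");
        System.out.println(String.join(" -> ", TRACE));
    }
}
```

Because each layer only knows about the next one, layers can be added or swapped without touching the business handler at the end.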
Client connection
The call chain is long and deeply buried, so some steps are omitted here. The client is created at application startup, when the Invoker for a Reference object is generated.
    RegistryProtocol#doRefer
    => RegistryDirectory#subscribe
    => RegistryDirectory#toInvokers
    => ProtocolFilterWrapper#refer
    => AbstractProtocol#refer
    => DubboProtocol#protocolBindingRefer
    => DubboProtocol#getClients
    => DubboProtocol#getSharedClient
    => DubboProtocol#buildReferenceCountExchangeClientList
    => DubboProtocol#buildReferenceCountExchangeClient
    => DubboProtocol#initClient
    => Exchangers#connect
    => HeaderExchanger#connect
    => Transporters#connect
    => NettyTransporter#connect
    => NettyClient#<init>
    => NettyClient#doOpen
In the end, NettyClient#doOpen uses Netty to set up the connection to the server.
    Exchangers#connect
    => HeaderExchanger#connect(url, handler)
    => return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)

    // The Transporters#connect call expands to
    Transporters#connect
    => NettyTransporter#connect(url, handler)
    => return new NettyClient(url, handler)
    => NettyClient#doOpen [doOpen is invoked while NettyClient is being constructed]
So the handler passed to NettyClient is new DecodeHandler(new HeaderExchangeHandler(handler)). The client side ultimately returns a HeaderExchangeClient, whose client field wraps the NettyClient.
However, DubboProtocol#buildReferenceCountExchangeClient adds one more layer around the HeaderExchangeClient, so the client held by the Invoker is actually a ReferenceCountExchangeClient.
    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
        ExchangeClient exchangeClient = initClient(url);
        return new ReferenceCountExchangeClient(exchangeClient);
    }
ReferenceCountExchangeClient behaves much like HeaderExchangeClient; it is just one more wrapper. Its one important addition is the referenceCount field, which, as far as I can tell, counts how many references share the underlying client.
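As a rough illustration of why a shared client might carry a reference count, here is a sketch of a wrapper that closes its delegate only when the last holder releases it. This is the general reference-counting pattern, not a copy of ReferenceCountExchangeClient. The Client interface, SimpleClient, and the retain method name are assumptions.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RefCountClientSketch {
    // Minimal stand-in for a closable client (hypothetical interface).
    interface Client {
        void close();
        boolean isClosed();
    }

    static class SimpleClient implements Client {
        private volatile boolean closed;

        public void close() { closed = true; }

        public boolean isClosed() { return closed; }
    }

    // Wrapper that counts holders and closes the delegate only for the last one.
    static class RefCountClient implements Client {
        private final Client delegate;
        private final AtomicInteger referenceCount = new AtomicInteger(1); // the creator holds one reference

        RefCountClient(Client delegate) {
            this.delegate = delegate;
        }

        // Called when another holder starts sharing this client.
        void retain() {
            referenceCount.incrementAndGet();
        }

        public void close() {
            if (referenceCount.decrementAndGet() <= 0) {
                delegate.close();
            }
        }

        public boolean isClosed() {
            return delegate.isClosed();
        }
    }

    public static void main(String[] args) {
        RefCountClient shared = new RefCountClient(new SimpleClient());
        shared.retain();                       // a second holder shares the connection
        shared.close();                        // first holder is done
        System.out.println(shared.isClosed()); // still open for the second holder
        shared.close();                        // last holder is done
        System.out.println(shared.isClosed());
    }
}
```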
Client sends a request
    caller's proxy class
    -> InvokerInvocationHandler#invoke
    -> MockClusterInvoker#invoke
    -> AbstractClusterInvoker#invoke [obtains the LoadBalance]
    -> FailoverClusterInvoker#doInvoke [handles retries]
    -> ProtocolFilterWrapper#invoke [runs the Filter chain]
    -> AbstractInvoker#invoke [sets attachments]
    -> DubboInvoker#doInvoke [hand-off to the Exchange layer]
    -> ReferenceCountExchangeClient#request
    -> HeaderExchangeClient#request
    -> HeaderExchangeChannel#request [returns a CompletableFuture]
    -> AbstractPeer#send
    -> AbstractClient#send
    -> NettyChannel#send
    -> Channel#writeAndFlush [sends the message to the server]
Interaction with the Exchange layer starts at DubboInvoker#doInvoke. The core code is:
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        // inv, methodName and timeout are set up earlier in the method (omitted in this excerpt)
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // return=false means one-way, which avoids creating an unnecessary Future
        if (isOneway) {
            // sent=true: return only after the client has sent the message; otherwise return immediately
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            asyncRpcResult.subscribeTo(responseFuture);
            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
            return asyncRpcResult;
        }
    }
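The client selection at the top of doInvoke is a round-robin over the clients array. Here is a small standalone sketch of that idea. It uses a plain AtomicInteger with Math.floorMod so the index stays valid even after the counter overflows. That overflow handling is my own choice for the sketch, not something taken from the Dubbo source, and the String "clients" are placeholders.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final String[] clients;    // placeholders for ExchangeClient instances
    private final AtomicInteger index;

    RoundRobinSketch(String[] clients, int start) {
        this.clients = clients;
        this.index = new AtomicInteger(start);
    }

    // Returns the next client in rotation; floorMod keeps the result non-negative.
    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(new String[] {"a", "b", "c"}, 0);
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.next());
        }
    }
}
```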
ReferenceCountExchangeClient#request => HeaderExchangeClient#request => HeaderExchangeChannel#request
    // HeaderExchangeChannel.java
    public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null,
                    "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
A few points worth noting in this method:
- The Request constructor generates a unique, incrementing ID that identifies the request.
- channel#send goes through NettyChannel#getOrAddChannel. NettyChannel keeps a ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP cache that maps each io.netty.channel.Channel to its NettyChannel.
- channel#send eventually reaches NettyChannel#send, which is what actually sends the message to the server.
- The returned DefaultFuture is a CompletableFuture.
    // NettyChannel.java
    public void send(Object message, boolean sent) throws RemotingException {
        boolean success = true;
        int timeout = 0;
        try {
            // send the message to the server
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // with sent=true, the client waits until the message has gone out before returning
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
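The effect of the sent flag can be seen in isolation with a small sketch: the write is asynchronous, and the caller blocks on it only when sent is true. The WriteFuture class below is a hypothetical stand-in built on CountDownLatch, not Netty's ChannelFuture. It only imitates the "await returns false on timeout" behaviour used above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SentAwaitSketch {
    // Hypothetical stand-in for the future returned by an asynchronous write.
    static class WriteFuture {
        private final CountDownLatch done = new CountDownLatch(1);

        void markDone() {
            done.countDown();
        }

        // Returns true if the write finished within the timeout, false otherwise.
        boolean await(long timeoutMs) {
            try {
                return done.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Same shape as the send logic above: block only when 'sent' is true.
    static boolean send(WriteFuture future, boolean sent, long timeoutMs) {
        boolean success = true;
        if (sent) {
            success = future.await(timeoutMs);
        }
        return success;
    }

    public static void main(String[] args) {
        WriteFuture pending = new WriteFuture();
        System.out.println(send(pending, false, 50)); // does not wait for the write
        System.out.println(send(pending, true, 50));  // waits, and the write never completes
        pending.markDone();
        System.out.println(send(pending, true, 50));  // the write has already finished
    }
}
```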
The send path above never seems to encode the message. That is because the codec was already installed when the Netty client was initialized:
    // NettyClient.java
    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class);

        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                        .addLast("handler", nettyClientHandler);
                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if (socksProxyHost != null) {
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }
Outbound messages first pass through the encoder, InternalEncoder#encode. InternalEncoder extends MessageToByteEncoder, and its encode method delegates to Codec2. Codec2 is an SPI interface; for the Dubbo protocol the implementation used is DubboCodec.
    // in NettyClient#doOpen
    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);

    protected static Codec2 getChannelCodec(URL url) {
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
        }
    }
Server handles the request
As noted above, the handler field of NettyServer is MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler. NettyServer starts the server with the following code:
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()
                                //.addLast("logging", new LoggingHandler(LogLevel.INFO)) // for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }
- Inbound bytes first pass through the decoder, InternalDecoder#decode. InternalDecoder extends ByteToMessageDecoder, and its decode method delegates to Codec2. Codec2 is an SPI interface; for the Dubbo protocol the implementation used is DubboCodec.
    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);

    protected static Codec2 getChannelCodec(URL url) {
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
        }
    }
- MultiMessageHandler handles batched messages. If the message is a MultiMessage (which implements Iterable), it iterates over the elements and calls the next handler's received method for each one; otherwise it calls the next handler's received method directly.
- AllChannelHandler receives the message, wraps the channel, handler, and message in a ChannelEventRunnable whose state is ChannelState.RECEIVED, and hands it to the thread pool.
- ChannelEventRunnable#run sees the ChannelState.RECEIVED state and directly calls received on the next handler, which is DecodeHandler. This step runs on the thread pool.
- In DecodeHandler#received, a Decodeable message is decoded as a whole; for a Request, Request.getData() is decoded; for a Response, Response.getResult() is decoded.
- HeaderExchangeHandler#received -> HeaderExchangeHandler#handleRequest -> requestHandler#reply. requestHandler is a field of DubboProtocol, of type ExchangeHandlerAdapter.
- HeaderExchangeHandler#handleRequest creates a Response whose ID is the Request's ID, which ties the response to its request.
- requestHandler#reply looks up the matching DubboExporter in the exporterMap cache, gets the Invoker from it, calls Invoker#invoke, and returns a CompletableFuture.
- HeaderExchangeHandler#handleRequest takes the returned CompletableFuture and attaches a callback. The callback puts the result into the Response and sends the Response over the channel.
    // ChannelEventRunnable.java
    public void run() {
        if (state == ChannelState.RECEIVED) {
            try {
                // RECEIVED: directly call received on the next handler, i.e. DecodeHandler
                handler.received(channel, message);
            } catch (Exception e) {
            }
        } else {
            switch (state) {
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                    }
                    break;
                case DISCONNECTED:
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                    }
                    break;
                case SENT:
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                    }
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                    }
                    break;
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
            }
        }
    }
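The hand-off from the I/O thread to the business thread pool is the key step in the list above. Here is a minimal sketch of just that step, under simplifying assumptions: the handler interface takes a single String, the pool is a fixed pool whose threads are named "biz-worker", and AllHandler is a hypothetical class standing in for the role AllChannelHandler plays.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatchSketch {
    // One-method stand-in for a channel handler (assumption for illustration).
    interface ChannelHandler {
        void received(String message);
    }

    // Wraps each received message in a task and submits it to the pool.
    static class AllHandler implements ChannelHandler {
        private final ChannelHandler next;
        private final ExecutorService executor;

        AllHandler(ChannelHandler next, ExecutorService executor) {
            this.next = next;
            this.executor = executor;
        }

        public void received(String message) {
            // The calling (I/O) thread returns immediately; the pool runs the next handler.
            executor.execute(() -> next.received(message));
        }
    }

    // Sends one message through the dispatcher and reports which thread handled it.
    static String handleOnPool(String message) {
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "biz-worker");
            t.setDaemon(true);
            return t;
        });
        CompletableFuture<String> handledBy = new CompletableFuture<>();
        ChannelHandler business = m -> handledBy.complete(Thread.currentThread().getName());
        new AllHandler(business, pool).received(message);
        try {
            return handledBy.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("message was not handled in time", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:  " + Thread.currentThread().getName());
        System.out.println("handler thread: " + handleOnPool("hello"));
    }
}
```

Keeping decoding and business logic off the I/O thread means a slow service method does not stall reads on other connections.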
    InternalDecoder#decode
    => NettyServerHandler#channelRead
    => AbstractPeer#received
    => MultiMessageHandler#received
    => HeartbeatHandler#received
    => AllChannelHandler#received
    ------------------ async, handed to the thread pool ------------------
    => ChannelEventRunnable#run
    => DecodeHandler#received
    => DecodeHandler#decode
    => DecodeableRpcInvocation#decode
    => HeaderExchangeHandler#received
    => HeaderExchangeHandler#handleRequest
    => DubboProtocol.requestHandler#reply
    ------------------ async ------------------
    ------------------ extension points ------------------
    => ProtocolFilterWrapper.invoke
    => EchoFilter.invoke
    => ClassLoaderFilter.invoke
    => GenericFilter.invoke
    => TraceFilter.invoke
    => MonitorFilter.invoke
    => TimeoutFilter.invoke
    => ExceptionFilter.invoke
    => InvokerWrapper.invoke
    ------------------ extension points ------------------
    => AbstractProxyInvoker#invoke
    => JavassistProxyFactory.AbstractProxyInvoker#doInvoke
    => proxy class#invokeMethod
    => the actual service method
    // send the processed result back to the consumer
    future#whenComplete
    => channel.send(response)
    => HeaderExchangeChannel
    => NettyChannel.send
    => NioSocketChannel#writeAndFlush(message)
Server sends the result
HeaderExchangeChannel#send => NettyChannel#send => NioSocketChannel#writeAndFlush(message)
Client handles the response
When the client starts, the handler passed in is the same one the server uses (requestHandler):
    // DubboProtocol#initClient
    Exchangers.connect(url, requestHandler);

    // HeaderExchanger#connect
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    Transporters#connect => NettyTransporter#connect => return NettyClient
The NettyClient constructor wraps the handler:
    ChannelHandlers.wrap(handler, url)

    public class ChannelHandlers {
        private static ChannelHandlers INSTANCE = new ChannelHandlers();

        protected ChannelHandlers() {
        }

        public static ChannelHandler wrap(ChannelHandler handler, URL url) {
            return ChannelHandlers.getInstance().wrapInternal(handler, url);
        }

        protected static ChannelHandlers getInstance() {
            return INSTANCE;
        }

        static void setTestingChannelHandlers(ChannelHandlers instance) {
            INSTANCE = instance;
        }

        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
        }
    }
So the handler field of NettyClient ends up as MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler, exactly the same processing chain as on the server.
- The incoming message passes through MultiMessageHandler and HeartbeatHandler and reaches AllChannelHandler.
- AllChannelHandler wraps the message as new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message) and hands it to the thread pool.
- The thread pool runs the task, which goes through DecodeHandler and reaches HeaderExchangeHandler.
- HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#received. DefaultFuture maintains a map from request ID to DefaultFuture, so each Response can be matched to its Request by ID.
    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    t.cancel();
                }
                future.doReceived(response);
            } else {
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
    }
- The DefaultFuture is looked up by response.getId().
- Calling CompletableFuture#complete wakes up the user thread that is blocked in CompletableFuture#get, which then gets the result and returns. At this point the whole call is complete.
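The request/response matching described above boils down to a map of pending futures keyed by request ID. The following sketch shows the mechanism with JDK classes only. The class and method names are hypothetical, timeouts are left out, and a String error message stands in for the response status handling.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequestsSketch {
    // Monotonically increasing request IDs.
    private static final AtomicLong NEXT_ID = new AtomicLong();

    // Pending futures keyed by request ID, similar in spirit to DefaultFuture's FUTURES map.
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    // Registers a new pending request and returns its ID.
    static long newRequest() {
        long id = NEXT_ID.getAndIncrement();
        FUTURES.put(id, new CompletableFuture<>());
        return id;
    }

    static CompletableFuture<Object> futureOf(long id) {
        return FUTURES.get(id);
    }

    // Completes the future that matches the response ID; returns false if none is pending.
    static boolean received(long id, Object result, String error) {
        CompletableFuture<Object> future = FUTURES.remove(id);
        if (future == null) {
            return false;
        }
        if (error == null) {
            future.complete(result);
        } else {
            future.completeExceptionally(new RuntimeException(error));
        }
        return true;
    }

    public static void main(String[] args) {
        long first = newRequest();
        long second = newRequest();
        CompletableFuture<Object> firstFuture = futureOf(first);
        CompletableFuture<Object> secondFuture = futureOf(second);

        received(second, "reply-2", null);         // responses may arrive out of order
        System.out.println(secondFuture.join());
        System.out.println(firstFuture.isDone());  // the first request is still pending
    }
}
```

Because the ID travels with both the Request and the Response, many requests can share one connection without waiting for each other.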
Sync over async
Most calls in application code are synchronous, and we rarely call CompletableFuture#get ourselves, so how is that case handled? DubboInvoker#doInvoke returns an AsyncRpcResult:
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            // return=false means one-way, which avoids creating an unnecessary Future
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                // subscribe to responseFuture: when it completes, asyncRpcResult is completed too,
                // which lets the waiting user thread continue
                asyncRpcResult.subscribeTo(responseFuture);
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                    + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                    + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
AsyncToSyncInvoker
AsyncToSyncInvoker#invoke checks whether the call is synchronous or asynchronous. For a synchronous call it invokes AsyncRpcResult#get, which blocks the user thread and so produces synchronous behaviour.
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult = invoker.invoke(invocation);
        try {
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                // synchronous call: block the user thread on asyncResult#get
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: "
                    + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                        + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                        + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
        return asyncResult;
    }
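The same "always asynchronous underneath, block only for SYNC callers" idea can be shown with nothing but CompletableFuture. This is a sketch under stated assumptions: invokeAsync is a made-up stand-in for the remote call, the InvokeMode enum here is local to the example, and the 5 second wait is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncToSyncSketch {
    enum InvokeMode { SYNC, ASYNC }

    // Hypothetical stand-in for the asynchronous remote call.
    static CompletableFuture<String> invokeAsync(String arg) {
        return CompletableFuture.supplyAsync(() -> "echo:" + arg);
    }

    // Always returns the future; in SYNC mode it first blocks until the future is complete.
    static CompletableFuture<String> invoke(String arg, InvokeMode mode) {
        CompletableFuture<String> result = invokeAsync(arg);
        if (mode == InvokeMode.SYNC) {
            try {
                result.get(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the result", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("call failed or timed out", e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // SYNC: the future is already complete when invoke returns.
        System.out.println(invoke("a", InvokeMode.SYNC).isDone());
        // ASYNC: the caller decides when (or whether) to wait.
        System.out.println(invoke("b", InvokeMode.ASYNC).join());
    }
}
```

Both modes share one code path up to the final wait, which is why the invoker below this layer never needs to know whether its caller is synchronous.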
