vlambda博客
学习文章列表

Dubbo源码之网络通信

介绍了Dubbo通信流程,跟着源码调试过来的,如果有问题还请各位大佬指出

服务暴露将做哪些事情?

  1. 注册ZK,监听动态配置节点
  2. 开启Server端
  3. 创建代理服务
  4. Exporter -> Invoker -> proxyService

服务引用将做哪些事情?

  1. 注册ZK,监听动态配置节点、providr节点、路由节点
  2. 开启Client端
  3. 创建代理服务
  4. proxyService -> Invoker

客户端请求

ConsumerProxyService -> Invoker【DubboInvoker】 -> Exchanger【HeaderExchangeClient】 -> Transporter【NettyClient】 -> 编码 -> SEND-TO-SERVER (创建了DefaultFuture,Request带唯一标识)

服务端响应

解码 -> Transporter【NettyServer】-> 系列Handlers -> 线程池 -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback

Exchanger

Exchangers

门面类,提供各种便捷方法,先通过SPI获取Exchanger,然后调用Exchanger的相关方法创建ExchangeServerExchangeClient

Exchanger

SPI接口,默认实现类HeaderExchanger,提供了两个快捷方法创建ExchangeServerExchangeClient

@SPI(HeaderExchanger.NAME)public interface Exchanger { @Adaptive({Constants.EXCHANGER_KEY}) ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY}) ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;}
public class HeaderExchanger implements Exchanger { public static final String NAME = "header";
@Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
@Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }}

ExchangeServer

Server端使用,默认实现类HeaderExchangeServer,内部调用Transporter开启Server服务

public interface ExchangeServer extends Server { Collection<ExchangeChannel> getExchangeChannels();
ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);}

ExchangeClient

Client端使用,默认实现类HeaderExchangeClient,核心request方法,内部调用Transporter发送请求

public interface ExchangeClient extends Client, ExchangeChannel {}

ExchangeChannel

默认实现类 HeaderExchangeChannel,作为HeaderExchangeClient的一个属性

Transporter

Transporters

门面类,提供各种便捷方法,先通过SPI获取Transporter,然后调用Transporter的相关方法创建ServerClient

Transporter

SPI接口,默认实现类NettyTransporter,提供了两个快捷方法创建ServerClient

@SPI("netty")public interface Transporter { @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException;}
public class NettyTransporter implements Transporter { public static final String NAME = "netty";
@Override public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }
@Override public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); }}

Server

Server端使用,默认实现类NettyServer,用于开启Server服务,核心方法doOpen

public class NettyServer extends AbstractServer implements Server {}

Client

Client端使用,默认实现类NettyClient,核心request方法用于发送请求,doOpen用于与服务端建立连接

public class NettyClient extends AbstractClient {}

服务端启动服务

DubboProtocol#export =>DubboProtocol#openServer => DubboProtocol#createServer =>Exchangers#bind => NettyServer#doOpen

最终,在NettyServer#doOpen中通过Netty开启了一个Server端

DubboProtocol#createServer => Exchangers#bind(url, requestHandler) => HeaderExchanger#bind(url, requestHandler) => return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))) // Transporters#bind 语句可以拆解为 Transporters#bind => NettyTransporter#bind(url, handler) => return new NettyServer(url, handler) => NettyServer#doOpen【NettyServer构造函数中调用了doOpen方法】

NettyServer中的hander属性,最终指向的是new DecodeHandler(new HeaderExchangeHandler(handler))。最终Server端返回HeaderExchangeServer,然后在NettyServer的构造函数中,对handle其实还做了一些封装

public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}
public class ChannelHandlers { private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {}
public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected static ChannelHandlers getInstance() { return INSTANCE; } static void setTestingChannelHandlers(ChannelHandlers instance) { INSTANCE = instance; } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }}

所以,最终NettyServer中的hander属性指向MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler

客户端连接服务

调用链太长了,而且隐藏的非常深,重点省略了一些,在应用启动时为Reference对象生成Invoker时创建的

RegistryProtocol#doRefer =>RegistryDirectory#subscribe =>RegistryDirectory#toInvokers => ProtocolFilterWrapper#refer =>AbstractProtocol#refer =>DubboProtocol#protocolBindingRefer =>DubboProtocol#getClients =>DubboProtocol#getSharedClient =>DubboProtocol#buildReferenceCountExchangeClientList =>DubboProtocol#buildReferenceCountExchangeClient =>DubboProtocol#initClient =>Exchangers#connect =>HeaderExchanger#connect =>Transporters#connect =>NettyTransporter#connect =>NettyClient#<init> =>NettyClient#doOpen

最终,在NettyClient#doOpen中通过Netty与Server建立连接

Exchangers#connect => HeaderExchanger#connect(url, handler) => return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true) // Transporters#connect 语句可以拆解为 Transporters#connect => NettyTransporter#connect(url, handler) => return new NettyClient(url, handler) => NettyClient#doOpen【NettyClient构造函数中调用了doOpen方法】

NettyClient中的hander属性,最终指向的是new DecodeHandler(new HeaderExchangeHandler(handler))。最终Client端返回HeaderExchangeClient,其中的client属性也对NettyClient做了包装处理

不过在DubboProtocol#buildReferenceCountExchangeClient方法中对HeaderExchangeClient包装了一层,最终Invoker中的Client类型是ReferenceCountExchangeClient

private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) { ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);}

ReferenceCountExchangeClientHeaderExchangeClient没什么区别,只不过包装了一层,然后还有一个比较重要的属性referenceCount,用于记录客户端的个数?

客户端发送请求

调用方代理类 ->InvokerInvocationHandler#invoke ->MockClusterInvoker#invoke ->AbstractClusterInvoker#invoke【获取LoadBalance】 -> FailoverClusterInvoker#doInvoke【处理重试次数】 ->ProtocolFilterWrapper#invoke【处理Filter链路】 ->AbstractInvoker#invoke【设置Attachments参数】 ->DubboInvoker#doInvoke【Exchange交接层】 ->ReferenceCountExchangeClient#request ->HeaderExchangeClient#request ->HeaderExchangeChannel#request【return CompletableFuture】 ->AbstractPeer#send ->AbstractClient#send ->NettyChannel#send ->Channel#writeAndFlush【发消息给服务端】

DubboInvoker#doInvoke开始与Exchange层交互,核心代码如下

protected Result doInvoke(final Invocation invocation) throws Throwable { ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // return = false,即oneWay ,可以减少不必要的Future对象创建 if (isOneway) { // send=true,即客户端发送之后再返回,否则直接返回 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); asyncRpcResult.subscribeTo(responseFuture); RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult)); return asyncRpcResult; }}
ReferenceCountExchangeClient#request => HeaderExchangeClient#request => HeaderExchangeChannel#request
// HeaderExchangeChannel.javapublic CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future;}

在这个方法中,有以下几个需要注意的点:

  1. Request构造函数内部,会为 Request生成一个递增唯一的ID,用于标识该请求
  2. channel#send调用过程中,涉及到 NettyChannel#getOrAddChannel方法的调用, NettyChannel中有一个 ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP缓存,用于维护 io.netty.channel.ChannelNettyChannel 的关系
  3. channel#send调用过程中,最终会调用到 NettyChannel#send方法,该方法真正的将消息发给Server端
  4. 返回的 DefaultFuture是一个 CompletableFuture
// NettyChannel.javapublic void send(Object message, boolean sent) throws RemotingException { boolean success = true; int timeout = 0; try { // 将消息发给Server ChannelFuture future = channel.writeAndFlush(message); if (sent) { // 如果配置了 send=true 参数,客户端需要等待消息发出之后再返回 timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.cause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); }}

从上面消息发送的流程中,好像没有看到对消息的编码工作,那是因为在Netty客户端初始化的时候,已经设置了编解码器

// NettyClient.java protected void doOpen() throws Throwable { final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); bootstrap = new Bootstrap(); bootstrap.group(nioEventLoopGroup) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .channel(NioSocketChannel.class); if (getConnectTimeout() < 3000) { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); } else { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout()); }
bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) .addLast("handler", nettyClientHandler); String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST); if(socksProxyHost != null) { int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort)); ch.pipeline().addFirst(socks5ProxyHandler); } } });}

先经过编码器,即InternalEncoder#encode方法,InternalEncoder实现了MessageToByteEncoder接口,该方法内部调用了Codec2的相关方法,而Codec2是一个SPI接口,默认实现DubboCodec

NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);protected static Codec2 getChannelCodec(URL url) { String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName)); }}

服务端响应请求

上面提到了NettyServer中的hander属性指向 MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler -> HeaderExchangeHandlerNettyServer开启Server端的代码如下

protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel();}
  1. 先经过解码器,即 InternalDecoder#decode方法, InternalDecoder实现了 ByteToMessageDecoder接口,该方法内部调用了 Codec2的相关方法,而 Codec2是一个SPI接口,默认实现 DubboCodec
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);protected static Codec2 getChannelCodec(URL url) { String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName)); }}
  1. MultiMessageHandler用于处理数组消息,如果是消息是 MultiMessage类型, MultiMessage实现了 Iterable数组,则遍历调用handle的received方法;否则直接调用下一个handle的received方法
  2. AllChannelHandler收到消息,将 channel handler message封装成state为 ChannelState.RECEIVED类型的 ChannelEventRunnable对象,然后交给线程池执行
  3. ChannelEventRunnable#run方法中判断state为 ChannelState.RECEIVED类型,直接执行下一个handler的received方法,即 DecodeHandler,这个过程是由线程池执行
  4. DecodeHandler#received方法中,如果消息是 Decodeable类型,对整个消息进行解码;如果消息是 Request类型,对 Request.getData()进行解码;如果消息是 Response类型,对 Response.getResult()进行解码
  5. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleRequest -> requestHandler#replyrequestHandlerDubboProtocol中的一个属性, ExchangeHandlerAdapter类型
  6. HeaderExchangeHandler#handleRequest中会创建一个 Response对象,它的ID属性值,就是 Request对象的ID值,这样请求和响应就关联起来了
  7. requestHandler#reply方法中,从 exporterMap缓存中获取对应的 DubboExporter对象,然后从 DubboExporter获取 Invoker,最后执行 Invoker#invoke方法,然后返回一个 CompletableFuture对象
  8. HeaderExchangeHandler#handleRequest方法中接收返回的 CompletableFuture对象,对它添加回调处理,在回调中将返回结果封装到 Response对象中,然后通过channel将 Response发出
// ChannelEventRunnable.javapublic void run() { if (state == ChannelState.RECEIVED) { try { // RECEIVED 类型,直接执行下一个handle的received方法,即 DecodeHandler handler.received(channel, message); } catch (Exception e) {} } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) {} break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) {} break; case SENT: try { handler.sent(channel, message); } catch (Exception e) {} break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) {} break; default: logger.warn("unknown state: " + state + ", message is " + message); } }
}
InternalDecoder#decode => NettyServerHandler#channelRead => AbstractPeer#received => MultiMessageHandler#received => HeartbeatHandler#received => AllChannelHandler#received  ------------------ 异步执行,放到线程池 ---------------------- => ChannelEventRunnable#run => DecodeHandler#received => DecodeHandler#decode => DecodeableRpcInvocation#decode => HeaderExchangeHandler#received => HeaderExchangeHandler#handleRequest => DubboProtocol.requestHandler#reply ------------------ 异步执行 -----------------------
----------------扩展点------------------- => ProtocolFilterWrapper.invoke => EchoFilter.invoke => ClassLoaderFilter.invoke => GenericFilter.invoke => TraceFilter.invoke => MonitorFilter.invoke => TimeoutFilter.invoke => ExceptionFilter.invoke => InvokerWrapper.invoke -----------------扩展点------------------- => AbstractProxyInvoker#invoke => JavassistProxyFactory.AbstractProxyInvoker#doInvoke => 代理类#invokeMethod => 真正的service方法

//把接收处理的结果,数据发回consumer future#whenComplete => channel.send(response) => HeaderExchangeChannel => NettyChannel.send => NioSocketChannel#writeAndFlush(message)

服务端发送结果

HeaderExchangeChannel#send =>NettyChannel#send => NioSocketChannel#writeAndFlush(message)

客户端响应结果

在客户端启动的时候,入参handler和服务端的handler是同一个

// DubboProtocol#initClientExchangers.connect(url, requestHandler);
// HeaderExchanger#connectpublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}
Transporters#connect => NettyTransporter#connect return NettyClient

NettyClient构造函数中,对handler做了包装

ChannelHandlers.wrap(handler, url)
public class ChannelHandlers { private static ChannelHandlers INSTANCE = new ChannelHandlers(); protected ChannelHandlers() { }
public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected static ChannelHandlers getInstance() { return INSTANCE; } static void setTestingChannelHandlers(ChannelHandlers instance) { INSTANCE = instance; } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }}

所以,最终NettyClient中的handler属性指向 MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler ,和服务端处理流程一样一样

  1. 接收消息,经过 MultiMessageHandlerHeartbeatHandler 处理,到达 AllDispatcher
  2. AllChannelHandler中将消息封装成 new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)类型,交由线程池执行
  3. 线程池执行任务,经过 DecodeHandler到达 HeaderExchangeHandler
  4. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#receivedDefaultFuture中维护了一个 请求ID和DefaultFuture的映射关系,Request和Response通过请求ID可以一一对应
public static void received(Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { Timeout t = future.timeoutCheckTask; if (!timeout) { t.cancel(); } future.doReceived(response); } else { } } finally { CHANNELS.remove(response.getId()); }}
private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); }}
  1. 通过 response.Id获取 DefaultFuture
  2. 执行 CompletableFuture#complete方法可以让 执行了 CompletableFuture#get的用户线程得到响应,获取结果返回。至此整个调用过程完成

同步转异步

可是我们在代码中很多时候都是同步调用,很少自己去调用CompletableFuture#get方法,这一部分逻辑又是怎么处理的。在DubboInvoker#doInvoke方法中,返回的是一个AsyncRpcResult

protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); // return = false,即oneWay ,可以减少不必要的Future对象创建 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else {c AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); // 订阅 responseFuture ,当 responseFuture 完成的之后,执行 asyncRpcResult 的complete方法, 这样用户线程就可以响应了 asyncRpcResult.subscribeTo(responseFuture);
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult)); return asyncRpcResult; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); }}

AsyncToSyncInvoker

AsyncToSyncInvoker#invoke方法中,会判断是同步调用还是异步调用,如果是同步调用,将调用AsyncRpcResult#get方法阻塞用户线程,以达到同步效果

public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { // 如果是同步调用,调用 asyncResult#get 阻塞用户线程 asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof TimeoutException) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else if (t instanceof RemotingException) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } catch (Throwable e) { throw new RpcException(e.getMessage(), e); } return asyncResult;}