dubbo学习-发起请求和处理请求源码剖析
1. 服务发起调用过程源码剖析
在服务引用中我们讲到,最后会生成代理类,代理类中使用了InvokerInvocationHandler拦截目标类的请求。所以在consumer端调用provider端服务的某个接口时,请求会被InvokerInvocationHandler拦截,并执行内部调用过程。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();/*** 省略无关代码*/return invoker.invoke(new RpcInvocation(method, args)).recreate();}
InvokerInvocationHandler的invoker方法内部逻辑会把请求转发给invoker实例,在转发请求之前,会根据请求的方法名和参数构建RpcInvocation实例。
在创建InvokerInvocationHandler的实例时,会把服务引用过程创建的Invoker实例传入InvokerInvocationHandler。服务引用返回的Invoker实例是MockClusterInvoker,内部持有默认的集群实现的FailoverClusterInvoker实例。所以会先执行MockClusterInvoker的invoker方法,方法内部内部逻辑主要是根据配置的mock参数,来判断是否需要执行mock调用,以及mock调用的策略是什么。默认是不需要进行mock调用,也就是说直接将请求转发给MockClusterInvoker实例内部持有的FailoverClusterInvoker实例的invoke方法处理。
1.1 Invoker实例的invoker方法执行
FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker对invoker方法做了实现。
public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed(); // ①// binding attachments into invocation.Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();if (contextAttachments != null && contextAttachments.size() != 0) {((RpcInvocation) invocation).addAttachments(contextAttachments);}List<Invoker<T>> invokers = list(invocation); // ②LoadBalance loadbalance = initLoadBalance(invokers, invocation); // ③RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);return doInvoke(invocation, invokers, loadbalance); // ④}
①: 首先检查consumer端是否被销毁,如果被销毁则会抛出异常,告知consumer已经销毁,不能再发起调用。
②: 从Directory实例中获取可调用的Invoker列表。
③: 初始化负载均衡实例,会通过配置loadbalance的值,使用spi机制,创建Loadbalance实例。默认为RandomLoadbalance。
④: 调用doInvoker方法,执行调用过程。
doInvoker方法是一个抽象方法,由子类实现,这里的实现类是FailoverClusterInvoker。
1.1.1 FailoverClusterInvoker的doInvoker方法实现
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyInvokers = invokers;// 检查可选invoker列表是否有invoker元素checkInvokers(copyInvokers, invocation); // ①String methodName = RpcUtils.getMethodName(invocation);// 获取配置的retries值,默认为2。也就是说如果调用发生错误,最多会重试2次,执行调用3次。int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; // ②if (len <= 0) {len = 1;}// retry loop.// 记录调用是否发生异常RpcException le = null;// 记录已经调用了的Invoker实例,如果失败了,在重试的时候,就不会选择List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());Set<String> providers = new HashSet<String>(len);for (int i = 0; i < len; i++) {// 发生了重试if (i > 0) { // ③// 判断consumer端是否被销毁checkWhetherDestroyed();// 重新获取invoker列表。因为有可能某个provider宕机了,invoker列表会被更新,这时候重新获取就不会获取到宕机的providercopyInvokers = list(invocation);// 重新检测invoker列表是否有invoker元素。checkInvokers(copyInvokers, invocation);}// 根据负载均衡策略选择一个invoker实例Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); // ④invoked.add(invoker);RpcContext.getContext().setInvokers((List) invoked);try {// 执行方法调用Result result = invoker.invoke(invocation); // ⑤if (le != null && logger.isWarnEnabled()) {// 省略异常代码}return result;} catch (RpcException e) {if (e.isBiz()) { // biz exception.throw e;}le = e;} catch (Throwable e) {le = new RpcException(e.getMessage(), e);} finally {providers.add(invoker.getUrl().getAddress());}}// 省略异常代码}
①: 首先会检查Invoker列表,判断列表是否存在Invoker元素。
②: 获取配置的retries的值,默认为2。也就是说如果调用发生错误,会进行重试,最多重试2次,进行3次调用。
④: 根据负载均衡策略选择一个Invoker实例。默认为RandomLoadbalance,也就是进行随机获取。
⑤: 使用选择的Invoker实例,执行方法调用。
在进行服务引用过程中, ProtocolFilterWrapper实例会创建调用链,对执行实际调用的Invoker进行封装,构建的调用链如图:
在进入到真正的AsyncToSyncInvoker的invoker方法之前, 会先经过前面三个Filter的invoker方法。
ConsumerContextFilter: 顾名思义,主要是对consumer的上下文context内容进行赋值。在完成调用之后,会清除掉上下文信息。
FutureFitler: 针对方法配置的oninvoker、onreturn、onthrow等的处理封装。
MonitorFilter: 把调用信息记录,统计,然后上报到Monitor中心。
经过上面调用链中的Filter处理之后,调用会进入到AsyncToSyncInvoker中的invoker方法。AsyncToSyncInvoker对象持有DubboInvoker的引用,它的invoker方法,会调用DubboInvoker的invoker方法获取结果。下图可以看出,此时AsyncToSyncInvoker的invoker属性的实例是DubboInvoker实例。
public Result invoke(Invocation invocation) throws RpcException {// 调用DubboInvoker的invoker方法获取结果Result asyncResult = invoker.invoke(invocation);try {// 如果调用方式时同步,那么就会一直等待结果的返回。因为等待的时间是Integer.MAX_VALUEif (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);}} catch (InterruptedException e) { // 异常的处理throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (ExecutionException e) {Throwable t = e.getCause();if (t instanceof TimeoutException) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} else if (t instanceof RemotingException) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}} catch (Throwable e) {throw new RpcException(e.getMessage(), e);}return asyncResult;}
AsyncToSyncInvoker的invoker方法逻辑,首先会将请求转发到DubboInvoker的invoker方法, 返回一个Result结果;然后会判断调用方法的请求方式,如果是同步请求,也就是需要等待结果返回,那么就会等待请求结果的返回;如果出现异常,那么在这一层会对异常信息进行封装。
1.1.2 DubboInvoker的invoker方法实现
DubboInvoker继承了AbstractInvoker,AbstractInvoker对invoker方法做了实现,所以DubboInvoker处理请求之前,会先经过父类的invoker方法处理。
public Result invoke(Invocation inv) throws RpcException {// 判断当前Invoker是否可用,如果不可用,打印warn日志。但是请求还是会继续if (destroyed.get()) {logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");}RpcInvocation invocation = (RpcInvocation) inv;invocation.setInvoker(this);if (CollectionUtils.isNotEmptyMap(attachment)) {invocation.addAttachmentsIfAbsent(attachment);}Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();if (CollectionUtils.isNotEmptyMap(contextAttachments)) {invocation.addAttachments(contextAttachments);}// 设置方法请求模式。Future、Sync、async等invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);try {// 调用doInvoker方法,抽象方法,有子类实现return doInvoke(invocation);} catch (InvocationTargetException e) { // biz exception// 省略异常处理代码}}
该方法主要是添加调用上下文信息到RpcInvocation的attachment量中,并且设置了方法调用的模式。然后调用doInvoker方法进行后的调用处理,doInvoker方法是一个抽象方法,由子类实现。下面继续看DubboInvoker的doInvoker方法。
protected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);// 设置path和version放入到attachment中inv.setAttachment(PATH_KEY, getUrl().getPath());inv.setAttachment(VERSION_KEY, version);// 获取客户端ExchangeClient currentClient; // ①if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);// 获取配置的timeout值int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);// 如果是onyWay的方法,请求发送之后,直接返回一个默认的结果if (isOneway) { // ②boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);return AsyncRpcResult.newDefaultAsyncResult(invocation);} else {// 非onyWay的方式,使用AsyncRpcResult包装返回结果AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); // ③CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); // ④// 订阅响应结果,当请求响应返回后,会将结果放入到asyncRpcResult中asyncRpcResult.subscribeTo(responseFuture); // ⑤FutureContext.getContext().setCompatibleFuture(responseFuture);return asyncRpcResult;}} catch (TimeoutException e) {// 省略异常处理代码}}
①: 获取负责和服务端通信的client,clients是一个ExchangeClient实例数组,在DubboInvoker进行refer时会根据配置创建共享客户端还是一个服务一个客户端。
②: 判断服务调用方式,如果是oneWay的方式,则通过ExchangeClient的send方法发送请求到服务端。oneWay的调用表示,一次调用,不需要返回,客户端线程请求发出即结束,立即释放线程资源。
③: 如果是非onyWay的调用方式,则通过AsyncRpcResult封装返回结果。
④: 调用客户端的request方法发送请求到服务端。
⑤: AsyncRpcResult返回结果订阅responseFuture的结果。在responseFuture获取到结果之后,会将结果放入到AsyncRpcResult实例中。
ExchangeClient的request方法经过一层一层的请求转发,请求最后会达到NettyChannel的send方法。
public void send(Object message, boolean sent) throws RemotingException {// 首先会检查channel是否关闭了super.send(message, sent);boolean success = true;int timeout = 0;try {ChannelFuture future = channel.writeAndFlush(message); // 使用NioSocketChannel将请求刷出缓存区if (sent) {// wait timeout mstimeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // 获取配置的timeout参数值success = future.await(timeout); // 等待指定时间,如果超时,则抛出异常}Throwable cause = future.cause();if (cause != null) {throw cause;}} catch (Throwable e) {throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);}if (!success) {throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()+ "in timeout(" + timeout + "ms) limit");}}
这个方法完成的工作就是将客户端的请求通过NioSocketChannel发送到服务端,获取到返回的ChannelFuture实例,然后会等待配置的timeout时间,如果在规定的时间内,服务端还未将结果返回,则会抛出异常。
1.2 服务发送调用过程总结
下面用一张图来总结下发起调用的过程。
2. 服务接收处理调用过程源码剖析
consumer端完成了调用请求的发送,provider端接收到consumer端的请求,开始进行请求处理的逻辑。在分析服务暴露的过程中,我们知道provider会开启NettyServer进行服务监听,在开启NettyServer服务监听的过程中,会绑定NettyServerHandler对来处理客户端的请求。也就是说客户端的请求是从NettyServerHandler开始进入到服务端的。
2.1 ChannelHandler间的请求处理转发
回顾服务暴露分析中,在开启NettyServer的过程里,会构建ChannelHandler链,请求的处理转发也会在ChannelHandler链中进行。
2.1.1 NettyServerHandler的channelActive方法
首先接收请求的是NettyServerHandler,入口方法为channelActive。
public void channelActive(ChannelHandlerContext ctx) throws Exception {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try {if (channel != null) {channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);}// 调用NettyServer的connected方法,将请求转发给NettyServerhandler.connected(channel);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}}
该方法接收到请求之后,会将请求转发给NettyServer处理。NettySever继承了AbstractServer,AbstractServer实现了connected方法。
2.1.2 AbstractServer的connected方法
public void connected(Channel ch) throws RemotingException {// 如果NettyServer已经进入到关闭程序,则会拒绝所有的新请求。if (this.isClosing() || this.isClosed()) { // ①logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");ch.close();return;}Collection<Channel> channels = getChannels();// 判断允许最大连接数是否小于当前channel个数if (accepts > 0 && channels.size() > accepts) { // ②logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);ch.close();return;}// 将请求转发给父类处理super.connected(ch); // ③}
①: 首先会判断服务端是否已经进入了关闭状态,如果是,则会拒绝所有的新请求。打印warn日志,然后关闭处理请求的channel。
②: 判断允许最大的连接数是否小于当前正在处理请求的channel个数。如果是,打印error日志,然后关闭处理请求的channle。
③: 将请求转发给父类处理。
AbstractServer继承自AbstractPeer,AbstractPeer的connected方法很简单,首先判断NettyServer是否已经关闭,如果已经关闭,则直接返回,反之,将请求转发给它持有的ChannelHandler实例处理。根据上面的ChannelHandler处理链,AbstractPeer将请求转发给MultiMessageHandler处理,MultiMessageHandler继承了AbstractChannelHandlerDelegate,且都没有重写connected方法,所有它的connected方法由父类实现。而AbstractChannelHandlerDelegate的connected方法没有做任何的逻辑处理,直接将请求转发给了下一个ChannelHandler处理。MultiMessageHandler的下一个ChannelHandler是HeartbeatHandler,HeartbeatHandler的connected方法记录了readTimestamp和writeTimestamp两个值,然后将请求转发给下一个ChannelHandler——AllChannelHandler。
2.1.3 AllChannelHandler的connected方法
public void connected(Channel channel) throws RemotingException {ExecutorService executor = getExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}
该方法使用线程池提交了一个ChannelEventRunnable任务,见名知意,ChannelEventRunnable肯定是实现了Runnable接口,所有具体处理请求的逻辑看ChannelEventRunnable的run方法。
2.1.4 ChannelEventRunnable的run方法
AllChannelHandler将请求转化为ChannelEventRunnable,然后放到了线程池中去执行,主要看它的run方法实现。
public void run() {if (state == ChannelState.RECEIVED) {try {handler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}
该方法通过判断ChannelState的值,不同的值会调用不同的方法进行处理。现在我们分析的是方法调用请求,所以主要看ChannelState.RECEIVED的值处理逻辑。当state值是ChannelState.RECEIVED时,会将请求转发到下一个ChannelHandler——DecodeHandler。
2.1.5 DecodeHandler的received方法
DecodeHandler主要是进行解码的操作,所以它接收到请求之后,就是把接收到的消息进行解码。
public void received(Channel channel, Object message) throws RemotingException {if (message instanceof Decodeable) {decode(message);}// 如果是请求消息,则解码它的数据部分if (message instanceof Request) {decode(((Request) message).getData());}// 如果是响应消息,则解码它的结果部分if (message instanceof Response) {decode(((Response) message).getResult());}handler.received(channel, message);}
DecodeHandler完成对请求消息的解码之后,会将请求转发到下一个ChannelHandler——HeaderExchangeHandler。
2.1.6 HeaderExchangeHandler的received方法
HeaderExchangeHandler的received方法,针对不同的场景做了不同的处理。比如请求事件,请求事件有可分为只读、需要返回值或者不需要返回值。同时也支持Telnet的调用方式。
public void received(Channel channel, Object message) throws RemotingException {channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // ①final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {// 请求事件if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {handlerEvent(channel, request); // ②} else {// 需要返回值的请求if (request.isTwoWay()) {handleRequest(exchangeChannel, request); // ③} else {// 不需要返回值的请求handler.received(exchangeChannel, request.getData()); // ④}}} else if (message instanceof Response) {// 处理响应handleResponse(channel, (Response) message); // ⑤} else if (message instanceof String) {// 判断是否支持Telnet调用if (isClientSide(channel)) { // ⑥Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {// 执行Telnet调用,并返回String echo = handler.telnet(channel, (String) message); // ⑦if (echo != null && echo.length() > 0) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}
①: 负责响应读取时间并更新时间戳。在dubbo心跳处理中会使用当前值并判断是否超过空闲时间。
②: 处理只读事件,用于dubbo优雅停机。当注册中心返注册元数据时,因为网络原因,客户端不能及时感应注册中心中心事件,服务端会发送readonly报文告知下线。handlerEvent方法的逻辑,会在处理事件的Channel中设置一个属性名为channel.readonly的值为true。
③: 处理双向通信的请求。需要等待获取请求结果。
④: 如果是单向通信的请求。不需要等待获取请求结果。
⑤: 处理响应对象,服务消费方会执行此处逻辑。
⑥: 判断是否支持telnet的调用,如果不支持,则打印error日志。
⑦: 支持Telnet调用逻辑。
第③和④步中,调用请求会被转发到DubboProtocol的内部requestHandler去处理,对应的方法是reply。这个在后面分析,先来看看第⑤步的响应处理。
static void handleResponse(Channel channel, Response response) throws RemotingException {if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}}
该方法通过调用DefaultFuture的received方法完成响应结果的设置,我们继续跟下去,会发现最后会通过调用doReceived方法完成结果的设置。
private void doReceived(Response res) {if (res == null) {throw new IllegalStateException("response cannot be null");}// 如果调用正常完成,设置调用结果if (res.getStatus() == Response.OK) {this.complete(res.getResult());} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {// 如果超时了,则会设置超时异常this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));} else {// 设置具体发生的异常信息this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));}}
2.1.6.1 handleRequest方法实现
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());if (req.isBroken()) { // 检测请求是否合法,不合法则返回状态码为BAD_REQUESTObject data = req.getData();String msg;if (data == null) {msg = null;} else if (data instanceof Throwable) {msg = StringUtils.toString((Throwable) data);} else {msg = data.toString();}res.setErrorMessage("Fail to decode request due to: " + msg);res.setStatus(Response.BAD_REQUEST);channel.send(res);return;}// find handler by message class.Object msg = req.getData();try {// 调用DubboProtocol的reply方法CompletionStage<Object> future = handler.reply(channel, msg);future.whenComplete((appResult, t) -> {try {if (t == null) { // 没有发生异常,设置结果res.setStatus(Response.OK);res.setResult(appResult);} else { // 设置异常结果res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}channel.send(res); // 将结果发送到channel中} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);} finally {// HeaderExchangeChannel.removeChannelIfDisconnected(channel);}});} catch (Throwable e) {// 发送异常结果res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));channel.send(res);}}
该方法首先会检测调用是否合法,不合法则返回状态码为BAD_REQUEST。然后将请求转发给下一个ChannelHandler——DubboProtocol的requestHandler。最后会把返回结果封装到Response对象,返回给消费者。
3. DubboProtocol处理调用请求
经过上了上述的ChannelHandler的处理,最后请求会进入到DubboProtocol内部进行处理,处理请求的Handler是requestHandler属性,requestHandler是ExchangeHandlerAdapter实例,ExchangeHandlerAdapter通过实现了ChannelHandler接口。主要处理请求的逻辑方法是reply方法。
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) {throw new RemotingException(channel, "Unsupported request: "+ (message == null ? null : (message.getClass().getName() + ": " + message))+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}Invocation inv = (Invocation) message;// 获取相关的invokerInvoker<?> invoker = getInvoker(channel, inv);// need to consider backward-compatibility if it's a callbackif (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {String methodsStr = invoker.getUrl().getParameters().get("methods");boolean hasMethod = false;if (methodsStr == null || !methodsStr.contains(",")) {hasMethod = inv.getMethodName().equals(methodsStr);} else {String[] methods = methodsStr.split(",");for (String method : methods) {if (inv.getMethodName().equals(method)) {hasMethod = true;break;}}}if (!hasMethod) {logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()+ " not found in callback service interface ,invoke will be ignored."+ " please update the api interface. url is:"+ invoker.getUrl()) + " ,invocation is :" + inv);return null;}}// 获取上下文对象,并设置对端地址RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());// 执行invoker调用链Result result = invoker.invoke(inv);return result.completionFuture().thenApply(Function.identity());}
该方法主要是获取指定服务对应的Invoker实例,并通过Invoker实例的invoker方式调用服务逻辑。
getInvoker方法与获取指定服务对应的Invoker实例,在服务暴露过程中,服务完成暴露后,Invoker实例被转化为Exporter实例,并放到exporterMap容器中,key是serviceKey,也就是由服务信息组成的字符串。格式为 ${serviceGroup}/${serviceName}:${serviceVersion}:${port}/。获取到的Exporter实例中持有目标Invoker实例,然后将调用请求交给Invoker实例处理。
3.1 Invoker的invoker方法
经过上面的步骤,拿到了处理请求的Invoker实例。此时的Invoker实例是ProtocolFilterWrapper$CallbackRegistionInvoker实例,该实例,是在服务暴露过程,构建Invoker调用链时,会加载Provider端相关的Filter类,从而形成的一个调用链。
在进入到具体的Invoker处理逻辑之前,会经过图中8个Filter的处理。
EchoFilter: 判断方法名是否是
$echo, 且方法只有一个参数,则会返回一个默认的结果给消费者端。ClassLoaderFilter: 当前线程设置上下文的ClassLoader。
GeneicFilter: 如果方法名为
$invoke或者$invokeAsync,则自行泛化调用的流程。ContextFilter: 设置RpcContext的上下文参数。
TraceFilter: 调用过程追踪。
TimeoutFilter: 设置timeout_filter_start_time属性值为当前时间,用于判断超时。
MonitorFilter: 上报调用记录到Monitor中心。
ExceptionFitler: 捕捉异常,对异常进行封装。
经过上述8个Filter的处理,请求被转发到了DelegateProviderMetaDataInvoker中,DelegateProviderMetaDataInvoker持有进行实际逻辑处理的Invoker实例,以及ServiceConfig实例。所以DelegateProviderMetaDataInvoker的invoker方法,会交由它持有的Invoker实例进行处理,该Invoker实例,是通过ProxyFactory的getInvoker方法生成的Invoker实例。
在服务暴露分析中,我们知道ProxyFactory扩展点有两种实现,基于jdk和javassit的。默认为javassit。基于javassist的JavassistProxyFactory的getInvoker方法实现如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {/*** JavassitProxyFactory作为ProxyFactory的默认自适应实现,getInvoker方法通过创建Wrapper子类,对proxy类进行包装,返回AbstractProxyInvoker对象。* 在wrapper子类中实现invokerMethod方法,方法体内会为每个proxy的方法做方法名和方法参数匹配校验,匹配成共则进行调用。相比JdkProxyFactory实现,省去了反射调用的开销。*/// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};}
首先会生成proxy的包装类Wrapper,AbstractProxyInvoker的doInvoker方法会交由wrapper类的invokerMethod方法完成调用。下面看看生成的wrapper的内容是怎样的,从而了解到是如何调用到实际的本地接口实现。
public class Wrapper1 extends Wrapper implements ClassGenerator.DC {public static String[] pns;public static Map pts;public static String[] mns;public static String[] dmns;public static Class[] mts0;@Overridepublic String[] getPropertyNames() {return pns;}@Overridepublic boolean hasProperty(String string) {return pts.containsKey(string);}public Class getPropertyType(String string) {return (Class)pts.get(string);}@Overridepublic String[] getMethodNames() {return mns;}@Overridepublic String[] getDeclaredMethodNames() {return dmns;}@Overridepublic void setPropertyValue(Object object, String string, Object object2) {try {DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());}@Overridepublic Object getPropertyValue(Object object, String string) {try {DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());}public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {DemoServiceImpl demoServiceImpl;try {demoServiceImpl = (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}try {if ("sayHello".equals(string) && arrclass.length == 1) {return demoServiceImpl.sayHello((String)arrobject[0]);}}catch (Throwable throwable) {throw new InvocationTargetException(throwable);}throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());}}
通过反编译,可以看出生成wrapper类如上所示。主要看它的invokerMethod方法,可以看出,该方法会将object类转换为DemoServiceImpl,也就是接口的实现类。然后会判断当前调用的方法,挨个匹配DemoServiceImple的方法实现,如果匹配上了,就执行DemoServerImpl的方法,也就是转为本地方法调用实现。至此,一个dubbo调用的处理就完成了。
3.2 provider端处理请求小结
下面用一张图来总结provider端处理请求的过程。
