vlambda博客
学习文章列表

dubbo学习-发起请求和处理请求源码剖析

1. 服务发起调用过程源码剖析

在服务引用中我们讲到,最后会生成代理类,代理类中使用了InvokerInvocationHandler拦截目标类的请求。所以在consumer端调用provider端服务的某个接口时,请求会被InvokerInvocationHandler拦截,并执行内部调用过程。

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); /** * 省略无关代码 */ return invoker.invoke(new RpcInvocation(method, args)).recreate();}

InvokerInvocationHandler的invoker方法内部逻辑会把请求转发给invoker实例,在转发请求之前,会根据请求的方法名和参数构建RpcInvocation实例。

在创建InvokerInvocationHandler的实例时,会把服务引用过程创建的Invoker实例传入InvokerInvocationHandler。服务引用返回的Invoker实例是MockClusterInvoker,内部持有默认的集群实现的FailoverClusterInvoker实例。所以会先执行MockClusterInvoker的invoker方法,方法内部内部逻辑主要是根据配置的mock参数,来判断是否需要执行mock调用,以及mock调用的策略是什么。默认是不需要进行mock调用,也就是说直接将请求转发给MockClusterInvoker实例内部持有的FailoverClusterInvoker实例的invoke方法处理。

1.1 Invoker实例的invoker方法执行

FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker对invoker方法做了实现。

public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); // ①
// binding attachments into invocation. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); }
List<Invoker<T>> invokers = list(invocation); // ② LoadBalance loadbalance = initLoadBalance(invokers, invocation); // ③ RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); // ④}


①: 首先检查consumer端是否被销毁,如果被销毁则会抛出异常,告知consumer已经销毁,不能再发起调用。

②: 从Directory实例中获取可调用的Invoker列表。

③: 初始化负载均衡实例,会通过配置loadbalance的值,使用spi机制,创建Loadbalance实例。默认为RandomLoadbalance。

④: 调用doInvoker方法,执行调用过程。

doInvoker方法是一个抽象方法,由子类实现,这里的实现类是FailoverClusterInvoker。

1.1.1 FailoverClusterInvoker的doInvoker方法实现

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; // 检查可选invoker列表是否有invoker元素 checkInvokers(copyInvokers, invocation); // ① String methodName = RpcUtils.getMethodName(invocation); // 获取配置的retries值,默认为2。也就是说如果调用发生错误,最多会重试2次,执行调用3次。 int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; // ② if (len <= 0) { len = 1; } // retry loop. // 记录调用是否发生异常 RpcException le = null; // 记录已经调用了的Invoker实例,如果失败了,在重试的时候,就不会选择 List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { // 发生了重试 if (i > 0) { // ③ // 判断consumer端是否被销毁 checkWhetherDestroyed(); // 重新获取invoker列表。因为有可能某个provider宕机了,invoker列表会被更新,这时候重新获取就不会获取到宕机的provider copyInvokers = list(invocation); // 重新检测invoker列表是否有invoker元素。 checkInvokers(copyInvokers, invocation); } // 根据负载均衡策略选择一个invoker实例 Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); // ④ invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 执行方法调用 Result result = invoker.invoke(invocation); // ⑤ if (le != null && logger.isWarnEnabled()) { // 省略异常代码 } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } // 省略异常代码}

①: 首先会检查Invoker列表,判断列表是否存在Invoker元素。

②: 获取配置的retries的值,默认为2。也就是说如果调用发生错误,会进行重试,最多重试2次,进行3次调用。

④: 根据负载均衡策略选择一个Invoker实例。默认为RandomLoadbalance,也就是进行随机获取。

⑤: 使用选择的Invoker实例,执行方法调用。

在进行服务引用过程中, ProtocolFilterWrapper实例会创建调用链,对执行实际调用的Invoker进行封装,构建的调用链如图:

在进入到真正的AsyncToSyncInvoker的invoker方法之前, 会先经过前面三个Filter的invoker方法。

  • ConsumerContextFilter: 顾名思义,主要是对consumer的上下文context内容进行赋值。在完成调用之后,会清除掉上下文信息。

  • FutureFitler: 针对方法配置的oninvoker、onreturn、onthrow等的处理封装。

  • MonitorFilter: 把调用信息记录,统计,然后上报到Monitor中心。

经过上面调用链中的Filter处理之后,调用会进入到AsyncToSyncInvoker中的invoker方法。AsyncToSyncInvoker对象持有DubboInvoker的引用,它的invoker方法,会调用DubboInvoker的invoker方法获取结果。下图可以看出,此时AsyncToSyncInvoker的invoker属性的实例是DubboInvoker实例。

dubbo学习-发起请求和处理请求源码剖析

public Result invoke(Invocation invocation) throws RpcException { // 调用DubboInvoker的invoker方法获取结果 Result asyncResult = invoker.invoke(invocation); try { // 如果调用方式时同步,那么就会一直等待结果的返回。因为等待的时间是Integer.MAX_VALUE if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { // 异常的处理 throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof TimeoutException) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else if (t instanceof RemotingException) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } catch (Throwable e) { throw new RpcException(e.getMessage(), e); } return asyncResult;}

AsyncToSyncInvoker的invoker方法逻辑,首先会将请求转发到DubboInvoker的invoker方法, 返回一个Result结果;然后会判断调用方法的请求方式,如果是同步请求,也就是需要等待结果回,那么就会等待请求结果的返回;如果出现异常,那么在这一层会对异常信息进行封装。

1.1.2 DubboInvoker的invoker方法实现

DubboInvoker继承了AbstractInvoker,AbstractInvoker对invoker方法做了实现,所以DubboInvoker处理请求之前,会先经过父类的invoker方法处理。

public Result invoke(Invocation inv) throws RpcException { // 判断当前Invoker是否可用,如果不可用,打印warn日志。但是请求还是会继续 if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { invocation.addAttachments(contextAttachments); } // 设置方法请求模式。Future、Sync、async等 invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try { // 调用doInvoker方法,抽象方法,有子类实现 return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception // 省略异常处理代码 }}

该方法主要是添加调用上下文信息到RpcInvocation的attachment量中,并且设置了方法调用的模式。然后调用doInvoker方法进行后的调用处理,doInvoker方法是一个抽象方法,由子类实现。下面继续看DubboInvoker的doInvoker方法。

protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); // 设置path和version放入到attachment中 inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version);
// 获取客户端 ExchangeClient currentClient; // ① if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 获取配置的timeout值 int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); // 如果是onyWay的方法,请求发送之后,直接返回一个默认的结果 if (isOneway) { // ② boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { // 非onyWay的方式,使用AsyncRpcResult包装返回结果 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); // ③ CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); // ④ // 订阅响应结果,当请求响应返回后,会将结果放入到asyncRpcResult中 asyncRpcResult.subscribeTo(responseFuture); // ⑤ FutureContext.getContext().setCompatibleFuture(responseFuture); return asyncRpcResult; } } catch (TimeoutException e) { // 省略异常处理代码 }}

①: 获取负责和服务端通信的client,clients是一个ExchangeClient实例数组,在DubboInvoker进行refer时会根据配置创建共享客户端还是一个服务一个客户端。

②: 判断服务调用方式,如果是oneWay的方式,则通过ExchangeClient的send方法发送请求到服务端。oneWay的调用表示,一次调用,不需要返回,客户端线程请求发出即结束,立即释放线程资源。

③: 如果是非onyWay的调用方式,则通过AsyncRpcResult封装返回结果。

④: 调用客户端的request方法发送请求到服务端。

⑤: AsyncRpcResult返回结果订阅responseFuture的结果。在responseFuture获取到结果之后,会将结果放入到AsyncRpcResult实例中。

ExchangeClient的request方法经过一层一层的请求转发,请求最后会达到NettyChannel的send方法。

public void send(Object message, boolean sent) throws RemotingException { // 首先会检查channel是否关闭了 super.send(message, sent);
boolean success = true; int timeout = 0; try { ChannelFuture future = channel.writeAndFlush(message); // 使用NioSocketChannel将请求刷出缓存区 if (sent) { // wait timeout ms timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // 获取配置的timeout参数值 success = future.await(timeout); // 等待指定时间,如果超时,则抛出异常 } Throwable cause = future.cause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); }}

这个方法完成的工作就是将客户端的请求通过NioSocketChannel发送到服务端,获取到返回的ChannelFuture实例,然后会等待配置的timeout时间,如果在规定的时间内,服务端还未将结果返回,则会抛出异常。

1.2 服务发送调用过程总结

下面用一张图来总结下发起调用的过程。

dubbo学习-发起请求和处理请求源码剖析

2. 服务接收处理调用过程源码剖析

consumer端完成了调用请求的发送,provider端接收到consumer端的请求,开始进行请求处理的逻辑。在分析服务暴露的过程中,我们知道provider会开启NettyServer进行服务监听,在开启NettyServer服务监听的过程中,会绑定NettyServerHandler对来处理客户端的请求。也就是说客户端的请求是从NettyServerHandler开始进入到服务端的。

2.1 ChannelHandler间的请求处理转发

回顾服务暴露分析中,在开启NettyServer的过程里,会构建ChannelHandler链,请求的处理转发也会在ChannelHandler链中进行。

dubbo学习-发起请求和处理请求源码剖析

2.1.1 NettyServerHandler的channelActive方法

首先接收请求的是NettyServerHandler,入口方法为channelActive。

public void channelActive(ChannelHandlerContext ctx) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { if (channel != null) { channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel); } // 调用NettyServer的connected方法,将请求转发给NettyServer handler.connected(channel); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); }}

该方法接收到请求之后,会将请求转发给NettyServer处理。NettySever继承了AbstractServer,AbstractServer实现了connected方法。

2.1.2 AbstractServer的connected方法

 public void connected(Channel ch) throws RemotingException { // 如果NettyServer已经进入到关闭程序,则会拒绝所有的新请求。 if (this.isClosing() || this.isClosed()) { // ① logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process."); ch.close(); return; }
Collection<Channel> channels = getChannels(); // 判断允许最大连接数是否小于当前channel个数 if (accepts > 0 && channels.size() > accepts) { // ② logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts); ch.close(); return; } // 将请求转发给父类处理 super.connected(ch); // ③}

①: 首先会判断服务端是否已经进入了关闭状态,如果是,则会拒绝所有的新请求。打印warn日志,然后关闭处理请求的channel。

②: 判断允许最大的连接数是否小于当前正在处理请求的channel个数。如果是,打印error日志,然后关闭处理请求的channle。

③: 将请求转发给父类处理。

AbstractServer继承自AbstractPeer,AbstractPeer的connected方法很简单,首先判断NettyServer是否已经关闭,如果已经关闭,则直接返回,反之,将请求转发给它持有的ChannelHandler实例处理。根据上面的ChannelHandler处理链,AbstractPeer将请求转发给MultiMessageHandler处理,MultiMessageHandler继承了AbstractChannelHandlerDelegate,且都没有重写connected方法,所有它的connected方法由父类实现。而AbstractChannelHandlerDelegate的connected方法没有做任何的逻辑处理,直接将请求转发给了下一个ChannelHandler处理。MultiMessageHandler的下一个ChannelHandler是HeartbeatHandler,HeartbeatHandler的connected方法记录了readTimestamp和writeTimestamp两个值,然后将请求转发给下一个ChannelHandler——AllChannelHandler。

2.1.3 AllChannelHandler的connected方法

public void connected(Channel channel) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); }}

该方法使用线程池提交了一个ChannelEventRunnable任务,见名知意,ChannelEventRunnable肯定是实现了Runnable接口,所有具体处理请求的逻辑看ChannelEventRunnable的run方法。

2.1.4 ChannelEventRunnable的run方法

AllChannelHandler将请求转化为ChannelEventRunnable,然后放到了线程池中执行,主要看它的run方法实现。

public void run() { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } }}

该方法通过判断ChannelState的值,不同的值会调用不同的方法进行处理。现在我们分析的是方法调用请求,所以主要看ChannelState.RECEIVED的值处理逻辑。当state值是ChannelState.RECEIVED时,会将请求转发到下一个ChannelHandler——DecodeHandler。

2.1.5 DecodeHandler的received方法

DecodeHandler主要是进行解码的操作,所以它接收到请求之后,就是把接收到的消息进行解码。

public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } // 如果是请求消息,则解码它的数据部分 if (message instanceof Request) { decode(((Request) message).getData()); } // 如果是响应消息,则解码它的结果部分 if (message instanceof Response) { decode(((Response) message).getResult()); }
handler.received(channel, message);}

DecodeHandler完成对请求消息的解码之后,会将请求转发到下一个ChannelHandler——HeaderExchangeHandler。

2.1.6 HeaderExchangeHandler的received方法

HeaderExchangeHandler的received方法,针对不同的场景做了不同的处理。比如请求事件,请求事件有可分为只读、需要返回值或者不需要返回值。同时也支持Telnet的调用方式。

public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // ① final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { // 请求事件 if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); // ②  } else { // 需要返回值的请求 if (request.isTwoWay()) { handleRequest(exchangeChannel, request); // ③ } else { // 不需要返回值的请求 handler.received(exchangeChannel, request.getData()); // ④ } } } else if (message instanceof Response) { // 处理响应 handleResponse(channel, (Response) message); // ⑤ } else if (message instanceof String) { // 判断是否支持Telnet调用 if (isClientSide(channel)) { // ⑥ Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { // 执行Telnet调用,并返回 String echo = handler.telnet(channel, (String) message); // ⑦ if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); }}

①: 负责响应读取时间并更新时间戳。在dubbo心跳处理中会使用当前值并判断是否超过空闲时间。

②: 处理只读事件,用于dubbo优雅停机。当注册中心返注册元数据时,因为网络原因,客户端不能及时感应注册中心中心事件,服务端会发送readonly报文告知下线。handlerEvent方法的逻辑,会在处理事件的Channel中设置一个属性名为channel.readonly的值为true。

③: 处理双向通信的请求。需要等待获取请求结果。

④: 如果是单向通信的请求。不需要等待获取请求结果。

⑤: 处理响应对象,服务消费方会执行此处逻辑。

⑥: 判断是否支持telnet的调用,如果不支持,则打印error日志。

⑦: 支持Telnet调用逻辑。

第③和④步中,调用请求会被转发到DubboProtocol的内部requestHandler去处理,对应的方法是reply。这个在后面分析,先来看看第⑤步的响应处理。

static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); }}

该方法通过调用DefaultFuture的received方法完成响应结果的设置,我们继续跟下去,会发现最后会通过调用doReceived方法完成结果的设置。

private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } // 如果调用正常完成,设置调用结果 if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 如果超时了,则会设置超时异常 this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { // 设置具体发生的异常信息  this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); }}
2.1.6.1 handleRequest方法实现
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { // 检测请求是否合法,不合法则返回状态码为BAD_REQUEST Object data = req.getData();
String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST);
channel.send(res); return; } // find handler by message class. Object msg = req.getData(); try { // 调用DubboProtocol的reply方法 CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null) { // 没有发生异常,设置结果 res.setStatus(Response.OK); res.setResult(appResult); } else { // 设置异常结果 res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); // 将结果发送到channel中 } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } finally { // HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }); } catch (Throwable e) { // 发送异常结果 res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); }}

该方法首先会检测调用是否合法,不合法则返回状态码为BAD_REQUEST。然后将请求转发给下一个ChannelHandler——DubboProtocol的requestHandler。最后会把返回结果封装到Response对象,返回给消费者。

3. DubboProtocol处理调用请求

经过上了上述的ChannelHandler的处理,最后请求会进入到DubboProtocol内部进行处理,处理请求的Handler是requestHandler属性,requestHandler是ExchangeHandlerAdapter实例,ExchangeHandlerAdapter通过实现了ChannelHandler接口。主要处理请求的逻辑方法是reply方法。

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }
Invocation inv = (Invocation) message; // 获取相关的invoker Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } // 获取上下文对象,并设置对端地址 RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // 执行invoker调用链 Result result = invoker.invoke(inv); return result.completionFuture().thenApply(Function.identity());}

该方法主要是获取指定服务对应的Invoker实例,并通过Invoker实例的invoker方式调用服务逻辑。

getInvoker方法与获取指定服务对应的Invoker实例,在服务暴露过程中,服务完成暴露后,Invoker实例被转化为Exporter实例,并放到exporterMap容器中,key是serviceKey,也就是由服务信息组成的字符串。格式为 ${serviceGroup}/${serviceName}:${serviceVersion}:${port}/。获取到的Exporter实例中持有目标Invoker实例,然后将调用请求交给Invoker实例处理。

3.1 Invoker的invoker方法

经过上面的步骤,拿到了处理请求的Invoker实例。此时的Invoker实例是ProtocolFilterWrapper$CallbackRegistionInvoker实例,该实例,是在服务暴露过程,构建Invoker调用链时,会加载Provider端相关的Filter类,从而形成的一个调用链。

在进入到具体的Invoker处理逻辑之前,会经过图中8个Filter的处理。

  • EchoFilter: 判断方法名是否是$echo, 且方法只有一个参数,则会返回一个默认的结果给消费者端。

  • ClassLoaderFilter: 当前线程设置上下文的ClassLoader。

  • GeneicFilter:  如果方法名为$invoke或者$invokeAsync,则自行泛化调用的流程。

  • ContextFilter: 设置RpcContext的上下文参数。

  • TraceFilter: 调用过程追踪。

  • TimeoutFilter: 设置timeout_filter_start_time属性值为当前时间,用于判断超时。

  • MonitorFilter: 上报调用记录到Monitor中心。

  • ExceptionFitler: 捕捉异常,对异常进行封装。

经过上述8个Filter的处理,请求被转发到了DelegateProviderMetaDataInvoker中,DelegateProviderMetaDataInvoker持有进行实际逻辑处理的Invoker实例,以及ServiceConfig实例。所以DelegateProviderMetaDataInvoker的invoker方法,会交由它持有的Invoker实例进行处理,该Invoker实例,是通过ProxyFactory的getInvoker方法生成的Invoker实例。

在服务暴露分析中,我们知道ProxyFactory扩展点有两种实现,基于jdk和javassit的。默认为javassit。基于javassist的JavassistProxyFactory的getInvoker方法实现如下:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { /** * JavassitProxyFactory作为ProxyFactory的默认自适应实现,getInvoker方法通过创建Wrapper子类,对proxy类进行包装,返回AbstractProxyInvoker对象。 * 在wrapper子类中实现invokerMethod方法,方法体内会为每个proxy的方法做方法名和方法参数匹配校验,匹配成共则进行调用。相比JdkProxyFactory实现,省去了反射调用的开销。 */
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } };}

首先会生成proxy的包装类Wrapper,AbstractProxyInvoker的doInvoker方法会交由wrapper类的invokerMethod方法完成调用。下面看看生成的wrapper的内容是怎样的,从而了解到是如何调用到实际的本地接口实现。

public class Wrapper1 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0;
@Override public String[] getPropertyNames() { return pns; }
@Override public boolean hasProperty(String string) { return pts.containsKey(string); }
public Class getPropertyType(String string) { return (Class)pts.get(string); }
@Override public String[] getMethodNames() { return mns; }
@Override public String[] getDeclaredMethodNames() { return dmns; }
@Override public void setPropertyValue(Object object, String string, Object object2) { try { DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString()); }
@Override public Object getPropertyValue(Object object, String string) { try { DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString()); }
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoServiceImpl demoServiceImpl; try { demoServiceImpl = (DemoServiceImpl)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { if ("sayHello".equals(string) && arrclass.length == 1) { return demoServiceImpl.sayHello((String)arrobject[0]); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString()); }}

通过反编译,可以看出生成wrapper类如上所示。主要看它的invokerMethod方法,可以看出,该方法会将object类转换为DemoServiceImpl,也就是接口的实现类。然后会判断当前调用的方法,挨个匹配DemoServiceImple的方法实现,如果匹配上了,就执行DemoServerImpl的方法,也就是转为本地方法调用实现。至此,一个dubbo调用的处理就完成了。

3.2 provider端处理请求小结

下面用一张图来总结provider端处理请求的过程。