dubbo学习-发起请求和处理请求源码剖析
1. 服务发起调用过程源码剖析
在服务引用中我们讲到,最后会生成代理类,代理类中使用了InvokerInvocationHandler拦截目标类的请求。所以在consumer端调用provider端服务的某个接口时,请求会被InvokerInvocationHandler拦截,并执行内部调用过程。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
/**
* 省略无关代码
*/
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
InvokerInvocationHandler的invoker方法内部逻辑会把请求转发给invoker实例,在转发请求之前,会根据请求的方法名和参数构建RpcInvocation实例。
在创建InvokerInvocationHandler的实例时,会把服务引用过程创建的Invoker实例传入InvokerInvocationHandler。服务引用返回的Invoker实例是MockClusterInvoker,内部持有默认的集群实现的FailoverClusterInvoker实例。所以会先执行MockClusterInvoker的invoker方法,方法内部内部逻辑主要是根据配置的mock参数,来判断是否需要执行mock调用,以及mock调用的策略是什么。默认是不需要进行mock调用,也就是说直接将请求转发给MockClusterInvoker实例内部持有的FailoverClusterInvoker实例的invoke方法处理。
1.1 Invoker实例的invoker方法执行
FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker对invoker方法做了实现。
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed(); // ①
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation); // ②
LoadBalance loadbalance = initLoadBalance(invokers, invocation); // ③
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance); // ④
}
①: 首先检查consumer端是否被销毁,如果被销毁则会抛出异常,告知consumer已经销毁,不能再发起调用。
②: 从Directory实例中获取可调用的Invoker列表。
③: 初始化负载均衡实例,会通过配置loadbalance的值,使用spi机制,创建Loadbalance实例。默认为RandomLoadbalance。
④: 调用doInvoker方法,执行调用过程。
doInvoker方法是一个抽象方法,由子类实现,这里的实现类是FailoverClusterInvoker。
1.1.1 FailoverClusterInvoker的doInvoker方法实现
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
// 检查可选invoker列表是否有invoker元素
checkInvokers(copyInvokers, invocation); // ①
String methodName = RpcUtils.getMethodName(invocation);
// 获取配置的retries值,默认为2。也就是说如果调用发生错误,最多会重试2次,执行调用3次。
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; // ②
if (len <= 0) {
len = 1;
}
// retry loop.
// 记录调用是否发生异常
RpcException le = null;
// 记录已经调用了的Invoker实例,如果失败了,在重试的时候,就不会选择
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
// 发生了重试
if (i > 0) { // ③
// 判断consumer端是否被销毁
checkWhetherDestroyed();
// 重新获取invoker列表。因为有可能某个provider宕机了,invoker列表会被更新,这时候重新获取就不会获取到宕机的provider
copyInvokers = list(invocation);
// 重新检测invoker列表是否有invoker元素。
checkInvokers(copyInvokers, invocation);
}
// 根据负载均衡策略选择一个invoker实例
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); // ④
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 执行方法调用
Result result = invoker.invoke(invocation); // ⑤
if (le != null && logger.isWarnEnabled()) {
// 省略异常代码
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 省略异常代码
}
①: 首先会检查Invoker列表,判断列表是否存在Invoker元素。
②: 获取配置的retries的值,默认为2。也就是说如果调用发生错误,会进行重试,最多重试2次,进行3次调用。
④: 根据负载均衡策略选择一个Invoker实例。默认为RandomLoadbalance,也就是进行随机获取。
⑤: 使用选择的Invoker实例,执行方法调用。
在进行服务引用过程中, ProtocolFilterWrapper实例会创建调用链,对执行实际调用的Invoker进行封装,构建的调用链如图:
在进入到真正的AsyncToSyncInvoker的invoker方法之前, 会先经过前面三个Filter的invoker方法。
ConsumerContextFilter: 顾名思义,主要是对consumer的上下文context内容进行赋值。在完成调用之后,会清除掉上下文信息。
FutureFitler: 针对方法配置的oninvoker、onreturn、onthrow等的处理封装。
MonitorFilter: 把调用信息记录,统计,然后上报到Monitor中心。
经过上面调用链中的Filter处理之后,调用会进入到AsyncToSyncInvoker中的invoker方法。AsyncToSyncInvoker对象持有DubboInvoker的引用,它的invoker方法,会调用DubboInvoker的invoker方法获取结果。下图可以看出,此时AsyncToSyncInvoker的invoker属性的实例是DubboInvoker实例。
public Result invoke(Invocation invocation) throws RpcException {
// 调用DubboInvoker的invoker方法获取结果
Result asyncResult = invoker.invoke(invocation);
try {
// 如果调用方式时同步,那么就会一直等待结果的返回。因为等待的时间是Integer.MAX_VALUE
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) { // 异常的处理
throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (t instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
return asyncResult;
}
AsyncToSyncInvoker的invoker方法逻辑,首先会将请求转发到DubboInvoker的invoker方法, 返回一个Result结果;然后会判断调用方法的请求方式,如果是同步请求,也就是需要等待结果返回,那么就会等待请求结果的返回;如果出现异常,那么在这一层会对异常信息进行封装。
1.1.2 DubboInvoker的invoker方法实现
DubboInvoker继承了AbstractInvoker,AbstractInvoker对invoker方法做了实现,所以DubboInvoker处理请求之前,会先经过父类的invoker方法处理。
public Result invoke(Invocation inv) throws RpcException {
// 判断当前Invoker是否可用,如果不可用,打印warn日志。但是请求还是会继续
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
invocation.addAttachments(contextAttachments);
}
// 设置方法请求模式。Future、Sync、async等
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 调用doInvoker方法,抽象方法,有子类实现
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
// 省略异常处理代码
}
}
该方法主要是添加调用上下文信息到RpcInvocation的attachment量中,并且设置了方法调用的模式。然后调用doInvoker方法进行后的调用处理,doInvoker方法是一个抽象方法,由子类实现。下面继续看DubboInvoker的doInvoker方法。
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 设置path和version放入到attachment中
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 获取客户端
ExchangeClient currentClient; // ①
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获取配置的timeout值
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
// 如果是onyWay的方法,请求发送之后,直接返回一个默认的结果
if (isOneway) { // ②
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 非onyWay的方式,使用AsyncRpcResult包装返回结果
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); // ③
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); // ④
// 订阅响应结果,当请求响应返回后,会将结果放入到asyncRpcResult中
asyncRpcResult.subscribeTo(responseFuture); // ⑤
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
} catch (TimeoutException e) {
// 省略异常处理代码
}
}
①: 获取负责和服务端通信的client,clients是一个ExchangeClient实例数组,在DubboInvoker进行refer时会根据配置创建共享客户端还是一个服务一个客户端。
②: 判断服务调用方式,如果是oneWay的方式,则通过ExchangeClient的send方法发送请求到服务端。oneWay的调用表示,一次调用,不需要返回,客户端线程请求发出即结束,立即释放线程资源。
③: 如果是非onyWay的调用方式,则通过AsyncRpcResult封装返回结果。
④: 调用客户端的request方法发送请求到服务端。
⑤: AsyncRpcResult返回结果订阅responseFuture的结果。在responseFuture获取到结果之后,会将结果放入到AsyncRpcResult实例中。
ExchangeClient的request方法经过一层一层的请求转发,请求最后会达到NettyChannel的send方法。
public void send(Object message, boolean sent) throws RemotingException {
// 首先会检查channel是否关闭了
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message); // 使用NioSocketChannel将请求刷出缓存区
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // 获取配置的timeout参数值
success = future.await(timeout); // 等待指定时间,如果超时,则抛出异常
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
这个方法完成的工作就是将客户端的请求通过NioSocketChannel发送到服务端,获取到返回的ChannelFuture实例,然后会等待配置的timeout时间,如果在规定的时间内,服务端还未将结果返回,则会抛出异常。
1.2 服务发送调用过程总结
下面用一张图来总结下发起调用的过程。
2. 服务接收处理调用过程源码剖析
consumer端完成了调用请求的发送,provider端接收到consumer端的请求,开始进行请求处理的逻辑。在分析服务暴露的过程中,我们知道provider会开启NettyServer进行服务监听,在开启NettyServer服务监听的过程中,会绑定NettyServerHandler对来处理客户端的请求。也就是说客户端的请求是从NettyServerHandler开始进入到服务端的。
2.1 ChannelHandler间的请求处理转发
回顾服务暴露分析中,在开启NettyServer的过程里,会构建ChannelHandler链,请求的处理转发也会在ChannelHandler链中进行。
2.1.1 NettyServerHandler的channelActive方法
首先接收请求的是NettyServerHandler,入口方法为channelActive。
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
if (channel != null) {
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
}
// 调用NettyServer的connected方法,将请求转发给NettyServer
handler.connected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
该方法接收到请求之后,会将请求转发给NettyServer处理。NettySever继承了AbstractServer,AbstractServer实现了connected方法。
2.1.2 AbstractServer的connected方法
public void connected(Channel ch) throws RemotingException {
// 如果NettyServer已经进入到关闭程序,则会拒绝所有的新请求。
if (this.isClosing() || this.isClosed()) { // ①
logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
return;
}
Collection<Channel> channels = getChannels();
// 判断允许最大连接数是否小于当前channel个数
if (accepts > 0 && channels.size() > accepts) { // ②
logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close();
return;
}
// 将请求转发给父类处理
super.connected(ch); // ③
}
①: 首先会判断服务端是否已经进入了关闭状态,如果是,则会拒绝所有的新请求。打印warn日志,然后关闭处理请求的channel。
②: 判断允许最大的连接数是否小于当前正在处理请求的channel个数。如果是,打印error日志,然后关闭处理请求的channle。
③: 将请求转发给父类处理。
AbstractServer继承自AbstractPeer,AbstractPeer的connected方法很简单,首先判断NettyServer是否已经关闭,如果已经关闭,则直接返回,反之,将请求转发给它持有的ChannelHandler实例处理。根据上面的ChannelHandler处理链,AbstractPeer将请求转发给MultiMessageHandler处理,MultiMessageHandler继承了AbstractChannelHandlerDelegate,且都没有重写connected方法,所有它的connected方法由父类实现。而AbstractChannelHandlerDelegate的connected方法没有做任何的逻辑处理,直接将请求转发给了下一个ChannelHandler处理。MultiMessageHandler的下一个ChannelHandler是HeartbeatHandler,HeartbeatHandler的connected方法记录了readTimestamp和writeTimestamp两个值,然后将请求转发给下一个ChannelHandler——AllChannelHandler。
2.1.3 AllChannelHandler的connected方法
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
该方法使用线程池提交了一个ChannelEventRunnable任务,见名知意,ChannelEventRunnable肯定是实现了Runnable接口,所有具体处理请求的逻辑看ChannelEventRunnable的run方法。
2.1.4 ChannelEventRunnable的run方法
AllChannelHandler将请求转化为ChannelEventRunnable,然后放到了线程池中去执行,主要看它的run方法实现。
public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
该方法通过判断ChannelState的值,不同的值会调用不同的方法进行处理。现在我们分析的是方法调用请求,所以主要看ChannelState.RECEIVED的值处理逻辑。当state值是ChannelState.RECEIVED时,会将请求转发到下一个ChannelHandler——DecodeHandler。
2.1.5 DecodeHandler的received方法
DecodeHandler主要是进行解码的操作,所以它接收到请求之后,就是把接收到的消息进行解码。
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
// 如果是请求消息,则解码它的数据部分
if (message instanceof Request) {
decode(((Request) message).getData());
}
// 如果是响应消息,则解码它的结果部分
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
DecodeHandler完成对请求消息的解码之后,会将请求转发到下一个ChannelHandler——HeaderExchangeHandler。
2.1.6 HeaderExchangeHandler的received方法
HeaderExchangeHandler的received方法,针对不同的场景做了不同的处理。比如请求事件,请求事件有可分为只读、需要返回值或者不需要返回值。同时也支持Telnet的调用方式。
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // ①
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// 请求事件
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request); // ②
} else {
// 需要返回值的请求
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request); // ③
} else {
// 不需要返回值的请求
handler.received(exchangeChannel, request.getData()); // ④
}
}
} else if (message instanceof Response) {
// 处理响应
handleResponse(channel, (Response) message); // ⑤
} else if (message instanceof String) {
// 判断是否支持Telnet调用
if (isClientSide(channel)) { // ⑥
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
// 执行Telnet调用,并返回
String echo = handler.telnet(channel, (String) message); // ⑦
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
①: 负责响应读取时间并更新时间戳。在dubbo心跳处理中会使用当前值并判断是否超过空闲时间。
②: 处理只读事件,用于dubbo优雅停机。当注册中心返注册元数据时,因为网络原因,客户端不能及时感应注册中心中心事件,服务端会发送readonly报文告知下线。handlerEvent方法的逻辑,会在处理事件的Channel中设置一个属性名为channel.readonly的值为true。
③: 处理双向通信的请求。需要等待获取请求结果。
④: 如果是单向通信的请求。不需要等待获取请求结果。
⑤: 处理响应对象,服务消费方会执行此处逻辑。
⑥: 判断是否支持telnet的调用,如果不支持,则打印error日志。
⑦: 支持Telnet调用逻辑。
第③和④步中,调用请求会被转发到DubboProtocol的内部requestHandler去处理,对应的方法是reply。这个在后面分析,先来看看第⑤步的响应处理。
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
该方法通过调用DefaultFuture的received方法完成响应结果的设置,我们继续跟下去,会发现最后会通过调用doReceived方法完成结果的设置。
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
// 如果调用正常完成,设置调用结果
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
// 如果超时了,则会设置超时异常
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
// 设置具体发生的异常信息
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
2.1.6.1 handleRequest方法实现
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) { // 检测请求是否合法,不合法则返回状态码为BAD_REQUEST
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// 调用DubboProtocol的reply方法
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) { // 没有发生异常,设置结果
res.setStatus(Response.OK);
res.setResult(appResult);
} else { // 设置异常结果
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res); // 将结果发送到channel中
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
// 发送异常结果
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
该方法首先会检测调用是否合法,不合法则返回状态码为BAD_REQUEST。然后将请求转发给下一个ChannelHandler——DubboProtocol的requestHandler。最后会把返回结果封装到Response对象,返回给消费者。
3. DubboProtocol处理调用请求
经过上了上述的ChannelHandler的处理,最后请求会进入到DubboProtocol内部进行处理,处理请求的Handler是requestHandler属性,requestHandler是ExchangeHandlerAdapter实例,ExchangeHandlerAdapter通过实现了ChannelHandler接口。主要处理请求的逻辑方法是reply方法。
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
// 获取相关的invoker
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
// 获取上下文对象,并设置对端地址
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 执行invoker调用链
Result result = invoker.invoke(inv);
return result.completionFuture().thenApply(Function.identity());
}
该方法主要是获取指定服务对应的Invoker实例,并通过Invoker实例的invoker方式调用服务逻辑。
getInvoker方法与获取指定服务对应的Invoker实例,在服务暴露过程中,服务完成暴露后,Invoker实例被转化为Exporter实例,并放到exporterMap容器中,key是serviceKey,也就是由服务信息组成的字符串。格式为 ${serviceGroup}/${serviceName}:${serviceVersion}:${port}/
。获取到的Exporter实例中持有目标Invoker实例,然后将调用请求交给Invoker实例处理。
3.1 Invoker的invoker方法
经过上面的步骤,拿到了处理请求的Invoker实例。此时的Invoker实例是ProtocolFilterWrapper$CallbackRegistionInvoker实例,该实例,是在服务暴露过程,构建Invoker调用链时,会加载Provider端相关的Filter类,从而形成的一个调用链。
在进入到具体的Invoker处理逻辑之前,会经过图中8个Filter的处理。
EchoFilter: 判断方法名是否是
$echo
, 且方法只有一个参数,则会返回一个默认的结果给消费者端。ClassLoaderFilter: 当前线程设置上下文的ClassLoader。
GeneicFilter: 如果方法名为
$invoke
或者$invokeAsync
,则自行泛化调用的流程。ContextFilter: 设置RpcContext的上下文参数。
TraceFilter: 调用过程追踪。
TimeoutFilter: 设置timeout_filter_start_time属性值为当前时间,用于判断超时。
MonitorFilter: 上报调用记录到Monitor中心。
ExceptionFitler: 捕捉异常,对异常进行封装。
经过上述8个Filter的处理,请求被转发到了DelegateProviderMetaDataInvoker中,DelegateProviderMetaDataInvoker持有进行实际逻辑处理的Invoker实例,以及ServiceConfig实例。所以DelegateProviderMetaDataInvoker的invoker方法,会交由它持有的Invoker实例进行处理,该Invoker实例,是通过ProxyFactory的getInvoker方法生成的Invoker实例。
在服务暴露分析中,我们知道ProxyFactory扩展点有两种实现,基于jdk和javassit的。默认为javassit。基于javassist的JavassistProxyFactory的getInvoker方法实现如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
/**
* JavassitProxyFactory作为ProxyFactory的默认自适应实现,getInvoker方法通过创建Wrapper子类,对proxy类进行包装,返回AbstractProxyInvoker对象。
* 在wrapper子类中实现invokerMethod方法,方法体内会为每个proxy的方法做方法名和方法参数匹配校验,匹配成共则进行调用。相比JdkProxyFactory实现,省去了反射调用的开销。
*/
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
首先会生成proxy的包装类Wrapper,AbstractProxyInvoker的doInvoker方法会交由wrapper类的invokerMethod方法完成调用。下面看看生成的wrapper的内容是怎样的,从而了解到是如何调用到实际的本地接口实现。
public class Wrapper1 extends Wrapper implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
@Override
public String[] getPropertyNames() {
return pns;
}
@Override
public boolean hasProperty(String string) {
return pts.containsKey(string);
}
public Class getPropertyType(String string) {
return (Class)pts.get(string);
}
@Override
public String[] getMethodNames() {
return mns;
}
@Override
public String[] getDeclaredMethodNames() {
return dmns;
}
@Override
public void setPropertyValue(Object object, String string, Object object2) {
try {
DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
}
@Override
public Object getPropertyValue(Object object, String string) {
try {
DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
}
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoServiceImpl demoServiceImpl;
try {
demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoServiceImpl.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
}
}
通过反编译,可以看出生成wrapper类如上所示。主要看它的invokerMethod方法,可以看出,该方法会将object类转换为DemoServiceImpl,也就是接口的实现类。然后会判断当前调用的方法,挨个匹配DemoServiceImple的方法实现,如果匹配上了,就执行DemoServerImpl的方法,也就是转为本地方法调用实现。至此,一个dubbo调用的处理就完成了。
3.2 provider端处理请求小结
下面用一张图来总结provider端处理请求的过程。