
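An Overview of Dubbo's Protocol Layer

Protocol layer overview

  • A protocol is a set of rules that multiple cooperating parties follow in order to accomplish some function together.

  • Common communication protocols include the seven-layer OSI network model, HTTP, TCP, and UDP.

  • The OSI seven-layer model (application, presentation, session, transport, network, data link, physical) defines the top-level framework of network communication. It only specifies interfaces, so it is very abstract.

  • HTTP brings to mind an application-layer protocol, the URL format, HTTP headers, the HTTP body, GET/POST, and so on, so it is fairly concrete.

  • TCP brings to mind a transport-layer protocol, the three-way handshake and four-way teardown, reliable delivery, and so on, so it is also fairly concrete.

  • Whatever the protocol, it pins down concrete rules for some (or all) of the steps in network communication.

  • The Dubbo framework offers many communication protocols to choose from; each one is a different route to the same RPC functionality.

    • Think of RPC as the task of "getting from city A to city B".

    • The available protocols are like the different means of transport between A and B.

    • Picking one means of transport determines which ticket you buy, who the driver is, how comfortable the trip is, which route is taken, and so on.

    • Likewise, each Dubbo protocol specifies (or defaults) the serialization format, long-lived or short-lived connections, the underlying protocol (HTTP or TCP), synchronous or asynchronous calls, the message-handling thread model, and so on.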
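
To make the "one protocol name fixes a bundle of choices" idea concrete, here is a minimal sketch. ProtocolProfile and ProtocolProfiles are hypothetical names that do not exist in Dubbo; the two profiles simply restate what this article says about the dubbo and http protocols.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical value class, not part of Dubbo: the choices one protocol implies.
final class ProtocolProfile {
    final String serialization;
    final String connection;
    final String transport;
    final String callStyle;

    ProtocolProfile(String serialization, String connection, String transport, String callStyle) {
        this.serialization = serialization;
        this.connection = connection;
        this.transport = transport;
        this.callStyle = callStyle;
    }

    @Override
    public String toString() {
        return serialization + ", " + connection + ", " + transport + ", " + callStyle;
    }
}

final class ProtocolProfiles {
    private static final Map<String, ProtocolProfile> PROFILES = new LinkedHashMap<>();

    static {
        // Values restate the per-protocol summaries given in this article.
        PROFILES.put("dubbo", new ProtocolProfile("hessian2", "single long-lived connection", "TCP", "NIO async"));
        PROFILES.put("http", new ProtocolProfile("json", "short-lived connections", "HTTP", "sync"));
    }

    // Choosing a protocol name selects the whole bundle at once.
    static ProtocolProfile of(String protocolName) {
        ProtocolProfile profile = PROFILES.get(protocolName);
        if (profile == null) {
            throw new IllegalArgumentException("Unknown protocol: " + protocolName);
        }
        return profile;
    }

    public static void main(String[] args) {
        System.out.println("dubbo -> " + of("dubbo"));
        System.out.println("http  -> " + of("http"));
    }
}
```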

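Protocol in Dubbo

  • org.apache.dubbo.rpc.Protocol#export: the provider exports an Invoker object.

  • Core function 1: start an xxxServer and expose the service execution object (the Invoker) on the network.

  • org.apache.dubbo.rpc.Protocol#refer: the consumer turns a URL into an Invoker object.

  • Core function 2: create an xxxClient and wrap it as a callable object (an Invoker) for the consumer to use.

  • Simplified view of Protocol: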

    // The RPC layer uses DubboProtocol by default
    @SPI("dubbo")
    public interface Protocol {

        // Called on the provider side while a service is being exported.
        // invoker: the RPC interface implementation [impl] converted into an Invoker object.
        // The impl -> Invoker conversion is done by org.apache.dubbo.rpc.ProxyFactory#getInvoker.
        // Provider-side execution chain: [... -> Invoker#invoke -> Wrapper#invokeMethod -> Impl#xxx]
        // ------------------- end of invoker notes ------------------------
        // Example: exporting over the http protocol starts an HTTP server and exposes the implementation as an HTTP service.
        //     Example export URL: http://127.0.0.1:49178/org.apache.dubbo.rpc.protocol.http.HttpService?version=1.0.0
        // Example: exporting over the dubbo protocol starts a Netty server by default.
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

        // Called on the consumer side while a service is being referenced.
        // Builds an Invoker from the interface type and the remote service URL.
        // Consumer-side call chain: rpcInterface#xxx -> proxy#xxx -> Invoker#invoke -> nettyClient#send [default]
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

        void destroy();

        // ...
    }
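
The export/refer pair can be illustrated without any networking. The following is a minimal sketch, assuming an in-memory map stands in for "the network"; MiniInvoker, MiniExporter, and InMemoryProtocol are hypothetical stand-ins, not the real org.apache.dubbo.rpc types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for Invoker (hypothetical, not the Dubbo API).
interface MiniInvoker {
    Object invoke(String method, Object... args);
}

// Simplified stand-in for Exporter.
interface MiniExporter {
    void unexport();
}

final class InMemoryProtocol {
    // Plays the role of the network: service key -> exported invoker.
    private final Map<String, MiniInvoker> exported = new ConcurrentHashMap<>();

    // Provider side: make the invoker reachable under a service key.
    MiniExporter export(String serviceKey, MiniInvoker invoker) {
        exported.put(serviceKey, invoker);
        return () -> exported.remove(serviceKey);
    }

    // Consumer side: turn a service key (the "URL") into an invoker that forwards calls.
    MiniInvoker refer(String serviceKey) {
        return (method, args) -> {
            MiniInvoker target = exported.get(serviceKey);
            if (target == null) {
                throw new IllegalStateException("No provider for " + serviceKey);
            }
            return target.invoke(method, args);
        };
    }
}

final class ExportReferDemo {
    static String run() {
        InMemoryProtocol protocol = new InMemoryProtocol();
        MiniExporter exporter = protocol.export("demo.GreetingService",
                (method, args) -> method + ":" + args[0]);
        MiniInvoker consumerSide = protocol.refer("demo.GreetingService");
        String reply = (String) consumerSide.invoke("hello", "dubbo");
        exporter.unexport();
        return reply;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A real protocol replaces the map with a server on the export side and a client on the refer side; the shape of the two methods stays the same.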

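  • AbstractProtocol and its protocol subclasses all live in the dubbo-rpc module.

  • DubboProtocol, RedisProtocol, and MemcachedProtocol extend AbstractProtocol directly.

  • The other common protocols in the dubbo-rpc package extend AbstractProxyProtocol.

  • RegistryProtocol handles the service registration and subscription phase (not analyzed in this article).

  • ProtocolFilterWrapper and ProtocolListenerWrapper are protocol wrapper (decorator) classes.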
  
    public abstract class AbstractProtocol implements Protocol {

        // Map of exported services
        protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            // Wrap the (asynchronous) invoker so that callers can use it synchronously
            return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
        }

        // Subclasses implement service reference in this method
        protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;

        // ...
    }

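  • AbstractProtocol defines the exporterMap field that holds exported services.

  • It also introduces a new abstract method for service reference, protocolBindingRefer, which subclasses implement.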
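
The refer/protocolBindingRefer split is a template method: the parent fixes the wrapping step and each subclass supplies only the binding-specific invoker. Below is a minimal sketch of that shape using plain CompletableFuture. All type names are hypothetical, and blocking with join() is an assumption of this sketch rather than a description of how AsyncToSyncInvoker waits.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical asynchronous invoker: returns a future instead of a value.
interface AsyncInvoker {
    CompletableFuture<String> invoke(String request);
}

// Wrapper that turns an asynchronous invoker into a blocking one.
final class SyncInvoker {
    private final AsyncInvoker delegate;

    SyncInvoker(AsyncInvoker delegate) {
        this.delegate = delegate;
    }

    String invoke(String request) {
        // Block the caller until the asynchronous result is available.
        return delegate.invoke(request).join();
    }
}

abstract class TemplateProtocol {
    // Fixed skeleton: every protocol gets the same blocking wrapper.
    final SyncInvoker refer(String url) {
        return new SyncInvoker(protocolBindingRefer(url));
    }

    // Hook implemented by each concrete protocol.
    protected abstract AsyncInvoker protocolBindingRefer(String url);
}

final class EchoProtocol extends TemplateProtocol {
    @Override
    protected AsyncInvoker protocolBindingRefer(String url) {
        return request -> CompletableFuture.supplyAsync(() -> url + " <- " + request);
    }
}

final class AsyncToSyncDemo {
    static String run() {
        return new EchoProtocol().refer("echo://localhost").invoke("ping");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```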

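Protocol categories in the dubbo-rpc module

Proxy protocols

  • AbstractProxyProtocol

  • Core function: converting between an RPC interface proxy object and an Invoker object (after reading the class, the name turns out to be very apt).

  • The parameter and return types of export and refer as declared on Protocol differ from those of doExport and doRefer in AbstractProxyProtocol subclasses, so a conversion is needed.

  • export and refer in AbstractProxyProtocol call the subclass's doExport and doRefer and convert between the proxy (interface proxy object) and the Invoker.

  • Could an AbstractProxyProtocol subclass simply accept or return the same types as Protocol#export and Protocol#refer, or do the conversion itself? It cannot accept or return the same types directly (the subclass implementations below show why: the third-party components they build on work with interface implementations or proxies). A subclass could convert internally, but then every subclass would repeat the same conversion, so the shared logic is pulled up into the parent class.

  • DubboProtocol's export and refer paths involve one conversion fewer than AbstractProxyProtocol's.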
  
    public abstract class AbstractProxyProtocol extends AbstractProtocol {

        private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();

        // Proxy factory
        protected ProxyFactory proxyFactory;

        public AbstractProxyProtocol() {
        }

        public AbstractProxyProtocol(Class<?>... exceptions) {
            for (Class<?> exception : exceptions) {
                addRpcException(exception);
            }
        }

        public ProxyFactory getProxyFactory() {
            return proxyFactory;
        }

        public void setProxyFactory(ProxyFactory proxyFactory) {
            this.proxyFactory = proxyFactory;
        }

        // Subclasses implementing export need the RPC interface implementation impl (or a proxy of it),
        // whereas the Protocol method is: <T> Exporter<T> export(Invoker<T> invoker);
        // so this proxy protocol converts between the Invoker and the impl proxy.
        protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;

        // Subclasses implementing refer return an RPC interface proxy,
        // whereas the Protocol method is: <T> Invoker<T> refer(Class<T> type, URL url);
        // so this proxy protocol converts between the Invoker and the interface proxy.
        protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;

        @Override
        @SuppressWarnings("unchecked")
        public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
            final String uri = serviceKey(invoker.getUrl());
            Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
            if (exporter != null) {
                // When modifying the configuration through override, you need to re-expose the newly modified service.
                if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
                    return exporter;
                }
            }
            // During export, the Invoker is converted into an interface proxy object before being exposed.
            // (DubboProtocol does not have this conversion.)
            final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
            exporter = new AbstractExporter<T>(invoker) {
                @Override
                public void unexport() {
                    super.unexport();
                    exporterMap.remove(uri);
                    if (runnable != null) {
                        try {
                            runnable.run();
                        } catch (Throwable t) {
                            logger.warn(t.getMessage(), t);
                        }
                    }
                }
            };
            exporterMap.put(uri, exporter);
            return exporter;
        }

        @Override
        protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
            // During reference there is one extra conversion from the interface proxy to an Invoker.
            // (DubboProtocol does not have this conversion.)
            final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
            Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
                @Override
                protected Result doInvoke(Invocation invocation) throws Throwable {
                    try {
                        Result result = target.invoke(invocation);
                        // FIXME result is an AsyncRpcResult instance.
                        Throwable e = result.getException();
                        if (e != null) {
                            for (Class<?> rpcException : rpcExceptions) {
                                if (rpcException.isAssignableFrom(e.getClass())) {
                                    throw getRpcException(type, url, invocation, e);
                                }
                            }
                        }
                        return result;
                    } catch (RpcException e) {
                        if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                            e.setCode(getErrorCode(e.getCause()));
                        }
                        throw e;
                    } catch (Throwable e) {
                        throw getRpcException(type, url, invocation, e);
                    }
                }
            };
            invokers.add(invoker);
            return invoker;
        }

        // ...
    }
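
The two conversions that AbstractProxyProtocol relies on (Invoker to proxy on export, proxy to Invoker on refer) can be sketched with the JDK's dynamic proxies and reflection. This is only an illustration under simplified assumptions: CallInvoker, MiniProxyFactory, and Greeter are hypothetical names, and Dubbo's real ProxyFactory implementations are more involved.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Simplified stand-in for Invoker: method name, parameter types, arguments.
interface CallInvoker {
    Object invoke(String methodName, Class<?>[] paramTypes, Object[] args) throws Exception;
}

final class MiniProxyFactory {
    // Invoker -> proxy: calls on the interface are forwarded to the invoker.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> type, CallInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) ->
                invoker.invoke(method.getName(), method.getParameterTypes(), args);
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    // Implementation (or proxy) -> invoker: calls are dispatched by reflection.
    static <T> CallInvoker getInvoker(T target, Class<T> type) {
        return (methodName, paramTypes, args) -> {
            Method method = type.getMethod(methodName, paramTypes);
            return method.invoke(target, args);
        };
    }
}

// Hypothetical RPC interface used only for the demo.
interface Greeter {
    String greet(String name);
}

final class ProxyConversionDemo {
    static String run() {
        Greeter impl = name -> "hello " + name;
        // impl -> invoker (what a provider does before export)
        CallInvoker invoker = MiniProxyFactory.getInvoker(impl, Greeter.class);
        // invoker -> proxy (what a proxy protocol hands to its third-party server)
        Greeter proxy = MiniProxyFactory.getProxy(Greeter.class, invoker);
        return proxy.greet("dubbo");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because both directions are generic over the interface type, doing the conversion once in the parent class is enough for every subclass.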

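Categories by behavior

Single connection, long-lived connection, NIO asynchronous transport, TCP

  • DubboProtocol (not a proxy protocol)

  • Service export calls Exchangers.bind(url, requestHandler), passing in requestHandler.

  • Service reference calls Exchangers.connect(url, requestHandler), also passing in requestHandler.

  • requestHandler is the request handler; it also handles callback requests (the server calling back into the client).

  • None of the other protocol classes in Dubbo's rpc modules appear to handle callbacks.

  • Neither export nor refer needs to create an RPC interface proxy object, which is one structural difference from the AbstractProxyProtocol subclasses.

  • Exchangers selects the underlying Java NIO framework internally: netty, mina, or grizzly.

  • Export uses a Netty server by default, while reference could use, say, a Mina client; either side can be configured to netty, mina, or grizzly.

  • hessian2 serialization is used by default, with Dubbo's own packet structure.

  • By default the consumer creates a single long-lived connection.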
  
    public class DubboProtocol extends AbstractProtocol {

        // Request handler.
        // Callback requests are handled here as well.
        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

            @Override
            public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // Check whether this is a callback invocation.
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    // ... (callback method check elided)
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                Result result = invoker.invoke(inv);
                return result.thenApply(Function.identity());
            }
            // ...
        };

        // Exports the Invoker directly.
        // No interface proxy object is generated (unlike AbstractProxyProtocol).
        // openServer(url) registers requestHandler to process all requests.
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();

            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);

            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
                }
            }

            // Start the server
            openServer(url);
            optimizeSerialization(url);

            return exporter;
        }

        private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client can export a service which's only for server to invoke
            boolean isServer = url.getParameter(IS_SERVER_KEY, true);
            if (isServer) {
                // serverMap is a field of AbstractProtocol.
                // It acts as a cache of servers
                // and keeps exporting on the same address idempotent.
                ProtocolServer server = serverMap.get(key);
                if (server == null) {
                    synchronized (this) {
                        server = serverMap.get(key);
                        if (server == null) {
                            serverMap.put(key, createServer(url));
                        }
                    }
                } else {
                    // server supports reset, use together with override
                    server.reset(url);
                }
            }
        }

        private ProtocolServer createServer(URL url) {
            url = URLBuilder.from(url)
                    // send readonly event when server closes, it's enabled by default
                    .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                    // enable heartbeat by default
                    .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                    .addParameter(CODEC_KEY, DubboCodec.NAME)
                    .build();
            String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
            }

            ExchangeServer server;
            try {
                // requestHandler is registered as the handler for incoming request messages
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }

            str = url.getParameter(CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }

            return new DubboProtocolServer(server);
        }

        // Service reference method inherited from AbstractProtocol
        @Override
        public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);

            // create rpc invoker.
            // getClients(url) creates the clients
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);

            return invoker;
        }

        private ExchangeClient[] getClients(URL url) {
            boolean useShareConnect = false;
            int connections = url.getParameter(CONNECTIONS_KEY, 0);
            List<ReferenceCountExchangeClient> shareClients = null;
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                useShareConnect = true;

                String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
                connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                        DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
                // Normally a referenced service starts only one client (one long-lived TCP connection)
                shareClients = getSharedClient(url, connections);
            }

            // The connections parameter controls how many clients (long-lived TCP connections) are started
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (useShareConnect) {
                    clients[i] = shareClients.get(i);
                } else {
                    clients[i] = initClient(url);
                }
            }

            return clients;
        }

        private ExchangeClient initClient(URL url) {

            // client type setting.
            String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

            url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default
            url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

            // BIO is not allowed since it has severe performance issue.
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }

            ExchangeClient client;
            try {
                // connection should be lazy
                if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
                } else {
                    // Creating a client also requires requestHandler.
                    // Here requestHandler processes callback requests (provider calling back into the consumer).
                    client = Exchangers.connect(url, requestHandler);
                }
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }

            return client;
        }

        // ...
    }
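
The server cache in openServer is the part that makes repeated exports on one address idempotent. The sketch below reproduces only that shape (check, lock, re-check, create, otherwise reset). FakeServer and ServerCache are hypothetical stand-ins, and the addresses are example values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ProtocolServer: remembers its address and counts resets.
final class FakeServer {
    final String address;
    final AtomicInteger resets = new AtomicInteger();

    FakeServer(String address) {
        this.address = address;
    }

    void reset() {
        resets.incrementAndGet();
    }
}

final class ServerCache {
    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    // One server per "host:port"; a second export on the same address only resets it.
    FakeServer openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            synchronized (this) {
                // Re-check under the lock so two racing exporters create only one server.
                server = serverMap.get(address);
                if (server == null) {
                    server = new FakeServer(address);
                    created.incrementAndGet();
                    serverMap.put(address, server);
                }
            }
        } else {
            server.reset();
        }
        return server;
    }
}

final class ServerCacheDemo {
    static int[] run() {
        ServerCache cache = new ServerCache();
        cache.openServer("127.0.0.1:20880"); // first service: creates the server
        cache.openServer("127.0.0.1:20880"); // second service, same address: reuse + reset
        cache.openServer("127.0.0.1:20881"); // different address: creates another server
        FakeServer first = cache.openServer("127.0.0.1:20880"); // reuse + reset again
        return new int[]{cache.created.get(), first.resets.get()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("created=" + r[0] + ", resets=" + r[1]);
    }
}
```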


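Multiple connections, short-lived connections, synchronous transport, HTTP | TCP

  • HttpProtocol, HessianProtocol, RestProtocol, RmiProtocol, WebServiceProtocol, XmlRpcProtocol (proxy protocol subclasses)

  • Each protocol class creates and starts a server, creates a client, and specifies a request handler.

  • Both server and client are built from open-source components.

  • Each proxy subclass needs the interface implementation (or an interface proxy implementation) to create its server or client objects.

  • The export and refer methods on the Protocol interface take or return Invoker objects.

  • AbstractProxyProtocol's export and refer convert between interface proxy objects and Invokers.

  • General-purpose serialization formats are used (JSON or Hessian).

  • RmiProtocol runs over TCP; the other protocols run over HTTP.

  • Dubbo uses Jetty as its default HTTP server.

  • No callback handling.

  • Source example: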
  
    public class HttpProtocol extends AbstractProxyProtocol {
        public static final String ACCESS_CONTROL_ALLOW_ORIGIN_HEADER = "Access-Control-Allow-Origin";
        public static final String ACCESS_CONTROL_ALLOW_METHODS_HEADER = "Access-Control-Allow-Methods";
        public static final String ACCESS_CONTROL_ALLOW_HEADERS_HEADER = "Access-Control-Allow-Headers";

        private final Map<String, JsonRpcServer> skeletonMap = new ConcurrentHashMap<>();

        private HttpBinder httpBinder;

        public HttpProtocol() {
            super(HttpException.class, JsonRpcClientException.class);
        }

        public void setHttpBinder(HttpBinder httpBinder) {
            this.httpBinder = httpBinder;
        }

        @Override
        public int getDefaultPort() {
            return 80;
        }

        // Message handler
        private class InternalHandler implements HttpHandler {

            private boolean cors;

            public InternalHandler(boolean cors) {
                this.cors = cors;
            }

            @Override
            public void handle(HttpServletRequest request, HttpServletResponse response)
                    throws ServletException {
                String uri = request.getRequestURI();
                JsonRpcServer skeleton = skeletonMap.get(uri);
                if (cors) {
                    response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*");
                    response.setHeader(ACCESS_CONTROL_ALLOW_METHODS_HEADER, "POST");
                    response.setHeader(ACCESS_CONTROL_ALLOW_HEADERS_HEADER, "*");
                }
                if (request.getMethod().equalsIgnoreCase("OPTIONS")) {
                    response.setStatus(200);
                } else if (request.getMethod().equalsIgnoreCase("POST")) {

                    RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
                    try {
                        // skeleton: com.googlecode.jsonrpc4j.JsonRpcServer
                        // Uses JSON serialization
                        skeleton.handle(request.getInputStream(), response.getOutputStream());
                    } catch (Throwable e) {
                        throw new ServletException(e);
                    }
                } else {
                    response.setStatus(500);
                }
            }

        }

        @Override
        protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
            String addr = getAddr(url);
            ProtocolServer protocolServer = serverMap.get(addr);
            if (protocolServer == null) {
                // Register the message handler
                RemotingServer remotingServer = httpBinder.bind(url, new InternalHandler(url.getParameter("cors", false)));
                serverMap.put(addr, new ProxyProtocolServer(remotingServer));
            }
            final String path = url.getAbsolutePath();
            final String genericPath = path + "/" + GENERIC_KEY;
            // Create the JSON-RPC server.
            // com.googlecode.jsonrpc4j.JsonRpcServer needs the RPC interface implementation (or a proxy of it).
            JsonRpcServer skeleton = new JsonRpcServer(impl, type);
            JsonRpcServer genericServer = new JsonRpcServer(impl, GenericService.class);
            skeletonMap.put(path, skeleton);
            skeletonMap.put(genericPath, genericServer);
            return () -> {
                skeletonMap.remove(path);
                skeletonMap.remove(genericPath);
            };
        }

        @SuppressWarnings("unchecked")
        @Override
        protected <T> T doRefer(final Class<T> serviceType, URL url) throws RpcException {
            final String generic = url.getParameter(GENERIC_KEY);
            final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
            /********* begin: creating the proxy object (rpc client) with com.googlecode.jsonrpc4j.spring.JsonProxyFactoryBean *********/
            JsonProxyFactoryBean jsonProxyFactoryBean = new JsonProxyFactoryBean();
            JsonRpcProxyFactoryBean jsonRpcProxyFactoryBean = new JsonRpcProxyFactoryBean(jsonProxyFactoryBean);
            jsonRpcProxyFactoryBean.setRemoteInvocationFactory((methodInvocation) -> {
                RemoteInvocation invocation = new JsonRemoteInvocation(methodInvocation);
                if (isGeneric) {
                    invocation.addAttribute(GENERIC_KEY, generic);
                }
                return invocation;
            });
            String key = url.setProtocol("http").toIdentityString();
            if (isGeneric) {
                key = key + "/" + GENERIC_KEY;
            }

            jsonRpcProxyFactoryBean.setServiceUrl(key);
            // Set the service interface
            jsonRpcProxyFactoryBean.setServiceInterface(serviceType);

            jsonProxyFactoryBean.afterPropertiesSet();
            // Create and return the client proxy, which implements the RPC interface.
            // Protocol#refer returns an Invoker,
            // so AbstractProxyProtocol converts this proxy into an Invoker.
            // (DubboProtocol's reference path does not create an interface proxy.)
            return (T) jsonProxyFactoryBean.getObject();
            /********* end: creating the proxy object (rpc client) with com.googlecode.jsonrpc4j.spring.JsonProxyFactoryBean *********/
        }

        // ...
    }
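
Stripped of servlet and JSON-RPC details, the export side of HttpProtocol is a path-to-skeleton map plus a Runnable that undoes the registration. Here is a minimal sketch of that pattern; PathDispatcher is a hypothetical class, plain strings stand in for HTTP requests and status codes, and the "404" guard for a missing skeleton is an addition of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical miniature of the skeletonMap pattern; no HTTP server is involved.
final class PathDispatcher {
    private final Map<String, Function<String, String>> skeletonMap = new ConcurrentHashMap<>();

    // Like doExport: register a handler under its path and return the undo action.
    Runnable doExport(String path, Function<String, String> skeleton) {
        skeletonMap.put(path, skeleton);
        return () -> skeletonMap.remove(path);
    }

    // Like the internal handler: OPTIONS is acknowledged, POST is dispatched by URI.
    String handle(String method, String uri, String body) {
        if ("OPTIONS".equalsIgnoreCase(method)) {
            return "200";
        }
        if (!"POST".equalsIgnoreCase(method)) {
            return "500";
        }
        Function<String, String> skeleton = skeletonMap.get(uri);
        if (skeleton == null) {
            return "404"; // guard added for this sketch only
        }
        return skeleton.apply(body);
    }
}

final class PathDispatcherDemo {
    static String[] run() {
        PathDispatcher dispatcher = new PathDispatcher();
        Runnable unexport = dispatcher.doExport("/demo.EchoService", body -> "echo:" + body);
        String served = dispatcher.handle("POST", "/demo.EchoService", "hi");
        String wrongVerb = dispatcher.handle("GET", "/demo.EchoService", "hi");
        unexport.run();
        String afterUnexport = dispatcher.handle("POST", "/demo.EchoService", "hi");
        return new String[]{served, wrongVerb, afterUnexport};
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", run()));
    }
}
```

Returning the undo action from doExport is what lets the parent class's exporter clean up without knowing anything about the subclass's internal map.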

  • HessianProtocol source:
  
    public class HessianProtocol extends AbstractProxyProtocol {

        private final Map<String, HessianSkeleton> skeletonMap = new ConcurrentHashMap<String, HessianSkeleton>();

        private HttpBinder httpBinder;

        public HessianProtocol() {
            super(HessianException.class);
        }

        public void setHttpBinder(HttpBinder httpBinder) {
            this.httpBinder = httpBinder;
        }

        @Override
        public int getDefaultPort() {
            return 80;
        }

        // Message handler
        private class HessianHandler implements HttpHandler {

            @Override
            public void handle(HttpServletRequest request, HttpServletResponse response)
                    throws IOException, ServletException {
                String uri = request.getRequestURI();
                // com.caucho.hessian.server.HessianSkeleton
                HessianSkeleton skeleton = skeletonMap.get(uri);
                if (!"POST".equalsIgnoreCase(request.getMethod())) {
                    response.setStatus(500);
                } else {
                    RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());

                    Enumeration<String> enumeration = request.getHeaderNames();
                    while (enumeration.hasMoreElements()) {
                        String key = enumeration.nextElement();
                        if (key.startsWith(DEFAULT_EXCHANGER)) {
                            RpcContext.getContext().setAttachment(key.substring(DEFAULT_EXCHANGER.length()),
                                    request.getHeader(key));
                        }
                    }

                    try {
                        skeleton.invoke(request.getInputStream(), response.getOutputStream());
                    } catch (Throwable e) {
                        throw new ServletException(e);
                    }
                }
            }

        }

        @Override
        protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
            String addr = getAddr(url);
            ProtocolServer protocolServer = serverMap.get(addr);
            if (protocolServer == null) {
                // Register the message handler
                // and create the server
                RemotingServer remotingServer = httpBinder.bind(url, new HessianHandler());
                serverMap.put(addr, new ProxyProtocolServer(remotingServer));
            }
            final String path = url.getAbsolutePath();
            // Core provider-side class of the Hessian framework: com.caucho.hessian.server.HessianSkeleton.
            // Creating a HessianSkeleton requires the RPC interface implementation (or a proxy of it).
            final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
            skeletonMap.put(path, skeleton);

            final String genericPath = path + "/" + GENERIC_KEY;
            skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));

            return new Runnable() {
                @Override
                public void run() {
                    skeletonMap.remove(path);
                    skeletonMap.remove(genericPath);
                }
            };
        }

        @Override
        @SuppressWarnings("unchecked")
        protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
            String generic = url.getParameter(GENERIC_KEY);
            boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
            if (isGeneric) {
                RpcContext.getContext().setAttachment(GENERIC_KEY, generic);
                url = url.setPath(url.getPath() + "/" + GENERIC_KEY);
            }
            /********* begin: creating the proxy object (rpc client) with com.caucho.hessian.client.HessianProxyFactory *********/
            HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
            boolean isHessian2Request = url.getParameter(HESSIAN2_REQUEST_KEY, DEFAULT_HESSIAN2_REQUEST);
            hessianProxyFactory.setHessian2Request(isHessian2Request);
            boolean isOverloadEnabled = url.getParameter(HESSIAN_OVERLOAD_METHOD_KEY, DEFAULT_HESSIAN_OVERLOAD_METHOD);
            hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
            String client = url.getParameter(CLIENT_KEY, DEFAULT_HTTP_CLIENT);
            if ("httpclient".equals(client)) {
                HessianConnectionFactory factory = new HttpClientConnectionFactory();
                factory.setHessianProxyFactory(hessianProxyFactory);
                hessianProxyFactory.setConnectionFactory(factory);
            } else if (client != null && client.length() > 0 && !DEFAULT_HTTP_CLIENT.equals(client)) {
                throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
            } else {
                HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
                factory.setHessianProxyFactory(hessianProxyFactory);
                hessianProxyFactory.setConnectionFactory(factory);
            }
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            hessianProxyFactory.setConnectTimeout(timeout);
            hessianProxyFactory.setReadTimeout(timeout);
            // Create and return the client proxy, which implements the RPC interface.
            // Protocol#refer returns an Invoker,
            // so AbstractProxyProtocol converts this proxy into an Invoker.
            // (DubboProtocol's reference path does not create an interface proxy.)
            return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
            /********* end: creating the proxy object (rpc client) with com.caucho.hessian.client.HessianProxyFactory *********/
        }

        // ...
    }

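Reference only, no export

  • RedisProtocol, MemcachedProtocol (not proxy protocols)

  • Both talk to a third-party cache server.

  • The consumer can only call three methods, get(k), set(k, v), and delete(k), which interact with Redis or Memcached.

  • These protocols merely wrap the basic cache operations, so their practical value seems limited.

  • RedisProtocol source (MemcachedProtocol is similar):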
  
    public class RedisProtocol extends AbstractProtocol {

        public static final int DEFAULT_PORT = 6379;
        // ...

        @Override
        protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
            try {
                GenericObjectPoolConfig config = new GenericObjectPoolConfig();
                config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
                config.setTestOnReturn(url.getParameter("test.on.return", false));
                config.setTestWhileIdle(url.getParameter("test.while.idle", false));
                if (url.getParameter("max.idle", 0) > 0) {
                    config.setMaxIdle(url.getParameter("max.idle", 0));
                }
                if (url.getParameter("min.idle", 0) > 0) {
                    config.setMinIdle(url.getParameter("min.idle", 0));
                }
                if (url.getParameter("max.active", 0) > 0) {
                    config.setMaxTotal(url.getParameter("max.active", 0));
                }
                if (url.getParameter("max.total", 0) > 0) {
                    config.setMaxTotal(url.getParameter("max.total", 0));
                }
                if (url.getParameter("max.wait", 0) > 0) {
                    config.setMaxWaitMillis(url.getParameter("max.wait", 0));
                }
                if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
                    config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
                }
                if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
                    config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
                }
                if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
                    config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
                }
                final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT),
                        url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT),
                        StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(),
                        url.getParameter("db.index", 0));
                final int expiry = url.getParameter("expiry", 0);
                // The name of the get-like method is configurable
                final String get = url.getParameter("get", "get");
                // The name of the set-like method is configurable
                final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
                // The name of the delete-like method is configurable
                final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
                return new AbstractInvoker<T>(type, url) {
                    @Override
                    protected Result doInvoke(Invocation invocation) throws Throwable {
                        Jedis jedis = null;
                        try {
                            jedis = jedisPool.getResource();

                            if (get.equals(invocation.getMethodName())) {
                                // Reject anything that is not get(k)
                                if (invocation.getArguments().length != 1) {
                                    throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                                }
                                byte[] value = jedis.get(String.valueOf(invocation.getArguments()[0]).getBytes());
                                if (value == null) {
                                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                                }
                                ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
                                return AsyncRpcResult.newDefaultAsyncResult(oin.readObject(), invocation);
                            } else if (set.equals(invocation.getMethodName())) {
                                // Reject anything that is not set(k, v)
                                if (invocation.getArguments().length != 2) {
                                    throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                                }
                                byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
                                ByteArrayOutputStream output = new ByteArrayOutputStream();
                                ObjectOutput value = getSerialization(url).serialize(url, output);
                                value.writeObject(invocation.getArguments()[1]);
                                jedis.set(key, output.toByteArray());
                                if (expiry > 1000) {
                                    jedis.expire(key, expiry / 1000);
                                }
                                return AsyncRpcResult.newDefaultAsyncResult(invocation);
                            } else if (delete.equals(invocation.getMethodName())) {
                                // Reject anything that is not delete(k)
                                if (invocation.getArguments().length != 1) {
                                    throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                                }
                                jedis.del(String.valueOf(invocation.getArguments()[0]).getBytes());
                                return AsyncRpcResult.newDefaultAsyncResult(invocation);
                            } else {
                                // No other method is supported
                                throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
                            }
                        } catch (Throwable t) {
                            RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
                            if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
                                re.setCode(RpcException.TIMEOUT_EXCEPTION);
                            } else if (t instanceof JedisConnectionException || t instanceof IOException) {
                                re.setCode(RpcException.NETWORK_EXCEPTION);
                            } else if (t instanceof JedisDataException) {
                                re.setCode(RpcException.SERIALIZATION_EXCEPTION);
                            }
                            throw re;
                        } finally {
                            if (jedis != null) {
                                try {
                                    jedis.close();
                                } catch (Throwable t) {
                                    logger.warn("returnResource error: " + t.getMessage(), t);
                                }
                            }
                        }
                    }

                    @Override
                    public void destroy() {
                        super.destroy();
                        try {
                            jedisPool.destroy();
                        } catch (Throwable e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                };
            } catch (Throwable t) {
                throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
            }
        }
    }
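
The heart of the invoker above is a dispatch on the method name with an argument-count check. The sketch below shows just that dispatch against an in-memory map instead of a Redis server; CacheInvoker is a hypothetical class, and serialization, expiry, and connection pooling are left out on purpose.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: an in-memory map stands in for the Redis server.
final class CacheInvoker {
    private final Map<String, Object> store = new ConcurrentHashMap<>();
    private final String get;
    private final String set;
    private final String delete;

    // The three method names are configurable, as they are through URL parameters above.
    CacheInvoker(String get, String set, String delete) {
        this.get = get;
        this.set = set;
        this.delete = delete;
    }

    Object invoke(String methodName, Object... args) {
        if (get.equals(methodName)) {
            requireArgs(methodName, args, 1);
            return store.get(String.valueOf(args[0]));
        } else if (set.equals(methodName)) {
            requireArgs(methodName, args, 2);
            store.put(String.valueOf(args[0]), args[1]);
            return null;
        } else if (delete.equals(methodName)) {
            requireArgs(methodName, args, 1);
            store.remove(String.valueOf(args[0]));
            return null;
        }
        // Anything other than the three configured methods is rejected.
        throw new UnsupportedOperationException("Unsupported method " + methodName);
    }

    private static void requireArgs(String methodName, Object[] args, int expected) {
        if (args.length != expected) {
            throw new IllegalArgumentException(methodName + " expects " + expected + " argument(s)");
        }
    }
}

final class CacheInvokerDemo {
    static Object[] run() {
        // Map-style method names, matching the defaults used when the interface is Map.
        CacheInvoker invoker = new CacheInvoker("get", "put", "remove");
        invoker.invoke("put", "k", "v");
        Object hit = invoker.invoke("get", "k");
        invoker.invoke("remove", "k");
        Object miss = invoker.invoke("get", "k");
        boolean rejected;
        try {
            invoker.invoke("size");
            rejected = false;
        } catch (UnsupportedOperationException e) {
            rejected = true;
        }
        return new Object[]{hit, miss, rejected};
    }

    public static void main(String[] args) {
        Object[] r = run();
        System.out.println(r[0] + ", " + r[1] + ", " + r[2]);
    }
}
```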

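Protocols that integrate third-party RPC frameworks

  • GrpcProtocol, ThriftProtocol (proxy protocol subclasses)

  • Not studied yet; a few definitions for now.

  • gRPC is a high-performance, open-source, general-purpose RPC framework designed with mobile clients and HTTP/2 in mind.

  • Thrift is an interface description language and binary communication protocol used to define and create cross-language services. It is used as a remote procedure call (RPC) framework and was developed at Facebook for "scalable cross-language services development".

Non-RPC-layer protocols

  • QosProtocolWrapper: used for service health checks and quality probing.

  • RegistryProtocol: used during the registration phase; covered in depth in the article on service registration.

  • ProtocolFilterWrapper: the filter wrapper around a protocol; see the material on Dubbo filter loading and extension Wrapper classes.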
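
The wrapper classes are decorators: each implements the protocol interface, holds another protocol, and adds behavior around the delegated call. The following is a minimal sketch of that idea only. SimpleProtocol, CoreProtocol, and LoggingWrapper are hypothetical, and the nesting order shown is an assumption of the sketch, not a statement about the order Dubbo applies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-method protocol interface.
interface SimpleProtocol {
    String export(String service);
}

// The innermost, "real" protocol.
final class CoreProtocol implements SimpleProtocol {
    @Override
    public String export(String service) {
        return "exported(" + service + ")";
    }
}

// Decorator in the spirit of a protocol wrapper: same interface, extra behavior around the delegate.
final class LoggingWrapper implements SimpleProtocol {
    private final SimpleProtocol delegate;
    private final String name;
    private final List<String> log;

    LoggingWrapper(SimpleProtocol delegate, String name, List<String> log) {
        this.delegate = delegate;
        this.name = name;
        this.log = log;
    }

    @Override
    public String export(String service) {
        log.add(name + ":before");
        String result = delegate.export(service);
        log.add(name + ":after");
        return result;
    }
}

final class WrapperChainDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Outer wrapper -> inner wrapper -> core protocol.
        SimpleProtocol protocol = new LoggingWrapper(
                new LoggingWrapper(new CoreProtocol(), "filter", log), "listener", log);
        log.add(protocol.export("demo.Service"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Callers only ever see a SimpleProtocol, which is why wrappers can be stacked without the core protocol knowing about them.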