vlambda博客
学习文章列表

如虎添翼,为gRPC添加服务注册发现能力!

有了前面的铺垫,我们已经对gRPC的java实现机制,代码编写手法、阻塞RPC以及双向流等内容有了全面、直观地了解。

本文我们继续本系列,为我们的gRPC添加服务注册发现。

什么是服务注册发现?

这种硬编码配置方式应对变化的能力很差,如果服务提供方宕机,服务消费者无法及时更换调用的目标,即便服务提供方存在冗余的机器,消费者也需要修改配置文件,重启服务才能调用至新的服务提供方节点。

通俗地说就是,这种方式将服务提供方与服务消费方耦合在了一起,不够灵活。

因此就需要有服务注册发现机制。如下图所示:

registry.png

这里引用了dubbo框架的简易架构图。图中,服务提供方(provider)启动后会向注册中心(Registry)发起服务注册,将自己的ip、端口、其他元数据信息发送给注册中心。

注册中心维护了一个注册表,对上报的服务注册信息进行记录。

服务消费者(consumer)启动后会向注册中心(Registry)拉取服务提供方列表,也就是图中的 「subscribe」 ,即:服务发现过程。

注意看,「3.notify」 是一条虚线,这里的含义是指,一旦服务提供方的注册信息发生变更,如现有节点下线(有可能是正常的关机,如版本发布;也有可能是意外宕机,都会导致服务下线。)或者新节点上线,都会造成注册中心中记录的服务注册信息发生变更,此时注册中心会通知服务消费者存在注册表信息变更,此时需要对最新的服务注册信息进行变更,一般有几种方式:

  1. 注册中心通过push方式主动推送给消费者,这种方式往往通过消费者向注册中心注册监听器方式实现;
  2. 消费者定时通过pull方式从注册中心拉取注册表信息并在本地进行更新;
  3. 消费者通过长轮询方式从注册中心拉取注册表信息(推拉结合)。

有了服务注册发现机制之后,如何进行RPC调用?

那么,有了服务注册发现机制之后的RPC调用过程是怎样的?

如虎添翼,为gRPC添加服务注册发现能力!

如上图,实际上有了服务注册发现机制之后,服务消费者就不需要事先硬编码服务提供方的机器列表。

而是在运行时选择一台机器进行调用,这就是所谓的负载均衡策略,一般来说负载均衡有随机、轮询、加权随机、一致性哈希等方式。

图中使用的为轮询策略,则先选择192.168.21.1,下次选择192.168.21.2,然后重复这个过程。

如果服务提供方中某台机器下线,如192.168.21.1下线,则服务A的消费者能够感知到这个过程,拉取到全新的注册表信息后,下次调用就不会再去调用已下线的机器。

如虎添翼,为gRPC添加服务注册发现能力!

图中,提供者1离线,消费者拉取到最新的注册表信息,选择了健康状态的提供者2,发起RPC调用。

毫不夸张地说,服务注册发现机制为RPC调用提供了运行时自愈的能力。

实战:为gRPC添加服务注册发现

有了前面的铺垫,那么我们就通过实战案例来讲解,如何为gRPC添加服务注册发现。

本文使用Nacos作为服务注册中心选型。

事实上,除了Nacos外,Zookeeper、Etcd、Redis等都可以作为服务注册中心。

之所以选择了Nacos,一方面是因为它足够成熟,Nacos起源于淘宝五彩石项目,支撑了双十一的海量流量;

成长于阿里云的ACM,开源后作为SpringCloudAlibaba默认的服务注册与配置中心,且它与CloudNative深度整合,是面向未来的一款工业级产品。

Nacos具备高可用能力,且拥有不错的可视化能力,因此我们选择它作为服务注册中心的选型。

实战:为服务提供方添加服务注册

首先为服务提供方添加服务注册能力。

定义服务注册配置类NacosRegistryConfig.java,封装服务提供方的ip、端口、服务名等信息。

public class NacosRegistryConfig {

   private static final Logger logger = Logger.getLogger(NacosRegistryConfig.class.getName());

   /**Nacos服务单地址*/
   private String serverAddr;
   /**注册端口,一般就是服务暴露端口*/
   private int port;
   /**权重*/
   private double weight = 1.0;
   /**服务名*/
   private String serviceName;
   /**当前服务ip*/
   private String ip;

通过有参构造方法,在配置类创建期间获取到当前服务所处网络ip。

public NacosRegistryConfig(String serverAddr, int port, double weight, String serviceName) {
   this.serverAddr = serverAddr;
   this.port = port;
   this.weight = weight;
   this.serviceName = serviceName;

   try {
      InetAddress inetAddress = InetAddress.getLocalHost();
      this.ip = inetAddress.getHostAddress();
   } catch (UnknownHostException e) {
      throw new RuntimeException("NacosRegistryConfig.getLocalHost failed.", e);
   }

   logger.info("NacosRegistryConfig construct done. serverAddr=[" + serverAddr +
         "],serviceName=" + serviceName +
         "],ip=[" + ip +
         "],port=[" + port +
         "],weight=[" + weight + "]");
}

编写服务注册核心方法,用于在服务提供方启动时向Nacos注册服务信息。注意:「服务注册一定要在服务提供方服务发布之前!」

    public void register() {
      try {

         NamingService namingService = NamingFactory.createNamingService(serverAddr);
         // 创建一个服务实例
         Instance instance = new Instance();
         instance.setIp(ip);
         instance.setPort(port);
         instance.setHealthy(false);
         instance.setWeight(weight);
         instance.setInstanceId(serviceName + "-instance");

         // 自定义服务元数据
         Map<String, String> instanceMeta = new HashMap<>();
         instanceMeta.put("language""java");
         instanceMeta.put("rpc-framework""gRPC");
         instance.setMetadata(instanceMeta);
         
         // 声明一个集群
         Cluster cluster = new Cluster();
         cluster.setName("DEFAULT-CLUSTER");
         
         // 为集群添加元数据
         Map<String, String> clusterMeta = new HashMap<>();
         clusterMeta.put("name", cluster.getName());
         cluster.setMetadata(clusterMeta);
  
  // 为实例添加集群名称
         instance.setClusterName("DEFAULT-CLUSTER");
         
         // 注册服务实例
         namingService.registerInstance(serviceName, instance);
  
         namingService.subscribe(serviceName, new EventListener() {
            @Override
            public void onEvent(Event event) {
               System.out.println(((NamingEvent)event).getServiceName());
               System.out.println(((NamingEvent)event).getInstances());
            }
         });
      } catch (NacosException e) {
         throw new RuntimeException("Register Services To Nacos Failed.", e);
      }
   }
}

这个方法通过声明并创建NamingService,并为其添加了实例信息,设置了ip、端口、元信息、集群名称等属性,并通过「namingService.registerInstance」 完成服务的注册操作。

重点看一下registerInstance方法实现。

方法签名:

/**
 * register a instance to service with specified instance properties
 *
 * @param serviceName name of service
 * @param instance    instance to register
 * @throws NacosException
 */
void registerInstance(String serviceName, Instance instance) throws NacosException;

接口方法实现:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);

        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }

    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

可以看到,Nacos实际上是将服务注册信息封装到了BeatInfo对象中,在启动阶段,服务提供方主动上报服务注册信息。

在运行时,服务提供方通过心跳与Nacos服务端进行通信,并通过心跳来传递服务注册信息。一旦客户端发生变更,服务端可以在下次心跳感知到该变更,并做相应地修改。

这个设计是比较巧妙的。

添加服务注册逻辑

接着我们需要在服务提供者启动逻辑中,添加服务注册逻辑。

public class OrderServerBoot {

   private static final Logger logger = Logger.getLogger(OrderServerBoot.class.getName());

   private Server server;

   // Nacos服务端地址
   private static final String NACOS_SERVER_ADDR = "nacos-server:8848";

   @SneakyThrows
   private void startServer() {
      int serverPort = 10881;
      server = ServerBuilder.forPort(serverPort)
            .addService(new OrderServiceImpl())
            .addService(new DoubleStreamServiceImpl())
            .build();
            
      // 服务注册
      NacosRegistryConfig nacosRegistryConfig = new NacosRegistryConfig(NACOS_SERVER_ADDR, serverPort, 1.0, "grpc-server-demo");
      nacosRegistryConfig.register();       
            
      server.start();

      logger.info("OrderServerBoot started, listening on:" + serverPort);

      // 优雅停机
      addGracefulShowdownHook();
   }

重点看如下代码:

  // 服务注册
  NacosRegistryConfig nacosRegistryConfig = new NacosRegistryConfig(NACOS_SERVER_ADDR, serverPort, 1.0, "grpc-server-demo");
  nacosRegistryConfig.register();       

在gRPC服务提供方的startServer方法中,通过该方法调用,向Nacos注册了服务提供者的信息(serverName为「grpc-server-demo」,该名称将唯一标识一个服务),然后再执行服务端启动逻辑,发布服务。

运行服务提供方的启动方法,观察Nacos管理平台的服务注册信息如下:

如虎添翼,为gRPC添加服务注册发现能力!



从管理平台能够看到我们的服务已经成功注册到了Nacos中,并且成功上报了元信息。

实战:为服务消费者提供服务发现

继续给服务消费者添加服务发现能力。

public class NacosRegistryConfig {

   private static final Logger logger = Logger.getLogger(NacosRegistryConfig.class.getName());

   /**Nacos服务单地址*/
   private String serverAddr;
   /**服务名*/
   private String providerServiceName;
   /**提供者ip*/
   private String providerIp;
   /**提供者端口*/
   private int providerPort;

   public NacosRegistryConfig(String serverAddr, String providerServiceName) {
      this.serverAddr = serverAddr;
      this.providerServiceName = providerServiceName;
      // 服务发现
      findServerList();
   }

与服务提供者类似,消费者也有一个NacosRegistryConfig服务发现配置类。

构造方法中,完成服务发现,调用方法  findServerList()

private void findServerList() {
   try {
      // 连接Nacos-server
      NamingService namingService = NamingFactory.createNamingService(serverAddr);
      // 获取服务提供者实例信息
      List<Instance> instances = namingService.getAllInstances(providerServiceName);

      // 随机策略  TODO 改为基于取模粘滞请求,基于userId取模
      int serverSize = instances.size();
      Random random = new Random();
      int index = random.nextInt(serverSize);
      System.out.println("serverSize:" + serverSize + "选择的机器:" + index);
      Instance instance = instances.get(index);
      // 获取ip 端口
      this.providerIp = instance.getIp();
      this.providerPort = instance.getPort();

      // TODO还需要考虑对服务列表变更的处理
      namingService.subscribe(providerServiceName, new EventListener() {
         @Override
         public void onEvent(Event event) {
            System.out.println(((NamingEvent)event).getServiceName());
            System.out.println(((NamingEvent)event).getInstances());
         }
      });
   } catch (NacosException e) {
      throw new RuntimeException("Register Services To Nacos Failed.", e);
   }
}

解释下逻辑:

  • 首先建立到Nacos服务端的链接
  • 构造NamingService实例,通过 NamingService.getAllInstances(String serviceName) 方法获取到服务端所有以serviceName命名的实例列表(serviceName为服务端注册的属性)
  • 这里使用随机策略从实例中随机获取一台服务端实例ip及端口(均为提供方注册好的)
  • 添加一个服务变更监听

当逻辑执行完成,便会为NacosRegistryConfig的providerIp、providerPort赋值。

提供get方法以方便外部快速获取到提供者的ip与端口。

public String getProviderIp() {
   return providerIp;
}

public int getProviderPort() {
   return providerPort;
}

发起服务调用

// 端口及ip
int port = nacosRegistryConfig.getProviderPort();
String providerIp = nacosRegistryConfig.getProviderIp();
OrderClientAgent orderClientAgent = new OrderClientAgent(providerIp, port);

接着只需要替换原先的获取服务提供方ip、端口的逻辑,改为从nacosRegistryConfig中获取,其余逻辑保持不变即可。

运行

先后启动服务提供方、服务消费方,查看控制台日志输出。

提供方

三月 21, 2022 10:57:50 上午 OrderServerBoot startServer
信息: OrderServerBoot started, listening on:10881
三月 21, 2022 10:57:50 上午 registry.NacosRegistryConfig <init>
信息: NacosRegistryConfig construct done. serverAddr=[nacos-server:8848],serviceName=grpc-server-demo],ip=[169.254.19.253],port=[10881],weight=[1.0]

消费方

三月 21, 2022 10:58:30 上午 OrderClientBoot doPlaceOrder
信息: client placeOrder end. response:userId: 10086
,resultCode:SUCCESS
三月 21, 2022 10:58:30 上午 agent.OrderClientAgent queryOrders
信息: client queryOrders start. request:userId: 10086

三月 21, 2022 10:58:30 上午 OrderClientBoot doQueryOrder
信息: client queryOrders end. response:userId: 10086
totalPrice: "207.5000"
userOrder {
  orderId: 2095135383
  orderPrice: "12.50"
  orderAmount: "15.00"
  productId: 1
}
userOrder {
  orderId: 2095135383
  orderPrice: "10.00"
  orderAmount: "2.00"
  productId: 2
}
......

可以看到,我们仍旧能够成功发起服务调用。

这里提个问题,假设我们部署多个服务提供方,不修改消费者逻辑,是否仍然能够成功发起RPC调用?

答案是肯定的。由于服务注册发现的存在,消费者能够及时获取到提供方的服务变更信息,在运行期能够根据我们指定的策略,选择健康的提供者实例并发起RPC调用。

如果有linux环境且安装了docker,则可以通过docker方式启动一个Nacos实例。

附录:docker方式安装Nacos-Server

以下操作,笔者是在centos7实现的。

安装docker

yum -y install docker

设置开机启动

systemctl enable docker

启动docker

systemctl start docker

查看docker当前版本

docker version

接着安装Nacos-Server

https://hub.docker.com/r/nacos/nacos-server

docker方式安装并启动Nacos-server(版本1.1.4,具体版本可以自行指定)

docker run --name nacos -e MODE=standalone -p 8848:8848 -d nacos/nacos-server:1.1.4

访问Nacos-server:

http://ip:8848/nacos/
默认用户名:nacos
默认密码:nacos

小结

本文重点介绍了服务注册发现的作用及意义,并实战了如何为gRPC添加了服务注册发现能力。

在后面的文章中,我们将继续探究gRPC的深层原理,并且会对Nacos的服务注册发现、配置管理等内容进行源码级别的学习与研究,敬请期待。