vlambda博客
学习文章列表

如何实现netty RPC的服务注册与服务发现?

一、为什么要进行服务注册与发现?

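之前的demo只是实现了一个服务消费方对一个服务提供方点对点的服务调用,一旦这个唯一的提供方节点不可用,所有调用都会失败,显然没有做到服务的高可用。那么该如何改进呢?思路是:让服务提供方可以部署多个节点,并把各自的地址登记到一份公共的服务注册表中;服务消费方从注册表中动态获取当前可用的节点列表——这就是服务注册与发现。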

二、如何实现服务注册与发现——要有注册中心

【注册中心的方案选择】
由于这个“注册中心”工作在多个节点(包括服务提供方和消费方)之间,所以应该具有能够在多个节点之间进行数据共享的能力(能够共享服务注册表)——既可以选择倾向于一致性的zk, 也可以选择倾向于可用性的redis, 当然也有别的方案。
另外,注册中心还要支持服务消费方监听服务注册表的变化。zk天然支持监听机制,而redis有发布和订阅的消息机制,所以两者都可以满足需求。

而我在此次demo中选用的是zookeeper(简称zk).

三、如何进行服务注册与发现——服务就绪后进行服务注册

1.通过在zk的指定路径下创建临时子节点来实现服务的注册

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Provider-side service registration.
 *
 * <p>Connects to the registry (ZooKeeper, accessed through Curator) once the
 * bean's properties are injected, and exposes {@link #doRegisterSelf()} so the
 * provider can publish its own address under the configured registry path.
 *
 * <p>Author: wangchenggong, 2021/4/5 11:25
 *
 * <p>NOTE(review): nothing in this class closes {@code client}; confirm whether
 * the client is expected to live for the whole lifetime of the process.
 */
@Component
public class ServiceRegistry implements InitializingBean {

    private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);

    /** ZooKeeper client; created lazily by {@link #connectRegistryServer()}. */
    private CuratorFramework client = null;

    /** Address of this provider, used as the name of its child node. */
    @Value("${rpc.server.address}")
    private String providerAddress;

    /** Connection string of the registry. */
    @Value("${rpc.registry.address}")
    private String registryHost;

    /** Parent path under which every provider registers a child node. */
    @Value("${rpc.registry.path}")
    private String registryPath;

    /**
     * Opens the registry connection as soon as the injected properties are available.
     *
     * @throws Exception declared by {@link InitializingBean}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        connectRegistryServer();
    }

    /**
     * Establishes the connection to the registry. Does nothing when a client
     * has already been created.
     */
    private void connectRegistryServer() {
        if (client != null) {
            return;
        }
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(registryHost, retryPolicy);
        client.start();
        logger.info("service provider connected registry center! >>> {}", registryHost);
    }

    /**
     * Registers this provider in the registry; intended to be called once the
     * provider is ready to serve requests.
     *
     * <p>The node is {@code <registryPath>/<providerAddress>}, created as an
     * ephemeral node whose data is the node path encoded as UTF-8. An existing
     * node at the same path is deleted first.
     *
     * @throws Exception if any registry operation fails
     */
    public void doRegisterSelf() throws Exception {
        String serviceNodePath = this.registryPath + "/" + providerAddress;
        byte[] payload = serviceNodePath.getBytes("UTF-8");

        // Drop whatever is already registered at this path before re-creating it.
        Stat stat = client.checkExists().forPath(serviceNodePath);
        if (stat != null) {
            client.delete().deletingChildrenIfNeeded().forPath(serviceNodePath);
        }

        // Create the node together with its payload in a single call, so that
        // watchers never observe the node without its data and receive one
        // event per registration instead of a create followed by an update.
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(serviceNodePath, payload);
    }

}

2.通过实现ApplicationListener接口监听SpringBoot的就绪事件ApplicationReadyEvent

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * Listener that reacts to the application-ready event on the provider side
 * by registering the provider in the registry.
 *
 * <p>Author: wangchenggong, 2021/3/31 16:14
 */
@Component
public class ProviderReadyListener implements ApplicationListener<ApplicationReadyEvent> {

    private final Logger logger = LoggerFactory.getLogger(ProviderReadyListener.class);

    @Autowired
    private ServiceRegistry serviceRegistry;

    /**
     * Triggers self-registration when the ready event is delivered.
     *
     * @param applicationReadyEvent the ready event (its content is not inspected)
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        registerAndAnnounce();
    }

    /**
     * Registers the provider and logs that it is ready. Any exception is
     * logged and not rethrown.
     */
    private void registerAndAnnounce() {
        try {
            this.serviceRegistry.doRegisterSelf();
            this.logger.info("》》》》》服务提供方已经就绪!");
        } catch (Exception registrationFailure) {
            this.logger.error("服务注册发生异常", registrationFailure);
        }
    }
}

四、如何进行服务注册与发现——消费方拉取并监听注册表

1.通过对zk的指定路径获取子节点列表来实现服务发现功能,并监听子节点变化动态更新连接列表


import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Consumer-side service discovery.
 *
 * <p>Reads the children of the registry path to learn the provider addresses,
 * watches that path for changes, and hands the current address set to the
 * {@link ConnectionManager} after every change.
 *
 * <p>Author: wangchenggong, 2021/4/6 6:17
 *
 * <p>NOTE(review): nothing in this class closes {@code treeCache} or
 * {@code client}; confirm whether they are expected to live for the whole
 * lifetime of the process.
 */
@Component
public class ServiceDiscovery {

    private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);

    /**
     * Most recent snapshot of the provider addresses. The set is never mutated
     * after publication; every refresh replaces it with a new instance.
     */
    private volatile Set<String> serviceAddressSet = new HashSet<>();

    /** ZooKeeper client. */
    private CuratorFramework client = null;

    /** Cache used to receive change events for the registry path. */
    private TreeCache treeCache;

    /** Connection string of the registry. */
    @Value("${rpc.registry.address}")
    private String registryHost;

    /** Parent path whose children are the registered providers. */
    @Value("${rpc.registry.path}")
    private String registryPath;

    @Autowired
    private ConnectionManager connectionManager;

    /**
     * Initialisation: connects to the registry, subscribes to changes and
     * performs the first load of the provider list.
     *
     * @throws Exception if the first load of the provider list fails
     */
    @PostConstruct
    public void init() throws Exception {
        // Establish the connection to the registry.
        if (client == null) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(registryHost, retryPolicy);
            client.start();
            logger.info("consumer connected registry center! >>> {}", registryHost);
        }
        // Watch for changes of the child nodes.
        doSubscribe();

        // Read the child nodes and refresh the channels held by the connection manager.
        getAndUpdateServiceConnection();
    }

    /**
     * Subscribes to changes below the registry path. The path is created as a
     * persistent node when it does not exist yet. Failures are logged and not
     * rethrown.
     */
    private void doSubscribe() {
        try {
            if (client.checkExists().forPath(registryPath) == null) {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(registryPath);
            }
            this.treeCache = new TreeCache(client, registryPath);
            treeCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
                    TreeCacheEvent.Type type = event.getType();
                    // Only node-level changes affect the provider list.
                    if (type == TreeCacheEvent.Type.NODE_ADDED
                            || type == TreeCacheEvent.Type.NODE_UPDATED
                            || type == TreeCacheEvent.Type.NODE_REMOVED) {
                        getAndUpdateServiceConnection();
                    }
                }
            });
            treeCache.start();
            logger.info("The consumer do subscribe success!-----------------");
        } catch (Exception e) {
            logger.error("The consumer do subscribe exception", e);
        }
    }

    /**
     * Reads the current provider nodes and pushes them to the connection manager.
     *
     * <p>The address set is rebuilt from scratch on every call. Accumulating
     * into the previous set would keep the addresses of providers that have
     * left the registry, so the connection manager would never be told to drop
     * them.
     *
     * <p>The method is synchronized because it is called both from
     * {@link #init()} and from the cache listener; reading the children and
     * applying them must not interleave, otherwise an older snapshot could be
     * applied after a newer one.
     *
     * @throws Exception if the registry cannot be read or the connection
     *                   manager rejects the update
     */
    private synchronized void getAndUpdateServiceConnection() throws Exception {
        List<String> serviceNodes = client.getChildren().forPath(registryPath);
        Set<String> latestAddresses = new HashSet<>(serviceNodes);
        serviceAddressSet = latestAddresses;

        connectionManager.updateConnectServer(latestAddresses);
    }
}

2.通过连接管理器来支持服务节点的变更和动态选择路由


import com.google.common.net.HostAndPort;
import com.rpc.client.NettyClient;
import io.netty.channel.Channel;
import io.netty.util.internal.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Connection manager: keeps one channel per service node address, follows
 * changes of the node list and picks a channel for each call.
 *
 * <p>Author: wangchenggong, 2021/4/5 12:42
 */
@Component
public class ConnectionManager {

    private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);

    @Autowired
    private NettyClient nettyClient;

    /** Channels keyed by the address of the service node they are connected to. */
    private Map<SocketAddress, Channel> channelMap = new ConcurrentHashMap<>();

    /**
     * Picks one of the registered channels at random.
     *
     * @return a randomly chosen channel, or {@code null} when none is registered
     */
    public Channel chooseChannel() {
        // Take one snapshot and derive the bound from it. Reading size() first
        // and copying afterwards can index past the end of the array when the
        // map shrinks in between.
        Channel[] candidates = channelMap.values().toArray(new Channel[0]);
        if (candidates.length == 0) {
            return null;
        }
        // Pick a service node at random.
        int index = ThreadLocalRandom.current().nextInt(candidates.length);
        return candidates[index];
    }

    /**
     * Aligns the managed channels with the given list of service addresses:
     * connects to addresses that have no open channel and closes and forgets
     * the channels of addresses that are no longer listed.
     *
     * @param addressList current service addresses in {@code host:port} form
     * @throws InterruptedException propagated from the connect call
     * @throws RuntimeException when {@code addressList} is null or empty; all
     *                          channels are closed before it is thrown
     */
    public synchronized void updateConnectServer(Set<String> addressList) throws InterruptedException {
        if (CollectionUtils.isEmpty(addressList)) {
            closeAllServerChannels();
            throw new RuntimeException("没有可用的服务节点");
        }

        // Convert to Set<SocketAddress>.
        Set<SocketAddress> newSocketAddressList = addressList.stream().map(address -> {
            HostAndPort hostAndPort = HostAndPort.fromString(address);
            return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort());
        }).collect(Collectors.toSet());

        for (SocketAddress newSocket : newSocketAddressList) {
            Channel existing = channelMap.get(newSocket);
            if (existing == null || !existing.isOpen()) {
                Channel channel = nettyClient.doConnect(newSocket);
                if (channel != null) {
                    channelMap.put(newSocket, channel);
                } else {
                    // NOTE(review): whether doConnect can return null is not
                    // visible from here. ConcurrentHashMap rejects null values,
                    // so drop the dead entry instead of aborting the update.
                    channelMap.remove(newSocket);
                    logger.warn("connect to service node {} returned no channel", newSocket);
                }
            }
        }

        // Take the difference and retire the channels that are out of date.
        Set<SocketAddress> oldSocketAddresses = new HashSet<>(channelMap.keySet());
        oldSocketAddresses.removeAll(newSocketAddressList);

        for (SocketAddress oldSocket : oldSocketAddresses) {
            // Remove the entry as well as closing it; otherwise chooseChannel()
            // could keep handing out the closed channel.
            Channel oldChannel = channelMap.remove(oldSocket);
            if (oldChannel != null) {
                oldChannel.close();
            }
        }
    }

    /** Closes every managed channel and empties the map. */
    private void closeAllServerChannels() {
        for (Channel value : channelMap.values()) {
            value.close();
        }
        channelMap.clear();
    }

    /**
     * Removes a channel that is no longer usable.
     *
     * <p>The entry is located by channel identity rather than by
     * {@code channel.remoteAddress()}: a null address would make the map lookup
     * throw, and an address-keyed removal could evict a newer channel that has
     * since been registered for the same address.
     *
     * @param channel the channel to forget
     */
    public void removeChannel(Channel channel) {
        logger.info("从连接管理器中移除失效Channel.{}", channel.remoteAddress());
        channelMap.values().removeIf(registered -> registered == channel);
    }
}

完整代码请参见:

https://github.com/success1988/netty_study/tree/main/netty_rpc_common