01.dubbo源码解析--注册中心(缓存机制)
[email protected]:apache/dubbo.git
缓存机制
1.源码分析
AbstractRegistry
缓存机制FailbackRegistry
重试机制ZookeeperRegistry、NacosRegistry
具体的注册中心实现,每个注册中心都有一个对应的工厂类,如 ZookeeperRegistryFactory、NacosRegistryFactory。当消费者 URL 订阅的注册信息发生变化时,ZookeeperRegistry 会回调notify(URL url, NotifyListener listener, List<URL> urls)
方法,更新内存和磁盘上本地缓存的注册信息,并通知监听者。
public interface RegistryService {
// 注册服务
// dubbo://10.20.153.10/dubbo.BarService?version=1.0.0&application=kylin
void register(URL url);
void unregister(URL url);
// 订阅指定服务
// consumer://10.20.153.10/dubbo.BarService?version=1.0.0&application=kylin
void subscribe(URL url, NotifyListener listener);
void unsubscribe(URL url, NotifyListener listener);
// 查找指定服务
// consumer://10.20.153.10/dubbo.BarService?version=1.0.0&application=kylin
List<URL> lookup(URL url);
}
2.缓存机制
消费者从注册中心获取注册信息后会做本地缓存,本地缓存保存两份:一是内存中保存一份,通过 notified 的 Map 结构进行保存;二是磁盘上保存一份,通过 file 保持引用。
//本地磁盘缓存,有一个特殊的key值为registies,记录的是注册中心列表,其他记录的都是服务提供者列表
private final Properties properties = new Properties();
private File file; // 磁盘文件服务缓存对象
private final ConcurrentMap<URL, Map<String, List<URL>>> notified =
new ConcurrentHashMap<>(); // 内存中的服务缓存对象
属性来源于AbstractRegistry
notified
是内存中的服务缓存对象,外层 Key 是消费者 URL,内层的 kye 是分类(category),包含 providers、consumers、routers、configurators 四种,value 则是对应的服务列表。file
磁盘缓存对象,当订阅的信息发生变更时先更新 properties 的内容,通过 properties 再写入磁盘。
2.1.缓存的加载
当初始化注册中心时,会通过 AbstractRegistry 的默认构造器加载磁盘缓存文件 file 中的订阅信息。当注册中心无法连接或宕机时使用缓存。
// 初始化加载磁盘缓存文件 file 中的订阅信息
private void loadProperties() {
if (file != null && file.exists()) {
...
InputStream in = new FileInputStream(file);
properties.load(in);
}
}
2.2 缓存的更新
当订阅的注册信息发生变量时,ZookeeperRegistry 会回调 notify 方法更新缓存中的数据,其中第一个参数为消费者 url,第三个参数为注册中心注册的 urls。
/**
* 当订阅的注册信息发生变更时,通知 consumer url 更新注册列表
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
...
// 按category进行分类,并根据消费者url过滤订阅的urls
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified =
notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 1. 更新内存中的本地缓存:notified
categoryNotified.put(category, categoryList);
// 2. 更新磁盘中的本地缓存:properties -> file
saveProperties(url);
// 3. 通知监听者
listener.notify(categoryList);
}
}
总结:当注册信息发生变量时,主要做了三件事:一是更新内存中的注册信息 notified;二是更新磁盘中的数据 properties;三是通知监听者。
// 参数url为消费者url,将内存中 notified 对应的消费者 url 对应的注册信息缓存到磁盘上。
private void saveProperties(URL url) {
StringBuilder buf = new StringBuilder();
// 1. 将 notified 对应的 url 注册信息保存为字符串,用于持久化
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
// 2. 同步到磁盘 file
properties.setProperty(url.getServiceKey(), buf.toString());
long version = lastCacheChanged.incrementAndGet();
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.execute(new SaveProperties(version));
}
}
总结:doSaveProperties 调用 properties.store(outputFile, "Dubbo Registry Cache")
将内存中的注册信息保存到文件中。properties 的 key、value 分别如下:
key
消费者 URL#getServiceKey,即{group/}serviceInterface{:version}
,其中 group 和 version 都是可选。value
消费者 URL 订阅的注册信息 urls,多个 URL 用空格分隔。示例如下:
"binarylei.dubbo.api.EchoService" -> "empty://192.168.139.1/binarylei.dubbo.api.EchoService?application=dubbo-consumer&category=configurators&dubbo=2.6.0&interface=binarylei.dubbo.api.EchoService&methods=echo&pid=21540&side=consumer×tamp=1570799586361
empty://192.168.139.1/binarylei.dubbo.api.EchoService?application=dubbo-consumer&category=routers&dubbo=2.6.0&interface=binarylei.dubbo.api.EchoService&methods=echo&pid=21540&side=consumer×tamp=1570799586361 dubbo://192.168.139.1:20880/binarylei.dubbo.api.EchoService?anyhost=true&application=dubbo-provider&dubbo=2.6.0&generic=false&interface=binarylei.dubbo.api.EchoService&methods=echo&pid=2460&side=provider×tamp=1570798917842
empty://192.168.139.1/binarylei.dubbo.api.EchoService?application=dubbo-consumer&category=providers,configurators,routers&dubbo=2.6.0&interface=binarylei.dubbo.api.EchoService&methods=echo&pid=21540&side=consumer×tamp=1570799586361"
2.2 重试机制
FailbackRegistry 继承自 AbstractRegistry,并在此基础上增加了失败重试的能力。FailbackRegistry 内部定义HashedWheelTimer retryTimer
,会将调用失败需要重试的任务添加到 retryTimer 中。
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered =
new ConcurrentHashMap<URL, FailedRegisteredTask>();
// 取消注册失败的 URL 集合
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered =
new ConcurrentHashMap<URL, FailedUnregisteredTask>();
// 发起订阅失败的监听器集合
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed =
new ConcurrentHashMap<Holder, FailedSubscribedTask>();
// 取消订阅失败的监听器集合
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed =
new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
// 通知失败的 URL 集合
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified =
new ConcurrentHashMap<Holder, FailedNotifiedTask>();
总结:FailbackRegistry 对注册、订阅、通知失败的情况都进行了重试处理,对于需要重试的任务都保存在对应的集合中,并通过 retryTimer.newTimeout
定时器定时处理。下面以注册 register 为例分析重试机制。 总结:当注册失败时,Dubbo 会将注册失败的 URL 添加到重试任务中。HashedWheelTimer 本质和 Timer 一样是一个定时器。如果重试成功就会删除 failedRegistered 队列中的任务,失败则调用 reput 继续重试。在 AbstractRetryTask 配置了两个默认参数 retryPeriod=5s 和 retryTimes=3,即 5s 重试一次,最多重试 3 次。
@Override
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
doRegister(url);
} catch (Exception e) {
...
// 失败重试
addFailedRegistered(url);
}
}
private void addFailedRegistered(URL url) {
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) {
return;
}
FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
oldOne = failedRegistered.putIfAbsent(url, newTask);
if (oldOne == null) {
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
3.1 ZookeeperRegistry
ZookeeperRegistry 等具体的实现类,主要功能是实现具体的注册、订阅、查找方法 doRegister、doUnregister、doSubscribe、doUnsubscribe、lookup
初始化
ZookeeperRegistry 初始化主要完成两件事:一是 zkClient 客户端初始化;二是注册监听器,一旦注册中心无法连接则将当前注册和订阅的 URL 添加到重试任务中。
// url 是注册中心地址,ZookeeperTransporter 是 ZK 客户端,默认是 curator
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名,默认为 dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
// ZK 注册的根据路径是 '/dubbo'
this.root = group;
// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器,当 ZK 无法连接时从内存中保存的注册信息恢复
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
总结:ZookeeperRegistry 初始化时的两件事,一是创建客户端,二是自动恢复。
ZookeeperTransporter 客户端有 curator 和 ZkClient 两种实现。通过 URL 的 client 或 transporter 进行动态适配,默认的实现是 CuratorZookeeperTransporter。
当注册中心无法连接时,将当前注册和订阅的 URL 添加到重试任务中,一旦网络正常则自动恢复。recover 是在 FailbackRegistry 中实现的。
@Override
protected void recover() throws Exception {
// register:将当前注册的 URL 添加到定时器中进行重试
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
for (URL url : recoverRegistered) {
addFailedRegistered(url);
}
}
// subscribe:将当前订阅的 URL 添加到定时器中进行重试
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
addFailedSubscribed(url, listener);
}
}
}
}
注册
注册直接调用 zkClient 的 create 方法创建节点,delete 方法删除节点,默认为临时节点。consumer 注册主要是为了方便 Admin 使用。
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " +
getUrl() + ", cause: " + e.getMessage(), e);
}
}
订阅
订阅相对注册复杂很多,分两种情况,一是 url.getServiceInterface() 是 *
,也就是全量获取注册信息,一般是 Admin 使用;二是订阅指定的 serviceInterface。这里主要分析第二种情况。
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 1. 获取所有的注册信息,一般 Admin 会获取所有的服务 ANY_VALUE=*
if (ANY_VALUE.equals(url.getServiceInterface())) {
...
// 2. 获取指定的服务 serviceInterface
} else {
List<URL> urls = new ArrayList<>();
// 2.1 '/dubbo/serviceInterface/{providers、routers、configurators}'
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 2.2 创建zkListener,当注册信息发生变化时,调用notify(url,listener,urls)
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) ->
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
// 2.3 创建{providers、routers、configurators}目录,永久性节点
zkClient.create(path, false);
// 2.4 注册zkListener,并获取{providers}的子节点信息
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 2.5 通知 listener
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " +
getUrl() + ", cause: " + e.getMessage(), e);
}
}
总结: 消费者 URL 会指定需要订阅的 category{providers、routers、configurators} 类型,依次遍历这几个目录。如果指定的目录不存在,首先会创建一个永久性的目录(2.3),并注册对应的 zkListener(2.4),zkListener 在节点发生变化时调用 notify 通知 listener(2.2)。在注册 zkListener 时会返回对应的子节点注册信息,并通知 listener(2.5)。
也就是说,订阅时首先获取该服务在 ZK 上全量注册信息,之后消费者感知注册信息变化,则是通过 zkListener 事件通知的方式。
NacosRegistry
NacosRegistry 注册中心和 ZookeeperRegistry 类似,也是通过事件通知的方式感知服务注册信息变化。
注册
NacosRegistry 注册时会将 URL 转化为 Nacos 的实例对象 Instance,调用 registerInstance 进行注册,deregisterInstance 取消注册。
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
execute(namingService -> namingService.registerInstance(serviceName, instance));
}
总结: NacosRegistry 注册非常简单,主要分析一下在 Nacos 上注册的数据结构。
serviceName:{category}
:{serviceInterface}:{version}:{group}。其中 version 和 group 可以缺省,eg: providers:org.apache.dubbo.demo.DemoService::instance
:这是 Nacos 的服务实例模型。
Nacos 注册示例如下:
"providers:org.apache.dubbo.demo.DemoService::" -> {
"enabled": true,
"ephemeral": true,
"healthy": true,
"instanceHeartBeatInterval": 5000,
"instanceHeartBeatTimeOut": 15000,
"ip": "192.168.139.1",
"ipDeleteTimeout": 30000,
"metadata": {
"side": "provider",
"methods": "sayHello",
"release": "",
"deprecated": "false",
"dubbo": "2.0.2",
"pid": "1128",
"interface": "org.apache.dubbo.demo.DemoService",
"generic": "false",
"path": "org.apache.dubbo.demo.DemoService",
"protocol": "dubbo",
"application": "dubbo-provider",
"dynamic": "true",
"category": "providers",
"anyhost": "true",
"bean.name": "org.apache.dubbo.demo.DemoService",
"register": "true",
"timestamp": "1570933206811"
},
"port": 20880,
"weight": 1
}
订阅
订阅时,首先获取需要订阅的服务名称,和 ZK 一样,也分为 Admin 和普通 serviceInterface 订阅二种情况。
public void doSubscribe(final URL url, final NotifyListener listener) {
Set<String> serviceNames = getServiceNames(url, listener);
doSubscribe(url, listener, serviceNames);
}
// 订阅指定的 serviceNames
private void doSubscribe(final URL url, final NotifyListener listener,
final Set<String> serviceNames) {
execute(namingService -> {
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName);
// 通知 listener,将 Nacos Instance 适配成 Dubbo URL 后通知 listener
notifySubscriber(url, listener, instances);
// 注册监听器,感知服务注册信息变化
subscribeEventListener(serviceName, url, listener);
}
});
}
总结: 和 ZookeeperRegistry 类似,首先获取对应服务名称的服务实例,通过 notifySubscriber 通知 listener。之后服务感知也是通过 subscribeEventListener 事件机制。
private void subscribeEventListener(String serviceName, final URL url,
final NotifyListener listener) throws NacosException {
if (!nacosListeners.containsKey(serviceName)) {
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
// 服务注册信息变化时通知 listener
notifySubscriber(url, listener, e.getInstances());
}
};
// 注册EventListener
namingService.subscribe(serviceName, eventListener);
nacosListeners.put(serviceName, eventListener);
}
}
总结
Dubbo 对注册中心进行了统一的抽象,核心接口是 RegistryService
,其子类 AbstractRegistry
实现了缓存机制,FailbackRegistry
实现了重试机制。
ZookeeperRegistry、NacosRegistry
则具体等实现,则是完成具体的服务注册和订阅。注册比较简单,订阅主要是通过事件机制,当注册的服务发生变化时调用 notify(URL url, NotifyListener listener, List<URL> urls)
方法,更新内存和磁盘上本地缓存的注册信息,并通知监听者。
其中 RegistryDirectory 就是其中一个监听者,会感知服务信息的变化,管理某个服务对应的所有注册信息。
服务自省
我们知道 Dubbo 的注册是以服务接口 serviceInterface 为单位进行注册的,而大多数注册中心的设计都是以服务实例为单位进行注册的,如 Nacos、eureka、Spring Cloud 等。以服务实例进行注册更接近云原先,而且以服务接口为单位进行注册,会造成注册中心数据冗余,网络通信压力增大,减少注册中心的吞吐量。
Dubbo 计划在 2.7.5 实现服务自省的功能,而 Spring Cloud alibaba-2.1.0 则已经完成了服务的自省。
摘自:
RPC 编程:https://www.ibm.com/developerworks/cn/aix/library/au-rpc_programming/
dubbo源码分析:https://segmentfault.com/u/crazyhzm/articles?page=3 -- 我使用这个
https://www.cnblogs.com/binarylei/p/11665128.html