Nacos源码-动态获取配置信息
很喜欢这首歌~ 分享给大家
前言
在一篇文章【Nacos入门一】介绍Nacos宏观内容,上一篇是小白入门篇,是什么,干什么,同类技术,同类技术比较,hello world程序 也可以运行。
看完上一篇博客大家有没有思考一个问题?为什么Nacos可以做到,他运用什么原理? 如何做到动态刷新配置文件, 我们知道对于中间件在获取数据的时候会拉取数据或者推送数据,那对于Nacos是如何呢?这一篇从源码我们来观察~
准备
https://nacos.io/en-us/docs/quick-start.html
https://github.com/alibaba/nacos
入口
下图是调试代码的入口
步骤
1从createConfigService出发
根据Properties属性创建对象
ConfigService configService = NacosFactory.createConfigService(properties);
2 看ConfigFactory类有两个方法,第一个方法也就是我们原本第一步调用的方法,第二方法是只需要传送serverAddr就可以,其他的使用默认就可以。所以对于Nacos来说,如果其他配置如果不配则采用默认配置(从源码看会更加理解,不过根据任何涉及规则都是有默认值得~)
既然这个类最终调用的是createConfigService那我们来查看这个方法做了什么?
利用反射创建实例,并且初始化构造函数,下面的代码里我有做针对性备注,初始化的是NacosConfigService类,那接下来我们看看NacosConfigService初始化做了什么呢?
public class ConfigFactory {
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
//1 初始化NacosConfigService构造函数
Constructor constructor = driverImplClass.getConstructor(Properties.class);
// 通过反射创建ConfigService
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
public static ConfigService createConfigService(String serverAddr) throws NacosException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
return createConfigService(properties);
}
}
3 首先我们需要思考的是为什么要初始化这个类,他主要是干什么的呢?
类的整体结构
这两个类非常重要,在下面的时候会调用
private HttpAgent agent;
private ClientWorker worker;
初始化方法都做了什么?
/***
* 为什么要初始化这个函数?它主要是干嘛的?
* @param properties
* @throws NacosException
*/
public NacosConfigService(Properties properties) throws NacosException {
// 2 构造函数被初始化
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
// 3 获取 namespace
initNamespace(properties);
// 4 创建 HttpAgent new ServerHttpAgent(properties) 里面获取的是serverAddr,会对这个字段进行处理,因为毕竟是集群形式
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
// 5 创建 ClientWorker对象 ,参数里面是httpAgent,配置过滤器链管理
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
4 首先看一下HttpAgent的作用是什么?
public interface HttpAgent {
void start() throws NacosException;
HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException;
HttpResult httpPost() 。。。
HttpResult httpDelete() 。。。
String getName()
String getNamespace();
String getTenant();
String getEncode();
}
5 其次看ClientWorker的作用是什么?它有3个线程,使用了不同的线程池,备注了我已经备注,最核心的是第三个线程。并且clientWorker使用到了agent,也就是调取得方法时指定的线程名
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
// 第一个线程 线程的名字work
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
// 第二个线程 线程数是虚拟机可用的最大处理器数,是可变的。
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
// 第三个线程根据间隔时间来计算下个任务开始的时间
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
// 第四个
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
6 checkConfigInfo
//使用的是AtomicReference 保障了原子性,预防了ABA问题
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());
// 这块是重点 主要是提取一批任务,交给executorService线程执行,执行任务是LongPollingRunnable的taskId
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。任务列表现在是无序的。变化过程可能有问题
// LongPollingRunnable 主要执行了什么任务呢?
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
7 LongllingRunnable
class LongPollingRunnable implements Runnable {
private int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
public void run()
{
......
//代码太多,只看具体代码
1 check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
2 拉取数据
String content = getServerConfig(dataId, group, tenant, 3000L);
8 getServerConfig是干嘛的呢?
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
//在这里我们就发现了agent的作用是什么。
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
}
未完待续
先分享到这,后续还有listener,本来这篇文章想分享Nacos的注册发现,但是感觉config这块理解不是很深入,所以撸了源码瞅瞅,如果和你的认知有差别,欢迎一起交流成长~