Nacos源码-动态获取配置信息
很喜欢这首歌~ 分享给大家
前言
在一篇文章【Nacos入门一】介绍Nacos宏观内容,上一篇是小白入门篇,是什么,干什么,同类技术,同类技术比较,hello world程序 也可以运行。
看完上一篇博客大家有没有思考一个问题?为什么Nacos可以做到,他运用什么原理? 如何做到动态刷新配置文件, 我们知道对于中间件在获取数据的时候会拉取数据或者推送数据,那对于Nacos是如何呢?这一篇从源码我们来观察~
准备
https://nacos.io/en-us/docs/quick-start.html
https://github.com/alibaba/nacos
入口
下图是调试代码的入口
步骤
1从createConfigService出发
根据Properties属性创建对象ConfigService configService = NacosFactory.createConfigService(properties);
2 看ConfigFactory类有两个方法,第一个方法也就是我们原本第一步调用的方法,第二方法是只需要传送serverAddr就可以,其他的使用默认就可以。所以对于Nacos来说,如果其他配置如果不配则采用默认配置(从源码看会更加理解,不过根据任何涉及规则都是有默认值得~)
既然这个类最终调用的是createConfigService那我们来查看这个方法做了什么?
利用反射创建实例,并且初始化构造函数,下面的代码里我有做针对性备注,初始化的是NacosConfigService类,那接下来我们看看NacosConfigService初始化做了什么呢?
public class ConfigFactory {public static ConfigService createConfigService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");//1 初始化NacosConfigService构造函数Constructor constructor = driverImplClass.getConstructor(Properties.class);// 通过反射创建ConfigServiceConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);return vendorImpl;} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}}public static ConfigService createConfigService(String serverAddr) throws NacosException {Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);return createConfigService(properties);}}
3 首先我们需要思考的是为什么要初始化这个类,他主要是干什么的呢?
类的整体结构
这两个类非常重要,在下面的时候会调用
private HttpAgent agent;private ClientWorker worker;
初始化方法都做了什么?
/**** 为什么要初始化这个函数?它主要是干嘛的?* @param properties* @throws NacosException*/public NacosConfigService(Properties properties) throws NacosException {// 2 构造函数被初始化String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);if (StringUtils.isBlank(encodeTmp)) {encode = Constants.ENCODE;} else {encode = encodeTmp.trim();}// 3 获取 namespaceinitNamespace(properties);// 4 创建 HttpAgent new ServerHttpAgent(properties) 里面获取的是serverAddr,会对这个字段进行处理,因为毕竟是集群形式agent = new MetricsHttpAgent(new ServerHttpAgent(properties));agent.start();// 5 创建 ClientWorker对象 ,参数里面是httpAgent,配置过滤器链管理worker = new ClientWorker(agent, configFilterChainManager, properties);}
4 首先看一下HttpAgent的作用是什么?
public interface HttpAgent {void start() throws NacosException;HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException;HttpResult httpPost() 。。。HttpResult httpDelete() 。。。String getName()String getNamespace();String getTenant();String getEncode();}
5 其次看ClientWorker的作用是什么?它有3个线程,使用了不同的线程池,备注了我已经备注,最核心的是第三个线程。并且clientWorker使用到了agent,也就是调取得方法时指定的线程名
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {this.agent = agent;this.configFilterChainManager = configFilterChainManager;// Initialize the timeout parameterinit(properties);// 第一个线程 线程的名字workexecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {public Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker." + agent.getName());t.setDaemon(true);return t;}});// 第二个线程 线程数是虚拟机可用的最大处理器数,是可变的。executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {public Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());t.setDaemon(true);return t;}});// 第三个线程根据间隔时间来计算下个任务开始的时间executor.scheduleWithFixedDelay(new Runnable() {public void run() {try {// 第四个checkConfigInfo();} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);}
6 checkConfigInfo
//使用的是AtomicReference 保障了原子性,预防了ABA问题private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());// 这块是重点 主要是提取一批任务,交给executorService线程执行,执行任务是LongPollingRunnable的taskIdpublic void checkConfigInfo() {// 分任务int listenerSize = cacheMap.get().size();// 向上取整为批数int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {// 要判断任务是否在执行 这块需要好好想想。任务列表现在是无序的。变化过程可能有问题// LongPollingRunnable 主要执行了什么任务呢?executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}
7 LongllingRunnable
class LongPollingRunnable implements Runnable {private int taskId;public LongPollingRunnable(int taskId) {this.taskId = taskId;}public void run(){......//代码太多,只看具体代码1 check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);2 拉取数据String content = getServerConfig(dataId, group, tenant, 3000L);
8 getServerConfig是干嘛的呢?
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)throws NacosException {if (StringUtils.isBlank(group)) {group = Constants.DEFAULT_GROUP;}HttpResult result = null;try {List<String> params = null;if (StringUtils.isBlank(tenant)) {params = Arrays.asList("dataId", dataId, "group", group);} else {params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);}//在这里我们就发现了agent的作用是什么。result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);}
未完待续
先分享到这,后续还有listener,本来这篇文章想分享Nacos的注册发现,但是感觉config这块理解不是很深入,所以撸了源码瞅瞅,如果和你的认知有差别,欢迎一起交流成长~
