vlambda博客
学习文章列表

Nacos源码-动态获取配置信息

This browser does not support music or audio playback. Please play it in WeChat or another browser.

很喜欢这首歌~ 分享给大家



前言

在一篇文章【Nacos入门一】介绍Nacos宏观内容,上一篇是小白入门篇,是什么,干什么,同类技术,同类技术比较,hello world程序 也可以运行。


看完上一篇博客大家有没有思考一个问题?为什么Nacos可以做到,他运用什么原理? 如何做到动态刷新配置文件, 我们知道对于中间件在获取数据的时候会拉取数据或者推送数据,那对于Nacos是如何呢?这一篇从源码我们来观察~


准备

https://nacos.io/en-us/docs/quick-start.html

https://github.com/alibaba/nacos


入口

下图是调试代码的入口


步骤

1从createConfigService出发

根据Properties属性创建对象ConfigService configService = NacosFactory.createConfigService(properties);

看ConfigFactory类有两个方法,第一个方法也就是我们原本第一步调用的方法,第二方法是只需要传送serverAddr就可以,其他的使用默认就可以。所以对于Nacos来说,如果其他配置如果不配则采用默认配置(从源码看会更加理解,不过根据任何涉及规则都是有默认值得~)


既然这个类最终调用的是createConfigService那我们来查看这个方法做了什么? 

利用反射创建实例,并且初始化构造函数,下面的代码里我有做针对性备注,初始化的是NacosConfigService类,那接下来我们看看NacosConfigService初始化做了什么呢?

public class ConfigFactory {    public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); //1 初始化NacosConfigService构造函数 Constructor constructor = driverImplClass.getConstructor(Properties.class); // 通过反射创建ConfigService ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); }    }    public static ConfigService createConfigService(String serverAddr) throws NacosException { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); return createConfigService(properties);    }}


首先我们需要思考的是为什么要初始化这个类,他主要是干什么的呢?


                                               类的整体结构


这两个类非常重要,在下面的时候会调用

    private HttpAgent agent;      private ClientWorker worker;  


初始化方法都做了什么?

 /*** * 为什么要初始化这个函数?它主要是干嘛的? * @param properties * @throws NacosException */ public NacosConfigService(Properties properties) throws NacosException { // 2 构造函数被初始化 String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { encode = Constants.ENCODE; } else { encode = encodeTmp.trim(); } // 3 获取 namespace initNamespace(properties); // 4 创建 HttpAgent new ServerHttpAgent(properties) 里面获取的是serverAddr,会对这个字段进行处理,因为毕竟是集群形式 agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); agent.start(); // 5 创建 ClientWorker对象 ,参数里面是httpAgent,配置过滤器链管理 worker = new ClientWorker(agent, configFilterChainManager, properties); }


4 首先看一下HttpAgent的作用是什么?

public interface HttpAgent { void start() throws NacosException; HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException;  HttpResult httpPost() 。。。  HttpResult httpDelete() 。。。  String getName()  String getNamespace();  String getTenant();   String getEncode()}

5  其次看ClientWorker的作用是什么?它有3个线程,使用了不同的线程池,备注了我已经备注,最核心的是第三个线程。并且clientWorker使用到了agent,也就是调取得方法时指定的线程名

 public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties); // 第一个线程 线程的名字work executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); // 第二个线程 线程数是虚拟机可用的最大处理器数,是可变的。 executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); // 第三个线程根据间隔时间来计算下个任务开始的时间 executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { // 第四个 checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }

6 checkConfigInfo

  //使用的是AtomicReference 保障了原子性,预防了ABA问题 private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>( new HashMap<String, CacheData>()); // 这块是重点 主要是提取一批任务,交给executorService线程执行,执行任务是LongPollingRunnable的taskId public void checkConfigInfo() { // 分任务 int listenerSize = cacheMap.get().size(); // 向上取整为批数 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判断任务是否在执行 这块需要好好想想。任务列表现在是无序的。变化过程可能有问题 // LongPollingRunnable 主要执行了什么任务呢? executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }

7 LongllingRunnable

 class LongPollingRunnable implements Runnable { private int taskId;
public LongPollingRunnable(int taskId) { this.taskId = taskId; }
@Override public void run(){ ...... //代码太多,只看具体代码 1 check server config List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);           2 拉取数据 String content = getServerConfig(dataId, group, tenant, 3000L);

8 getServerConfig是干嘛的呢?

 public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP;        } HttpResult result = null; try { List<String> params = null; if (StringUtils.isBlank(tenant)) { params = Arrays.asList("dataId", dataId, "group", group); } else { params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant); }            //在这里我们就发现了agent的作用是什么。 result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);  }


未完待续

先分享到这,后续还有listener,本来这篇文章想分享Nacos的注册发现,但是感觉config这块理解不是很深入,所以撸了源码瞅瞅,如果和你的认知有差别,欢迎一起交流成长~