vlambda博客
学习文章列表

Nacos源码(四)1.4.1配置中心-Derby集群模式

前言

Nacos源码(四)1.4.1配置中心-Derby集群模式
NacosCluster.png

本章讨论Nacos配置中心,以集群方式启动(-Dnacos.standalone=false),使用嵌入式数据源derby( -DembeddedStorage=true),由于每个节点有一个独立的数据源,通过JRaft框架实现数据一致性。其中包括写一致性和读一致性(线性一致)。

一、写一致

1、与使用MySQL的区别

回顾POST /v1/cs/configs发布配置,基于MySQL数据源,这个接口总共做了几个事情:

  • 更新服务端配置(数据库)

  • 集群中所有服务端更新本地配置

  • 响应客户端长轮询

// ConfigController
@PostMapping
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId")
 String dataId, @RequestParam(value = "group") String group,...) throws NacosException 
{

    // ...
    ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
    configInfo.setType(type);
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            // 更新数据库配置
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            // 发布ConfigDataChangeEvent事件
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            // ...
        }
    } else {
        // ...
    }
    return true;
}

集群基于Derby嵌入数据源,发布配置页做了类似的事情,只不过方式不同。

  • Derby在每个节点各存储了一份数据,写入要保证每个节点的数据一致。

  • ConfigChangePublisher.notifyConfigChange方法,对于集群+derby启动的方式不做任何处理,需要以其他方式把derby中的数据同步到文件系统和内存中,并响应长轮询。

public class ConfigChangePublisher {
    public static void notifyConfigChange(ConfigDataChangeEvent event) {
        // 集群启动(-Dnacos.standalone=false),嵌入式数据源( -DembeddedStorage=true),不处理
        if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
            return;
        }
        NotifyCenter.publishEvent(event);
    }
}

2、提交Task

使用sofa-jraft框架的一致性写要构造一个Task提交到Leader的NodeImpl(Node.apply(Task)),这里看看Nacos如何实现。

Nacos源码(四)1.4.1配置中心-Derby集群模式
Write.png

以更新配置为例,定位到PersistService的derby实现类EmbeddedStoragePersistServiceImpl,updateConfigInfo方法负责更新配置。

// **EmbeddedStoragePersistServiceImpl**
@Override
public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser,
        final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify)
 
{
    try {
        // 1. SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info
        //  WHERE data_id=? AND group_id=? AND tenant_id=?
        ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),
                configInfo.getTenant());
        // ...

        // 2. UPDATE config_info SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=?, c_desc=?,c_use=?,effect=?,type=?,c_schema=?
        //  WHERE data_id=? AND group_id=? AND tenant_id=?
        updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);

        // ... 忽略tag相关

        // 3. INSERT INTO his_config_info (id,data_id,group_id,tenant_id,app_name,content,md5,src_ip,src_user,gmt_modified,op_type) VALUES(?,?,?,?,?,?,?,?,?,?,?)
        insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");

        // 4. 将ConfigDumpEvent作为extendInfo放入ThreadLocal
        EmbeddedStorageContextUtils.onModifyConfigInfo(configInfo, srcIp, time);
        databaseOperate.blockUpdate();
    } finally {
        EmbeddedStorageContextUtils.cleanAllContext();
    }
}

随便跟进一个*Atomic方法,仅仅是通过EmbeddedStorageContextUtils.addSqlContext(sql, args)将sql和参数存储下来。EmbeddedStorageContextUtils将sql和args封装为一个ModifyRequest放入了ThreadLocal中,没有实际执行sql。

// **EmbeddedStoragePersistServiceImpl**
@Override
public void updateConfigInfoAtomic(final ConfigInfo configInfo, final String srcIp, final String srcUser,
        final Timestamp time, Map<String, Object> configAdvanceInfo)
 
{
    // ... 忽略参数处理
    final String sql = "UPDATE config_info SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=?,"
            + "c_desc=?,c_use=?,effect=?,type=?,c_schema=? WHERE data_id=? AND group_id=? AND tenant_id=?";

    final Object[] args = new Object[] {configInfo.getContent(), md5Tmp, srcIp, srcUser, time, appNameTmp, desc,
            use, effect, type, schema, configInfo.getDataId(), configInfo.getGroup(), tenantTmp};
    EmbeddedStorageContextUtils.addSqlContext(sql, args);
}

// EmbeddedStorageContextUtils
private static final ThreadLocal<ArrayList<ModifyRequest>> SQL_CONTEXT = ThreadLocal.withInitial(ArrayList::new);

public static void addSqlContext(String sql, Object... args) {
    ArrayList<ModifyRequest> requests = SQL_CONTEXT.get();
    ModifyRequest context = new ModifyRequest();
    context.setExecuteNo(requests.size());
    context.setSql(sql);
    context.setArgs(args);
    requests.add(context);
    SQL_CONTEXT.set(requests);
}

当上述所有sql保存到ThreadLocal中之后,EmbeddedStorageContextUtils.onModifyConfigInfo(configInfo, srcIp, time)将配置信息,封装为ConfigDumpEvent,作为扩展信息,放入了另外一个ThreadLocal中。

// EmbeddedStorageContextUtils

private static final ThreadLocal<Map<String, String>> EXTEND_INFO_CONTEXT = ThreadLocal.withInitial(HashMap::new);

public static void onModifyConfigInfo(ConfigInfo configInfo, String srcIp, Timestamp time) {
    if (!EnvUtil.getStandaloneMode()) {
        ConfigDumpEvent event = ConfigDumpEvent.builder().remove(false).namespaceId(configInfo.getTenant())
                .dataId(configInfo.getDataId()).group(configInfo.getGroup()).isBeta(false)
                .content(configInfo.getContent()).type(configInfo.getType()).handleIp(srcIp)
                .lastModifiedTs(time.getTime()).build();

        Map<String, String> extendInfo = new HashMap<>(2);
        extendInfo.put(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT, JacksonUtils.toJson(event));
        EmbeddedStorageContextUtils.putAllExtendInfo(extendInfo);
    }
}

public static void putAllExtendInfo(Map<String, String> map) {
    Map<String, String> old = EXTEND_INFO_CONTEXT.get();
    old.putAll(map);
    EXTEND_INFO_CONTEXT.set(old);
}

至此,所有必须数据准备完毕,通过DatabaseOperate.blockUpdate执行更新。这里DatabaseOperate的实现类是DistributedDatabaseOperateImpl。将ThreadLocal中的ModifyRequest(sql和args)和ConfigDumpEvent都封装到WriteRequest中。

// DistributedDatabaseOperateImpl
@Override
public Boolean update(List<ModifyRequest> sqlContext, BiConsumer<Boolean, Throwable> consumer) {
    try {
        // {timestamp}-{group}-{ip:port}-{signature}
        final String key =
                System.currentTimeMillis() + "-" + group() + "-" + memberManager.getSelf().getAddress() + "-"
                        + MD5Utils.md5Hex(sqlContext.toString(), Constants.ENCODE);
        WriteRequest request = WriteRequest.newBuilder().setGroup(group()).setKey(key)
                // List<ModifyRequest>
                .setData(ByteString.copyFrom(serializer.serialize(sqlContext)))
                // 这里可能有ConfigDumpEvent
                .putAllExtendInfo(EmbeddedStorageContextUtils.getCurrentExtendInfo())
                .setType(sqlContext.getClass().getCanonicalName()).build();
        // consumer is null 代表同步请求
        if (Objects.isNull(consumer)) {
            // JRaftProtocol.write
            Response response = this.protocol.write(request);
            if (response.getSuccess()) {
                return true;
            }
            return false;
        } else {
           // ... 异步请求忽略
        }
        return true;
    } catch (TimeoutException e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
    } catch (Throwable e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
    }
}

protocal.write的实现类是JRaftProtocol。一般同步请求,都是适配异步请求方法,这里超时时间是10s。

// JRaftProtocol
@Override
public Response write(WriteRequest request) throws Exception {
    CompletableFuture<Response> future = writeAsync(request);
    return future.get(10_000L, TimeUnit.MILLISECONDS);
}

@Override
public CompletableFuture<Response> writeAsync(WriteRequest request) {
    return raftServer.commit(request.getGroup(), request, new CompletableFuture<>());
}

JRaftServer的commit方法,区分当前节点是leader还是follower。

// JRaftServer
public CompletableFuture<Response> commit(final String group, final Message data,
        final CompletableFuture<Response> future)
 
{
    final RaftGroupTuple tuple = findTupleByGroup(group);
    FailoverClosureImpl closure = new FailoverClosureImpl(future);
    // 如果当前节点是leader,优先尝试提交task到
    final Node node = tuple.node;
    if (node.isLeader()) {
        // 如果当前节点是leader,直接执行
        applyOperation(node, data, closure);
    } else {
        // 如果是follower,重定向到leader
        invokeToLeader(group, data, rpcRequestTimeoutMs, closure);
    }
    return future;
}

如果是follower节点,这里会重定向到leader节点,由leader节点执行写入操作。

// JRaftServer
private void invokeToLeader(final String group, final Message request, final int timeoutMillis,
        FailoverClosure closure)
 
{
    try {
        final Endpoint leaderIp = Optional.ofNullable(getLeader(group))
                .orElseThrow(() -> new NoLeaderException(group)).getEndpoint();
        cliClientService.getRpcClient().invokeAsync(leaderIp, request, new InvokeCallback() {
            @Override
            public void complete(Object o, Throwable ex) {
                if (Objects.nonNull(ex)) {
                    closure.setThrowable(ex);
                    closure.run(new Status(RaftError.UNKNOWN, ex.getMessage()));
                    return;
                }
                closure.setResponse((Response) o);
                closure.run(Status.OK());
            }

            @Override
            public Executor executor() {
                return RaftExecutor.getRaftCliServiceExecutor();
            }
        }, timeoutMillis);
    } catch (Exception e) {
        closure.setThrowable(e);
        closure.run(new Status(RaftError.UNKNOWN, e.toString()));
    }
}

如果当前是leader节点,那么不用经过rpc调用,可以直接执行写入操作。(follower重定向过来也是由同样的方法处理)可以看到,这里使用sofa-jraft的Node.apply(Task)方法提交本次写入请求。

// JRaftServer
public void applyOperation(Node node, Message data, FailoverClosure closure) {
    final Task task = new Task();
    task.setDone(new NacosClosure(data, status -> {
        NacosClosure.NacosStatus nacosStatus = (NacosClosure.NacosStatus) status;
        closure.setThrowable(nacosStatus.getThrowable());
        closure.setResponse(nacosStatus.getResponse());
        closure.run(nacosStatus);
    }));
    task.setData(ByteBuffer.wrap(data.toByteArray()));
    node.apply(task);
}

3、应用Log

将Task提交到sofa-jraft框架后,框架会处理所有流程(日志复制、超半数提交),最终会调用用户实现的状态机的onApply方法,此时收到的log已经被过半节点提交,可以应用到所有节点的本地状态机中。

NacosStateMachine是Nacos对于sofa-jraft StateMachine的实现类,定位到onApply方法。

protected final RequestProcessor processor;
// commit log之后触发,应用log到当前节点
@Override
public void onApply(Iterator iter) {
    int index = 0;
    int applied = 0;
    Message message;
    NacosClosure closure = null;
    try {
        while (iter.hasNext()) {
            Status status = Status.OK();
            try {
                // 如果是leader节点,这里done不为空,减少反序列化报文的开销
                if (iter.done() != null) {
                    closure = (NacosClosure) iter.done();
                    message = closure.getMessage();
                } else {
                    final ByteBuffer data = iter.getData();
                    message = ProtoMessageUtil.parse(data.array());
                }
                // 写请求
                if (message instanceof WriteRequest) {
                    Response response = processor.onApply((WriteRequest) message);
                    postProcessor(response, closure);
                }
                // 一致性读降级走raft流程
                if (message instanceof ReadRequest) {
                    Response response = processor.onRequest((ReadRequest) message);
                    postProcessor(response, closure);
                }
            } catch (Throwable e) {
                index++;
                status.setError(RaftError.UNKNOWN, e.toString());
                Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
                throw e;
            } finally {
                Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
            }
            applied++;
            index++;
            iter.next();
        }
    } catch (Throwable t) {
        iter.setErrorAndRollback(index - applied,
                new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.",
                        ExceptionUtil.getStackTrace(t)));
    }
}

最终WriteRequest被提交到一个RequestProcessor处理,这个实现类仍然是DistributedDatabaseOperateImpl。这里将所有sql放到derby数据库执行,并从WriteRequest的扩展信息中获取ConfigDumpEvent并发布。

public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implements BaseDatabaseOperate {
  // raft集群master节点已经commit,这里apply log,将数据落库,提交DumpEvent
    @Override
    public Response onApply(WriteRequest log) {
        final ByteString byteString = log.getData();
        List<ModifyRequest> sqlContext = serializer.deserialize(byteString.toByteArray(), List.class);
        final Lock lock = readLock;
        lock.lock();
        try {
            boolean isOk = false;
            if (log.containsExtendInfo(DATA_IMPORT_KEY)) {
                isOk = doDataImport(jdbcTemplate, sqlContext);
            } else {
                sqlContext.sort(Comparator.comparingInt(ModifyRequest::getExecuteNo));
                // 1. 落库derby
                isOk = update(transactionTemplate, jdbcTemplate, sqlContext);
                // 2. DumpEvent
                ConfigExecutor.executeEmbeddedDump(() -> handleExtendInfo(log.getExtendInfoMap()));
            }

            return Response.newBuilder().setSuccess(isOk).build();

        } catch (BadSqlGrammarException | DataIntegrityViolationException e) {
            return Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build();
        } catch (DataAccessException e) {
            throw new ConsistencyException(e.toString());
        } catch (Throwable t) {
            throw t;
        } finally {
            lock.unlock();
        }
    }
    // 从扩展信息中,反序列化ConfigDumpEvent,发布事件
    private void handleExtendInfo(Map<String, String> extendInfo) {
      if (extendInfo.containsKey(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT)) {
        String jsonVal = extendInfo.get(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT);
        if (StringUtils.isNotBlank(jsonVal)) {
          NotifyCenter.publishEvent(JacksonUtils.toObj(jsonVal, ConfigDumpEvent.class));
        }
        return;
      }
      // ...
    }
}

ConfigDumpEvent事件的处理,和之前使用MySQL数据源是一样的,会将Derby数据库里的配置dump到本地文件系统中,然后同步到内存配置的CacheItem中,最后响应客户端长轮询请求。下图是第二章的图,从/configs接口到ConfigDumpEvent之间的逻辑与非derby集群有所不同。

Nacos源码(四)1.4.1配置中心-Derby集群模式
发布配置主流程.png

二、线性一致读

集群Derby启动,不会直接读Derby数据库,而是会读本地文件系统。比如下面是GET /configs查询配置接口的逻辑。

// ConfigServletInner
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
        String tenant, String tag, String clientIp)
 throws IOException, ServletException 
{
    // ...
    // 如果单机部署且使用derby数据源,查询实时配置
    if (PropertyUtil.isDirectRead()) {
      configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
    } else {
      // 如果集群部署 或 使用mysql,读取本地文件系统中的配置
      file = DiskUtil.targetFile(dataId, group, tenant);
    }
        // ...
}

所以这里的线性一致读主要存在于Dump阶段,将数据库中的数据Dump到本地文件系统。比如配置中心启动之后,需要将Derby数据库里的数据Dump到本地文件系统,供之后查询使用。

PersistService实现类EmbeddedStoragePersistServiceImpl,queryConfigInfo查询配置方法如下。

@Override
public ConfigInfoWrapper queryConfigInfo(final String dataId, final String group, final String tenant) {
    String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant;
    final String sql = "SELECT ID,data_id,group_id,tenant_id,app_name,content,type,gmt_modified,md5 FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?";

    return databaseOperate.queryOne(sql, new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_WRAPPER_ROW_MAPPER);
}

由于是集群启动,进入DistributedDatabaseOperateImpl#queryOne方法。

@Override
public <R> queryOne(String sql, Object[] args, RowMapper<R> mapper) {
    try {
        byte[] data = serializer.serialize(
                SelectRequest.builder().queryType(QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS).sql(sql).args(args)
                        .className(mapper.getClass().getCanonicalName()).build());

        final boolean blockRead = EmbeddedStorageContextUtils
                .containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

        Response response = innerRead(
                ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
        if (response.getSuccess()) {
            return serializer.deserialize(response.getData().toByteArray(),
                    ClassUtils.resolveGenericTypeByInterface(mapper.getClass()));
        }
        throw new NJdbcException(response.getErrMsg(), response.getErrMsg());
    } catch (Exception e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
    }
}

private Response innerRead(ReadRequest request, boolean blockRead) throws Exception {
    if (blockRead) {
      return (Response) protocol.aGetData(request).join();
    }
    return protocol.getData(request);
}

最后还是进入JRaftProtocol处理ReadRequest。

// JRaftProtocol
@Override
public CompletableFuture<Response> aGetData(ReadRequest request) {
    return raftServer.get(request);
}

JRaftServer的get方法,处理ReadRequest。这里调用sofa-jraft的Node.readIndex方法处理一致性读,如果处理失败,会降级走一遍raft流程(过半提交,应用log,和写流程一样)。

// JRaftServer
CompletableFuture<Response> get(final ReadRequest request) {
    final String group = request.getGroup();
    CompletableFuture<Response> future = new CompletableFuture<>();
    final RaftGroupTuple tuple = findTupleByGroup(group);
    final Node node = tuple.node;
    final RequestProcessor processor = tuple.processor;
    // 1. 优先采用ReadIndex方式进行一致性读
    try {
        node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
            @Override
            public void run(Status status, long index, byte[] reqCtx) {
                // 2. 经过JRaft处理完成后,此处收到回调,响应客户端
                if (status.isOk()) {
                    try {
                        Response response = processor.onRequest(request);
                        future.complete(response);
                    } catch (Throwable t) {
                        future.completeExceptionally(new ConsistencyException());
                    }
                    return;
                }
                // ...
            }
        });
        return future;
    } catch (Throwable e) {
        // 3. 各种RPC调用发生异常,走raft流程,忽略
        readFromLeader(request, future);
        return future;
    }
}

当sofa-jraft框架处理后,发现readIndex<=applyIndex时,ReadIndexClosure收到回调,此时可以读取本地状态机中的数据,即当前derby数据源中的数据。

// DistributedDatabaseOperateImpl
public Response onRequest(final ReadRequest request) {
    final SelectRequest selectRequest = serializer
            .deserialize(request.getData().toByteArray(), SelectRequest.class);
    final RowMapper<Object> mapper = RowMapperManager.getRowMapper(selectRequest.getClassName());
    final byte type = selectRequest.getQueryType();
    readLock.lock();
    Object data;
    try {
        switch (type) {
            case QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS:
                data = queryOne(jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), mapper);
                break;
            case QueryType.QUERY_ONE_NO_MAPPER_NO_ARGS:
                data = queryOne(jdbcTemplate, selectRequest.getSql(),
                        ClassUtils.findClassByName(selectRequest.getClassName()));
                break;
            // ...
            default:
                throw new IllegalArgumentException("Unsupported data query categories");
        }
        ByteString bytes = data == null ? ByteString.EMPTY : ByteString.copyFrom(serializer.serialize(data));
        return Response.newBuilder().setSuccess(true).setData(bytes).build();
    } catch (Exception e) {
        return Response.newBuilder().setSuccess(false)
                .setErrMsg(...).build();
    } finally {
        readLock.unlock();
    }
}

总结

背景:Nacos配置中心集群启动,使用Derby嵌入数据源。

  • 写一致

    • 将sql和args封装为一个ModifyRequest放入ThreadLocal

    • ConfigDumpEvent放入ThreadLocal

    • 使用ThreadLocal中的参数,构建WriteRequest,调用sofa-jraft的Node.apply(Task)方法提交写请求到Raft集群

    • 当超半数节点commit log成功,所有节点(无论是leader发现超半数commit,还是follower通过日志复制)的NacosStateMachine的onApply方法都会被调用,此时将会执行ModifyRequest中的sql,并发布request中的ConfigDumpEvent。

  • 线性一致读

    • 场景:将derby数据库中的数据dump到本地文件系统

    • JRaftServer会调用sofa-jraft的Node.readIndex方法,执行一致性读;如果一致性读失败,降级走raft流程,与写流程一样,实现线性一致性。

    • sofa-jraft框架发现readIndex<=applyIndex时,ReadIndexClosure收到回调,此时可以读取本地状态机中的数据,即当前derby数据源中的数据。