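Nacos Source Code (4): Config Center in 1.4.1, Derby Cluster Mode
Preface
This chapter looks at the Nacos config center started in cluster mode (-Dnacos.standalone=false) with the embedded Derby data source (-DembeddedStorage=true). Because every node has its own independent data source, data consistency is achieved with the JRaft framework (sofa-jraft). This covers both write consistency and read consistency (linearizable reads).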
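I. Write Consistency
1. Differences from the MySQL Setup
Recall the POST /v1/cs/configs endpoint that publishes a config. With a MySQL data source, it does the following:
Updates the config on the server side (in the database)
Has every server in the cluster update its local copy of the config
Responds to clients' long-polling requests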
// ConfigController
@PostMapping
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,...) throws NacosException {
// ...
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
configInfo.setType(type);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
// Update the config in the database
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
// Publish a ConfigDataChangeEvent
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
// ...
}
} else {
// ...
}
return true;
}
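In a cluster backed by the embedded Derby data source, publishing a config does similar things, only in a different way.
Derby keeps a separate copy of the data on every node, so a write must leave the data consistent across all nodes.
For a cluster running on Derby, ConfigChangePublisher.notifyConfigChange does nothing. The data in Derby has to be synced to the file system and to memory, and long-polling clients have to be answered, by some other means.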
public class ConfigChangePublisher {
public static void notifyConfigChange(ConfigDataChangeEvent event) {
// Cluster mode (-Dnacos.standalone=false) with embedded storage (-DembeddedStorage=true): do nothing
if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
return;
}
NotifyCenter.publishEvent(event);
}
}
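2. Submitting a Task
With sofa-jraft, a consistent write is done by building a Task and submitting it to the leader's NodeImpl (Node.apply(Task)). Let's see how Nacos does this.
Take updating a config as an example. The Derby implementation of PersistService is EmbeddedStoragePersistServiceImpl, and its updateConfigInfo method is responsible for updating a config.
// EmbeddedStoragePersistServiceImpl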
@Override
public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser,
final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
try {
// 1. SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info
// WHERE data_id=? AND group_id=? AND tenant_id=?
ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant());
// ...
// 2. UPDATE config_info SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=?, c_desc=?,c_use=?,effect=?,type=?,c_schema=?
// WHERE data_id=? AND group_id=? AND tenant_id=?
updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);
// ... tag-related logic omitted
// 3. INSERT INTO his_config_info (id,data_id,group_id,tenant_id,app_name,content,md5,src_ip,src_user,gmt_modified,op_type) VALUES(?,?,?,?,?,?,?,?,?,?,?)
insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");
// 4. Put a ConfigDumpEvent into a ThreadLocal as extend info
EmbeddedStorageContextUtils.onModifyConfigInfo(configInfo, srcIp, time);
databaseOperate.blockUpdate();
} finally {
EmbeddedStorageContextUtils.cleanAllContext();
}
}
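Why the finally block with cleanAllContext() matters: request threads come from a pool, so a ThreadLocal that is not cleared leaks its contents into the next request served by the same thread. The following is a minimal JDK-only sketch, not Nacos code; SQL_CONTEXT and handleRequest are hypothetical stand-ins for the per-thread context kept by EmbeddedStorageContextUtils.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadLocalLeakDemo {
    // Hypothetical per-thread SQL buffer, analogous to the SQL context in EmbeddedStorageContextUtils
    static final ThreadLocal<List<String>> SQL_CONTEXT = ThreadLocal.withInitial(ArrayList::new);

    // Simulates one request: returns how many statements were already buffered when it started,
    // buffers one statement, and clears the context afterwards only if cleanUp is true
    static int handleRequest(boolean cleanUp) {
        int leftover = SQL_CONTEXT.get().size();
        try {
            SQL_CONTEXT.get().add("UPDATE config_info SET content=? WHERE data_id=?");
        } finally {
            if (cleanUp) {
                SQL_CONTEXT.remove();
            }
        }
        return leftover;
    }

    // Runs two requests on the same pooled thread; returns the leftover count seen by the second one
    static int leftoverSeenBySecondRequest(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> handleRequest(cleanUp)).get();
            Future<Integer> second = pool.submit(() -> handleRequest(cleanUp));
            return second.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Without cleanup the second request starts with the first request's statement still buffered, which would then be replicated as part of the wrong write.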
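Follow any of the *Atomic methods and you will see that they only record the SQL and its arguments via EmbeddedStorageContextUtils.addSqlContext(sql, args). EmbeddedStorageContextUtils wraps the sql and args into a ModifyRequest and puts it into a ThreadLocal; no SQL is actually executed at this point.
// EmbeddedStoragePersistServiceImpl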
@Override
public void updateConfigInfoAtomic(final ConfigInfo configInfo, final String srcIp, final String srcUser,
final Timestamp time, Map<String, Object> configAdvanceInfo) {
// ... parameter handling omitted
final String sql = "UPDATE config_info SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=?,"
+ "c_desc=?,c_use=?,effect=?,type=?,c_schema=? WHERE data_id=? AND group_id=? AND tenant_id=?";
final Object[] args = new Object[] {configInfo.getContent(), md5Tmp, srcIp, srcUser, time, appNameTmp, desc,
use, effect, type, schema, configInfo.getDataId(), configInfo.getGroup(), tenantTmp};
EmbeddedStorageContextUtils.addSqlContext(sql, args);
}
// EmbeddedStorageContextUtils
private static final ThreadLocal<ArrayList<ModifyRequest>> SQL_CONTEXT = ThreadLocal.withInitial(ArrayList::new);
public static void addSqlContext(String sql, Object... args) {
ArrayList<ModifyRequest> requests = SQL_CONTEXT.get();
ModifyRequest context = new ModifyRequest();
context.setExecuteNo(requests.size());
context.setSql(sql);
context.setArgs(args);
requests.add(context);
SQL_CONTEXT.set(requests);
}
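The buffering step can be reproduced with plain JDK types. This sketch assumes a simplified ModifyRequest (a hypothetical stand-in with only the three fields used above) and shows how executeNo records insertion order so that the statements can later be replayed in the same order.

```java
import java.util.ArrayList;
import java.util.List;

class SqlContextSketch {
    // Simplified stand-in for Nacos' ModifyRequest: execution order, SQL text and bound arguments
    static final class ModifyRequest {
        final int executeNo;
        final String sql;
        final Object[] args;

        ModifyRequest(int executeNo, String sql, Object[] args) {
            this.executeNo = executeNo;
            this.sql = sql;
            this.args = args;
        }
    }

    private static final ThreadLocal<List<ModifyRequest>> SQL_CONTEXT = ThreadLocal.withInitial(ArrayList::new);

    // Records a statement without executing it; executeNo is its position in the buffer
    static void addSqlContext(String sql, Object... args) {
        List<ModifyRequest> requests = SQL_CONTEXT.get();
        requests.add(new ModifyRequest(requests.size(), sql, args));
    }

    // Returns a copy of everything buffered on the current thread
    static List<ModifyRequest> getSqlContext() {
        return new ArrayList<>(SQL_CONTEXT.get());
    }

    // Should be called from a finally block, as cleanAllContext() is in updateConfigInfo
    static void cleanSqlContext() {
        SQL_CONTEXT.remove();
    }
}
```

Calling SQL_CONTEXT.set(requests) after mutating the list, as the original addSqlContext does, is harmless but redundant: get() already returns the current thread's own mutable list.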
Once all the SQL statements above are saved in the ThreadLocal, EmbeddedStorageContextUtils.onModifyConfigInfo(configInfo, srcIp, time) wraps the config info into a ConfigDumpEvent and puts it, as extend info, into a second ThreadLocal.
// EmbeddedStorageContextUtils
private static final ThreadLocal<Map<String, String>> EXTEND_INFO_CONTEXT = ThreadLocal.withInitial(HashMap::new);
public static void onModifyConfigInfo(ConfigInfo configInfo, String srcIp, Timestamp time) {
if (!EnvUtil.getStandaloneMode()) {
ConfigDumpEvent event = ConfigDumpEvent.builder().remove(false).namespaceId(configInfo.getTenant())
.dataId(configInfo.getDataId()).group(configInfo.getGroup()).isBeta(false)
.content(configInfo.getContent()).type(configInfo.getType()).handleIp(srcIp)
.lastModifiedTs(time.getTime()).build();
Map<String, String> extendInfo = new HashMap<>(2);
extendInfo.put(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT, JacksonUtils.toJson(event));
EmbeddedStorageContextUtils.putAllExtendInfo(extendInfo);
}
}
public static void putAllExtendInfo(Map<String, String> map) {
Map<String, String> old = EXTEND_INFO_CONTEXT.get();
old.putAll(map);
EXTEND_INFO_CONTEXT.set(old);
}
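At this point all the required data is ready, and DatabaseOperate.blockUpdate performs the update. The DatabaseOperate implementation here is DistributedDatabaseOperateImpl, which packs the ModifyRequests (sql and args) and the ConfigDumpEvent held in the ThreadLocals into a single WriteRequest.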
// DistributedDatabaseOperateImpl
@Override
public Boolean update(List<ModifyRequest> sqlContext, BiConsumer<Boolean, Throwable> consumer) {
try {
// {timestamp}-{group}-{ip:port}-{signature}
final String key =
System.currentTimeMillis() + "-" + group() + "-" + memberManager.getSelf().getAddress() + "-"
+ MD5Utils.md5Hex(sqlContext.toString(), Constants.ENCODE);
WriteRequest request = WriteRequest.newBuilder().setGroup(group()).setKey(key)
// List<ModifyRequest>
.setData(ByteString.copyFrom(serializer.serialize(sqlContext)))
// may carry a ConfigDumpEvent
.putAllExtendInfo(EmbeddedStorageContextUtils.getCurrentExtendInfo())
.setType(sqlContext.getClass().getCanonicalName()).build();
// a null consumer means a synchronous request
if (Objects.isNull(consumer)) {
// JRaftProtocol.write
Response response = this.protocol.write(request);
if (response.getSuccess()) {
return true;
}
return false;
} else {
// ... asynchronous request omitted
}
return true;
} catch (TimeoutException e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
} catch (Throwable e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
}
}
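The request key follows the {timestamp}-{group}-{ip:port}-{signature} layout from the comment above. The sketch below rebuilds that layout with the JDK's MessageDigest; buildKey and md5Hex are hypothetical helpers, md5Hex is assumed to behave like MD5Utils.md5Hex with UTF-8 encoding (lowercase hex), and the group name and address in the test are made-up example values.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class WriteKeySketch {
    // Lowercase hex MD5 of the UTF-8 bytes (assumed equivalent to MD5Utils.md5Hex(s, "UTF-8"))
    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available in every JDK", e);
        }
    }

    // {timestamp}-{group}-{ip:port}-{signature}, where the signature is the MD5 of the SQL context
    static String buildKey(long timestampMillis, String group, String selfAddress, String sqlContext) {
        return timestampMillis + "-" + group + "-" + selfAddress + "-" + md5Hex(sqlContext);
    }
}
```

The timestamp and node address make keys from different nodes and moments distinct, while the MD5 suffix ties the key to the exact batch of statements.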
protocol.write is implemented by JRaftProtocol. As is common, the synchronous method is an adapter over the asynchronous one; here it waits at most 10s.
// JRaftProtocol
@Override
public Response write(WriteRequest request) throws Exception {
CompletableFuture<Response> future = writeAsync(request);
return future.get(10_000L, TimeUnit.MILLISECONDS);
}
@Override
public CompletableFuture<Response> writeAsync(WriteRequest request) {
return raftServer.commit(request.getGroup(), request, new CompletableFuture<>());
}
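The blocking write is a thin adapter that waits on the future returned by writeAsync. A JDK-only sketch of the same sync-over-async pattern, with a hypothetical writeAsync that a background thread completes after a delay; when the wait exceeds the timeout, a TimeoutException surfaces, which is the case DistributedDatabaseOperateImpl.update catches.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncOverAsyncSketch {
    // Hypothetical async write: completes with "ok" after delayMillis on a background thread
    static CompletableFuture<String> writeAsync(long delayMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> future.complete("ok"),
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
        return future;
    }

    // Blocking adapter in the style of JRaftProtocol.write: wait up to timeoutMillis for the result
    static String write(long delayMillis, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        return writeAsync(delayMillis).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Keeping a single async code path and deriving the sync API from it means the Raft callback logic only has to be written once.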
JRaftServer's commit method distinguishes whether the current node is the leader or a follower.
// JRaftServer
public CompletableFuture<Response> commit(final String group, final Message data,
final CompletableFuture<Response> future) {
final RaftGroupTuple tuple = findTupleByGroup(group);
FailoverClosureImpl closure = new FailoverClosureImpl(future);
// Prefer submitting the task locally when this node is the leader
final Node node = tuple.node;
if (node.isLeader()) {
// Leader: apply directly
applyOperation(node, data, closure);
} else {
// Follower: forward to the leader
invokeToLeader(group, data, rpcRequestTimeoutMs, closure);
}
return future;
}
On a follower, the request is forwarded to the leader, which performs the write.
// JRaftServer
private void invokeToLeader(final String group, final Message request, final int timeoutMillis,
FailoverClosure closure) {
try {
final Endpoint leaderIp = Optional.ofNullable(getLeader(group))
.orElseThrow(() -> new NoLeaderException(group)).getEndpoint();
cliClientService.getRpcClient().invokeAsync(leaderIp, request, new InvokeCallback() {
@Override
public void complete(Object o, Throwable ex) {
if (Objects.nonNull(ex)) {
closure.setThrowable(ex);
closure.run(new Status(RaftError.UNKNOWN, ex.getMessage()));
return;
}
closure.setResponse((Response) o);
closure.run(Status.OK());
}
@Override
public Executor executor() {
return RaftExecutor.getRaftCliServiceExecutor();
}
}, timeoutMillis);
} catch (Exception e) {
closure.setThrowable(e);
closure.run(new Status(RaftError.UNKNOWN, e.toString()));
}
}
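The routing in commit and invokeToLeader boils down to: apply locally on the leader, forward otherwise, and fail the future when no leader is known. The following is a hypothetical in-memory sketch with no RPC and no sofa-jraft types; Cluster, Member and their fields are invented for illustration, and "appending to the leader's log" stands in for Node.apply(Task).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class CommitRoutingSketch {
    // Hypothetical cluster member with a local log
    static final class Member {
        final String address;
        final List<String> log = new ArrayList<>();

        Member(String address) {
            this.address = address;
        }
    }

    static final class Cluster {
        Member leader; // null while no leader is known (e.g. during an election)

        // In the spirit of JRaftServer.commit: apply on the leader, otherwise forward to it
        CompletableFuture<String> commit(Member self, String data) {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (self == leader) {
                applyOperation(self, data, future);
            } else {
                invokeToLeader(data, future);
            }
            return future;
        }

        // Stand-in for the RPC in invokeToLeader; fails the future when there is no leader
        private void invokeToLeader(String data, CompletableFuture<String> future) {
            try {
                Member target = Optional.ofNullable(leader)
                        .orElseThrow(() -> new IllegalStateException("no leader"));
                applyOperation(target, data, future);
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        }

        // Stand-in for Node.apply(Task): append to the leader's log and report success
        private void applyOperation(Member node, String data, CompletableFuture<String> future) {
            node.log.add(data);
            future.complete("applied by " + node.address);
        }
    }
}
```

Because only the leader ever appends, every write funnels through a single log, which is what gives Raft its total order of writes.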
On the leader, no RPC call is needed and the write is applied directly (requests forwarded from followers are handled by the same method). The write request is submitted through sofa-jraft's Node.apply(Task):
// JRaftServer
public void applyOperation(Node node, Message data, FailoverClosure closure) {
final Task task = new Task();
task.setDone(new NacosClosure(data, status -> {
NacosClosure.NacosStatus nacosStatus = (NacosClosure.NacosStatus) status;
closure.setThrowable(nacosStatus.getThrowable());
closure.setResponse(nacosStatus.getResponse());
closure.run(nacosStatus);
}));
task.setData(ByteBuffer.wrap(data.toByteArray()));
node.apply(task);
}
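3. Applying the Log
Once the Task is submitted, sofa-jraft handles the rest of the flow (log replication, committing once a majority has the entry) and finally calls the onApply method of the user-implemented state machine. By then the log entry has been committed by a majority of nodes and can be applied to each node's local state machine.
NacosStateMachine is Nacos's implementation of sofa-jraft's StateMachine; let's look at its onApply method.
protected final RequestProcessor processor;
// Triggered after a log entry is committed; applies the entry on this node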
@Override
public void onApply(Iterator iter) {
int index = 0;
int applied = 0;
Message message;
NacosClosure closure = null;
try {
while (iter.hasNext()) {
Status status = Status.OK();
try {
// On the leader, done is non-null: reuse the original message and skip deserialization
if (iter.done() != null) {
closure = (NacosClosure) iter.done();
message = closure.getMessage();
} else {
final ByteBuffer data = iter.getData();
message = ProtoMessageUtil.parse(data.array());
}
// Write request
if (message instanceof WriteRequest) {
Response response = processor.onApply((WriteRequest) message);
postProcessor(response, closure);
}
// Consistent read that fell back to the Raft flow
if (message instanceof ReadRequest) {
Response response = processor.onRequest((ReadRequest) message);
postProcessor(response, closure);
}
} catch (Throwable e) {
index++;
status.setError(RaftError.UNKNOWN, e.toString());
Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
throw e;
} finally {
Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
}
applied++;
index++;
iter.next();
}
} catch (Throwable t) {
iter.setErrorAndRollback(index - applied,
new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.",
ExceptionUtil.getStackTrace(t)));
}
}
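The essential guarantee of onApply can be modelled with a plain list: committed entries are applied strictly in log order, and the applied index only advances after an entry succeeds. The sketch below is a hypothetical simplification (entries are "key=value" strings, a HashMap plays the state machine); instead of calling iter.setErrorAndRollback, it simply stops at the first failing entry and reports how far it got.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ApplyLoopSketch {
    final Map<String, String> state = new HashMap<>();
    long appliedIndex = 0; // number of entries applied so far (0 = none)

    // Applies "key=value" entries in order; stops at the first malformed entry
    // and returns how many entries were applied in this call
    int onApply(List<String> committedEntries) {
        int applied = 0;
        for (String entry : committedEntries) {
            int eq = entry.indexOf('=');
            if (eq <= 0) {
                break; // a real state machine would report the error and roll back the iterator here
            }
            state.put(entry.substring(0, eq), entry.substring(eq + 1));
            applied++;
            appliedIndex++;
        }
        return applied;
    }
}
```

Since every node runs the same deterministic loop over the same committed log, all nodes end with the same state, which is exactly why each node can keep its own Derby copy.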
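Eventually the WriteRequest is handed to a RequestProcessor, whose implementation is again DistributedDatabaseOperateImpl. It executes all the SQL against the Derby database, then reads the ConfigDumpEvent from the WriteRequest's extend info and publishes it.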
public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implements BaseDatabaseOperate {
// The Raft leader has committed the entry; apply the log here: persist the data and publish the ConfigDumpEvent
@Override
public Response onApply(WriteRequest log) {
final ByteString byteString = log.getData();
List<ModifyRequest> sqlContext = serializer.deserialize(byteString.toByteArray(), List.class);
final Lock lock = readLock;
lock.lock();
try {
boolean isOk = false;
if (log.containsExtendInfo(DATA_IMPORT_KEY)) {
isOk = doDataImport(jdbcTemplate, sqlContext);
} else {
sqlContext.sort(Comparator.comparingInt(ModifyRequest::getExecuteNo));
// 1. Write to Derby
isOk = update(transactionTemplate, jdbcTemplate, sqlContext);
// 2. Publish the ConfigDumpEvent on the embedded-dump executor
ConfigExecutor.executeEmbeddedDump(() -> handleExtendInfo(log.getExtendInfoMap()));
}
return Response.newBuilder().setSuccess(isOk).build();
} catch (BadSqlGrammarException | DataIntegrityViolationException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build();
} catch (DataAccessException e) {
throw new ConsistencyException(e.toString());
} catch (Throwable t) {
throw t;
} finally {
lock.unlock();
}
}
// Deserialize the ConfigDumpEvent from the extend info and publish it
private void handleExtendInfo(Map<String, String> extendInfo) {
if (extendInfo.containsKey(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT)) {
String jsonVal = extendInfo.get(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT);
if (StringUtils.isNotBlank(jsonVal)) {
NotifyCenter.publishEvent(JacksonUtils.toObj(jsonVal, ConfigDumpEvent.class));
}
return;
}
// ...
}
}
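On every node the buffered statements are re-sorted by executeNo and executed as one unit (update(transactionTemplate, ...)). The following hypothetical sketch models that with an in-memory map instead of Derby: operations are applied to a copy that replaces the table only if all of them succeed, a rough stand-in for a transaction rollback. Op and OrderedBatchSketch are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OrderedBatchSketch {
    // Hypothetical stand-in for ModifyRequest: put key=value; a null value simulates a failing statement
    static final class Op {
        final int executeNo;
        final String key;
        final String value;

        Op(int executeNo, String key, String value) {
            this.executeNo = executeNo;
            this.key = key;
            this.value = value;
        }
    }

    Map<String, String> table = new HashMap<>();

    // Sorts by executeNo, applies everything to a working copy, and commits only if every op succeeds
    boolean update(List<Op> ops) {
        List<Op> sorted = new ArrayList<>(ops);
        sorted.sort(Comparator.comparingInt(op -> op.executeNo));
        Map<String, String> working = new HashMap<>(table);
        for (Op op : sorted) {
            if (op.value == null) {
                return false; // "rollback": the working copy is discarded
            }
            working.put(op.key, op.value);
        }
        table = working; // "commit"
        return true;
    }
}
```

Sorting matters because the serialized list might not preserve the original order; replaying in executeNo order guarantees every node runs the statements exactly as the leader recorded them.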
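The ConfigDumpEvent is handled exactly as with a MySQL data source: the config in the Derby database is dumped to the local file system, then synced into the in-memory CacheItem, and finally the clients' long-polling requests are answered. The figure below is the one from Chapter 2; for a Derby cluster, only the logic between the /configs endpoint and the ConfigDumpEvent differs from a non-Derby cluster.
II. Linearizable Reads
When a cluster runs on Derby, reads do not go to the Derby database directly but to the local file system. For example, here is the logic of the GET /configs endpoint that queries a config: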
// ConfigServletInner
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
String tenant, String tag, String clientIp) throws IOException, ServletException {
// ...
// Standalone deployment with Derby: query the live config from the database
if (PropertyUtil.isDirectRead()) {
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
// Cluster deployment or MySQL: read the config from the local file system
file = DiskUtil.targetFile(dataId, group, tenant);
}
// ...
}
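The branch in doGetConfig depends only on the deployment mode. Assuming, as the comments above state, that PropertyUtil.isDirectRead() is true exactly for a standalone node using embedded storage, the decision can be written as a tiny hypothetical helper:

```java
class ReadPathSketch {
    enum ReadPath { DATABASE, LOCAL_FILE }

    // Direct database reads only for standalone + embedded Derby;
    // cluster mode or an external MySQL reads the dumped file instead
    static ReadPath choose(boolean standalone, boolean embeddedStorage) {
        boolean directRead = standalone && embeddedStorage;
        return directRead ? ReadPath.DATABASE : ReadPath.LOCAL_FILE;
    }
}
```

In the Derby cluster case the query path never touches Derby, which is why consistency on the read side has to be enforced when the file is dumped.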
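So linearizable reads matter mainly in the dump phase, when data is dumped from the database to the local file system. For example, after the config center starts, it dumps the data in Derby to the local file system for later queries.
The queryConfigInfo method of the PersistService implementation EmbeddedStoragePersistServiceImpl looks like this: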
@Override
public ConfigInfoWrapper queryConfigInfo(final String dataId, final String group, final String tenant) {
String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant;
final String sql = "SELECT ID,data_id,group_id,tenant_id,app_name,content,type,gmt_modified,md5 FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?";
return databaseOperate.queryOne(sql, new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_WRAPPER_ROW_MAPPER);
}
Because the server runs in cluster mode, the call goes into DistributedDatabaseOperateImpl#queryOne.
@Override
public <R> R queryOne(String sql, Object[] args, RowMapper<R> mapper) {
try {
byte[] data = serializer.serialize(
SelectRequest.builder().queryType(QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(mapper.getClass().getCanonicalName()).build());
final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
if (response.getSuccess()) {
return serializer.deserialize(response.getData().toByteArray(),
ClassUtils.resolveGenericTypeByInterface(mapper.getClass()));
}
throw new NJdbcException(response.getErrMsg(), response.getErrMsg());
} catch (Exception e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
}
}
private Response innerRead(ReadRequest request, boolean blockRead) throws Exception {
if (blockRead) {
return (Response) protocol.aGetData(request).join();
}
return protocol.getData(request);
}
In the end, the ReadRequest is again handled by JRaftProtocol.
// JRaftProtocol
@Override
public CompletableFuture<Response> aGetData(ReadRequest request) {
return raftServer.get(request);
}
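JRaftServer's get method handles the ReadRequest. It calls sofa-jraft's Node.readIndex to perform a consistent read; if that fails, it falls back to the full Raft flow (majority commit, then log application, just like a write).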
// JRaftServer
CompletableFuture<Response> get(final ReadRequest request) {
final String group = request.getGroup();
CompletableFuture<Response> future = new CompletableFuture<>();
final RaftGroupTuple tuple = findTupleByGroup(group);
final Node node = tuple.node;
final RequestProcessor processor = tuple.processor;
// 1. Prefer a ReadIndex-based consistent read
try {
node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
// 2. Called back once JRaft has finished processing; respond to the caller
if (status.isOk()) {
try {
Response response = processor.onRequest(request);
future.complete(response);
} catch (Throwable t) {
future.completeExceptionally(new ConsistencyException());
}
return;
}
// ...
}
});
return future;
} catch (Throwable e) {
// 3. On any exception (e.g. RPC failures), fall back to the Raft flow; details omitted
readFromLeader(request, future);
return future;
}
}
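The ReadIndex idea behind node.readIndex can be illustrated without sofa-jraft: a read records the commit index at the moment it arrives (its read index) and is served from the local state machine only once the applied index has caught up. The class below is a hypothetical single-node model; it deliberately omits the heartbeat round a real leader uses to confirm it is still leader before trusting its commit index.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class ReadIndexSketch {
    private long commitIndex = 0;
    private long appliedIndex = 0;
    private final List<PendingRead> pending = new ArrayList<>();

    private static final class PendingRead {
        final long readIndex;
        final Supplier<String> query;
        final CompletableFuture<String> future = new CompletableFuture<>();

        PendingRead(long readIndex, Supplier<String> query) {
            this.readIndex = readIndex;
            this.query = query;
        }
    }

    synchronized void onCommit(long index) {
        commitIndex = Math.max(commitIndex, index);
    }

    // Captures the current commit index as the read index; the query runs once appliedIndex >= readIndex
    synchronized CompletableFuture<String> readIndex(Supplier<String> query) {
        PendingRead read = new PendingRead(commitIndex, query);
        pending.add(read);
        drain();
        return read.future;
    }

    // Called after the state machine applies entries up to index (cf. onApply)
    synchronized void onApplied(long index) {
        appliedIndex = Math.max(appliedIndex, index);
        drain();
    }

    // Serves every pending read whose read index has been reached
    private void drain() {
        for (Iterator<PendingRead> it = pending.iterator(); it.hasNext(); ) {
            PendingRead read = it.next();
            if (read.readIndex <= appliedIndex) {
                it.remove();
                read.future.complete(read.query.get());
            }
        }
    }
}
```

A read issued after entry 5 was committed therefore never observes a state older than entry 5, even on a node whose state machine is lagging.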
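Once sofa-jraft finds that the applied index has reached the read index (readIndex <= appliedIndex), the ReadIndexClosure is called back, and the data can then be read from the local state machine, i.e. the current Derby data source.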
// DistributedDatabaseOperateImpl
public Response onRequest(final ReadRequest request) {
final SelectRequest selectRequest = serializer
.deserialize(request.getData().toByteArray(), SelectRequest.class);
final RowMapper<Object> mapper = RowMapperManager.getRowMapper(selectRequest.getClassName());
final byte type = selectRequest.getQueryType();
readLock.lock();
Object data;
try {
switch (type) {
case QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS:
data = queryOne(jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), mapper);
break;
case QueryType.QUERY_ONE_NO_MAPPER_NO_ARGS:
data = queryOne(jdbcTemplate, selectRequest.getSql(),
ClassUtils.findClassByName(selectRequest.getClassName()));
break;
// ...
default:
throw new IllegalArgumentException("Unsupported data query categories");
}
ByteString bytes = data == null ? ByteString.EMPTY : ByteString.copyFrom(serializer.serialize(data));
return Response.newBuilder().setSuccess(true).setData(bytes).build();
} catch (Exception e) {
return Response.newBuilder().setSuccess(false)
.setErrMsg(...).build();
} finally {
readLock.unlock();
}
}
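Summary
Background: the Nacos config center starts in cluster mode with the embedded Derby data source.
Write consistency
Wrap the sql and args into a ModifyRequest and put it into a ThreadLocal
Put the ConfigDumpEvent into a ThreadLocal
Build a WriteRequest from the ThreadLocal contents and submit it to the Raft cluster through sofa-jraft's Node.apply(Task)
Once a majority of nodes have the log entry and it is committed, NacosStateMachine.onApply is called on every node (the leader learns of the majority commit directly; followers learn of it through log replication). At that point the SQL in the ModifyRequests is executed and the ConfigDumpEvent carried by the request is published.
Linearizable reads
Scenario: dumping the data in the Derby database to the local file system
JRaftServer calls sofa-jraft's Node.readIndex to perform a consistent read; if that fails, it falls back to the Raft flow, the same as a write, which still yields linearizability.
When sofa-jraft finds readIndex <= appliedIndex, the ReadIndexClosure is called back, and the data can be read from the local state machine, i.e. the current Derby data source.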