Jetcd源码以及raft算法的分析(下)
接着上篇我们来分析下jetcd的其他模块和raft算法
如图:
我们一个个看
首先jetcd-common :
都是公共类
jetcd-examples:
可以作为学习用,里边的代码都是main方法启动的,可以直接跑(看源码的以后也可以这么看—)
jetcd-launcher:
里边提供了一个已编程插件的方式启动etcd 服务器,核心代码如下:
import org.apache.maven.plugins.annotations.Mojo;
import org.apache.maven.plugins.annotations.Parameter;
"start", requiresProject = false, defaultPhase = LifecyclePhase.PRE_INTEGRATION_TEST) (name =
public class StartMojo extends AbstractMojo {
private static final Logger LOG = LoggerFactory.getLogger(StartMojo.class);
private static final String[] EMPTY = new String[0];
true, defaultValue = "target/jetcd-launcher-maven-plugin/endpoint") (required =
private File endpointFile;
/**
* <a href="https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/configuration.md">Additional
* arguments</a> to pass to etcd.
*/
false) (required =
private String[] additionalArguments;
public StartMojo() {
}
public void execute() throws MojoExecutionException, MojoFailureException {
Singleton.etcd = EtcdClusterFactory.buildCluster("maven", 1, false,
additionalArguments != null ? additionalArguments : EMPTY);
Singleton.etcd.start();
URI endpoint = Singleton.etcd.getClientEndpoints().get(0);
try {
endpointFile.getParentFile().mkdirs();
Files.asCharSink(endpointFile, US_ASCII).write(endpoint.toString());
LOG.info("{} = {}", endpointFile, endpoint);
} catch (IOException e) {
throw new MojoFailureException("writing file failed", e);
}
}
}
通过apache maven 插件的api 编写指定插件,我们可以看到具体使用来自xxx-test子项目:
<plugin>
<groupId>${project.groupId}</groupId>
<artifactId>jetcd-launcher-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<additionalArguments>
<additionalArgument>--max-txn-ops</additionalArgument>
<additionalArgument>1024</additionalArgument>
</additionalArguments>
</configuration>
<executions>
<execution>
<id>start-etcd</id>
<phase>process-test-classes</phase>
<goals>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop-etcd</id>
<phase>prepare-package</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
jetcd-resolver:
题外话:
单元测试有用到
await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isTrue());
用的是org.awaitility的包,此包主要用来做异步场景下的测试
此外在jetcd-launcher项目中我们可以看到源码如下:
public static EtcdCluster buildCluster(
@NonNull String clusterName,
int nodes,
boolean ssl,
@NonNull String image,
List<String> additionalArgs) {
final Network network = Network.builder().id(clusterName).build();
final CountDownLatch latch = new CountDownLatch(nodes);
final AtomicBoolean failedToStart = new AtomicBoolean(false);
final EtcdContainer.LifecycleListener listener = new EtcdContainer.LifecycleListener() {
public void started(EtcdContainer container) {
latch.countDown();
}
public void failedToStart(EtcdContainer container, Exception exception) {
LOGGER.error("Exception while starting etcd container: ", exception);
failedToStart.set(true);
latch.countDown();
}
public void stopped(EtcdContainer container) {
}
};
之前有说过该项目主要是用来提供继承测试适应云场景,所以可以发现以下的虚拟容器提供etcd继承测试服务端,当然你也可以实现其他的服务端虚拟容器用到自己的项目中
import org.testcontainers.containers.Network;
raft:
log replication:
这里笔者做一个简单介绍,当你使用的是一个单体应用的时候,你是无需进行数据同步的如图:
这个时候客户端发送请求服务端处理存储即可,那么如果服务端集群模式,那么服务端如何同步,类似如下:
假设中间的节点是leader,数据发送到leader节点,leader节点需要在第一次性跳时间后发送数据同步请求其他的节点
实际上这里的同步涉及到多数法人的原则,也leader是不会直接提交到本地数据的,而是在第一次心跳间隔之后发送数据同步请求,接收到请求的节点保存数据,返回给leader,当leader接收到了超过半数的成功返回则提交本地事务。否则不提交(当脑裂出现的时候,基数节点的多个leader因为没有多数其他节点的commit 该脑裂节点也不会提交数据,这也就是raft处理脑裂的核心如下图)
lead election:
首先了解一下几个基本的角色:
leader
follower
candidate
首先所有的节点都是follower的角色,通过选举超时时间之后随机成为候选人
candidate,然后第一个成为候选人的节点发起请求(投票给自己)给其他节点,如果多数节点返回投票响应,当前follower成为candidate。这里有一个规则,当集群中自己没有成为candidate之前,所有来自其他节点的投票请求都会成功返回,换句话说就是当自己没有发起投票请求之前都会主动投票给其他节点
整个流程还是比较复杂的,涉及到多leader分片(多个follower同时成为candidate发起投票的情况,raft如何处理),以及如何通过修改election timout 动态控制选举流程等等,因为篇幅限制这里不赘述,日后有时间会给一篇专门分析,实现可以去看nacos的raftCore的流程,这里贴一段初始化代码:
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
executor.submit(notifier);
final long start = System.currentTimeMillis();
raftStore.loadDatums(notifier, datums);
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
while (true) {
if (notifier.tasks.size() <= 0) {
break;
}
Thread.sleep(1000L);
}
initialized = true;
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
GlobalExecutor.registerMasterElection(new MasterElection());
GlobalExecutor.registerHeartbeat(new HeartBeat());
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
核心流程在registerMasterElection里边。
敬请期待后续.....