Redis集群-6集群故障转移
Redis集群(最新版5.0.0)
6.故障转移
7.集群运维(待续)
8.数据和槽位的分配算法
故障发现
故障转移首先要发现故障,发现故障的主要环节分两步:主观下线和客观下线。
主观下线
主观下线是redis在发送ping,接受pong超时,在cluster-node-timeout时间内通信一直失败,发送ping的节点会记录目标节点为主观下线。例如:a节点发送ping给节点b,如果通信正常,则节点a更新与b节点的通信时间,如果没接收到b节点回复的pong则不更新时间,节点a定时任务会检测,通信时间超过cluster-node-timeout则对节点b状态改为主观下线。
typedef struct clusterNode {
int flags; /* CLUSTER_NODE_... */
mstime_t ping_sent; /* 该节点最后发送ping的unix时间Unix time we sent latest ping */
mstime_t pong_received; /* 该节点最后收到pong的unix时间Unix time we received the pong */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* 该节点最后发送投票的unix时间Last time we voted for a slave of this master */
......
} clusterNode;
flags标识节点对应状态,值为如下:
/* Cluster node flags and macros. */
#define CLUSTER_NODE_MASTER 1 /* 主节点 */
#define CLUSTER_NODE_SLAVE 2 /* 从节点 */
#define CLUSTER_NODE_PFAIL 4 /* 主观下线 */
#define CLUSTER_NODE_FAIL 8 /* 客观下线 */
#define CLUSTER_NODE_MYSELF 16 /* 自身节点 */
#define CLUSTER_NODE_HANDSHAKE 32 /* 握手状态,未与其他节点进行通信 */
#define CLUSTER_NODE_NOADDR 64 /* 无地址节点,第一次通信未完成或失败时使用 */
#define CLUSTER_NODE_MEET 128 /* 需要接受meet的节点状态 */
#define CLUSTER_NODE_MIGRATE_TO 256 /* 该节点被选中未新的主节点 */
#define CLUSTER_NODE_NOFAILOVER 512 /*从节点不进行故障转移 */
// 空名字(在节点为主节点时,用作消息中的 slaveof 属性的值)
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
判断主观下线的代码在clusterCron的定时任务里
void clusterCron(void) {
......
delay=now - node->ping_sent;
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
}
}
}
如果节点最后一次与该节点ping通信时间差大于cluster_node_timeout,则判断为主观下线,但是只有一个节点认为下线并不一定是需要下线,有可能别的节点通信没问题,所以判断是否需要下线,需要多节点协作完成故障发现的过程为客观下线。
客观下线
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;//下线节点有效的下线数量
int needed_quorum = (server.cluster->size / 2) + 1;//主观下线节点必须超过槽节点数量的一半
if (!nodeTimedOut(node)) return; /* We can reach it. */
if (nodeFailed(node)) return; /* Already FAILing. */
failures = clusterNodeFailureReportsCount(node);//获取节点有效的下线报告数量
/* Also count myself as a voter if I'm a master. */
if (nodeIsMaster(myself)) failures++;//自身如果是master也加下线报告数里
//如果下线报告书小于槽节点数量的一半则返回
if (failures < needed_quorum) return; /* No weak agreement from masters. */
serverLog(LL_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);
/* 标记客观下线并更新时间Mark the node as failing. */
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();
if (nodeIsMaster(myself)) clusterSendFail(node->name);//当前节点是主节点则在集群中广播
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
上面是客观下线的代码,流程说明:
1)统计failures数量和needed_quorum,needed_quorum为持有槽的主节点数量的一半,如果当前节点是master需要把当前节点也算进去对failures进行加1。
2)当failures小于needed_quorum,则直接返回。
3)当failures大于needed_quorum时,更新标记客观下线并更新时间。
4)集群广播客观下线消息,通知所有节点标记为客观下线,立即生效,通知故障的从节点触发故障转移。
注意:当网络出现分区时,分为一大一小两个独立集群,小的不会到半数,所以不会发生故障转移。恢复后故障节点变为客观下线。假设故障节点所有的从节点都在小集群,则也无法完成故障转移,所有网络拓扑结构需要考虑这点,减少主从分区的概率。
故障恢复
当故障节点变为客观下线,就需要在该节点的所有从节点中选出一个leader,代替故障节点。故障恢复流程从资格检查、准备选举时间、发起选举、选举投票、替换主节点等5个模块进行介绍。
资格检查
资格检查实际就是校验过程,校验的规则有很多,其中主要的规则是校验每个从节点,检查从节点最后与主节点断线时间,根据repl_ping_slave_period和cluster_slave_validity_factor校验数据的新鲜度,如果是老数据则直接return,不具备故障转移资格。这两个参数默认都是10。主要代码如下:
if (server.cluster_slave_validity_factor &&
data_age >
(((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * server.cluster_slave_validity_factor)))
{
if (!manual_failover) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
return;
}
}
准备选举时间
typedef struct clusterState {
mstime_t failover_auth_time; /* 上次执行或下次选举时间 */
int failover_auth_count; /* 获得的投票数量*/
int failover_auth_sent; /* 如果是1表示已经向其他节点发送了投票请求*/
int failover_auth_rank; /* 记录当前从节点排名 */
uint64_t failover_auth_epoch; /* 当前选举纪元*/
int cant_failover_reason; /* 不能故障转移的原因*/
}
当资格检查通过后,更新触发选举时间failover_auth_time。failover_auth_rank记录当前从节点排名,为了保障选举出来的节点延迟更低,复制偏移量越大的节点优先级越高,采用延迟触发机制,对不同从节点使用不同的延迟时间进行选举。
void clusterHandleSlaveFailover(void) {
......
/* If the previous failover attempt timedout and the retry time has
* elapsed, we can setup a new one. */
if (auth_age > auth_retry_time) {
server.cluster->failover_auth_time = mstime() +
500 + /* 500ms */
random() % 500; /* 随机0~500ms*/
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetSlaveRank();
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
/* 手动故障转移不需要延迟处理 */
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
}
/* 像下线主节点的所有从节点发送pong包,当前节点的复制偏移量用于其他节点更新自己排名*/
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}
......
clusterHandleSlaveFailover函数中,上面这部分是选举时间的过程,在定时器中调用, server.cluster->failover_auth_time记录延迟选举开始时间,mstime当前时间+500ms(固定500ms是让主节点下线的消息在集群中传播)+随机数0~500(为了避免从节点同时发起选举)+failover_auth_rank*1000 ,rank的排名是通过clusterGetSlaveRank函数得到,排名越大发起选举时间优先级越高,rank为0时最大。看下clusterGetSlaveRank函数代码如下:
int clusterGetSlaveRank(void) {
long long myoffset;
int j, rank = 0;
clusterNode *master;
serverAssert(nodeIsSlave(myself));
master = myself->slaveof;
if (master == NULL) return 0; /* Never called by slaves without master. */
myoffset = replicationGetSlaveOffset();//获取当前节点复制偏移量
//循环主节点下所有从节点根据节点复制偏移量进行排名
for (j = 0; j < master->numslaves; j++)
if (master->slaves[j] != myself &&
!nodeCantFailover(master->slaves[j]) &&
master->slaves[j]->repl_offset > myoffset) rank++;
return rank;
}
replicationGetSlaveOffset函数获取了当前节点的复制偏移量,然后循环当前节点的需下线的主节点下所有从节点,对复制偏移量进行排名。
其实从这就可以看出复制偏移量越大,发起选举的时间就越早,节点选举成为主节点的可能性就越大。
发起选举
达到时间failover_auth_time时会发起选举,代码如下:
if (server.cluster->failover_auth_sent == 0) {
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);
clusterRequestFailoverAuth();
server.cluster->failover_auth_sent = 1;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
return; /* Wait for replies. */
}
server.cluster->currentEpoch是全局的只增不减的配置纪元,记录集群内所有主节点的最大版本。对currentEpoch进行加1,然后调用函数clusterRequestFailoverAuth发起投票,这个函数内部会向集群广播消息CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST。
选举投票
在clusterProcessPacket包中对收到消息进行判断如果是CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST消息则调用clusterSendFailoverAuthIfNeeded函数进行投票,这个函数中先进行跟中校验,必须是负责一定槽位的主节点才能投票。
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
clusterNode *master = node->slaveof;
uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
unsigned char *claimed_slots = request->myslots;
int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
int j;
/* 当前节点是从节点或没有负责的槽位直接返回*/
if (nodeIsSlave(myself) || myself->numslots == 0) return;
/* 如果发送的纪元小于当前节点全局纪元则不一致,返回 */
if (requestCurrentEpoch < server.cluster->currentEpoch) {
return;
}
/* 最后投票纪元等于全局纪元,则已经投过票 */
if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
return;
}
/* 必须是从节点并且主节点是下线节点,或是下线主节点未下线,是手动故障转移 */
if (nodeIsMaster(node) || master == NULL ||
(!nodeFailed(master) && !force_ack))
{
if (nodeIsMaster(node)) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: it is a master node",
node->name);
} else if (master == NULL) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: I don't know its master",
node->name);
} else if (!nodeFailed(master)) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: its master is up",
node->name);
}
return;
}
/*验证投票有效期*/
if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
{
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"can't vote about this master before %lld milliseconds",
node->name,
(long long) ((server.cluster_node_timeout*2)-
(mstime() - node->slaveof->voted_time)));
return;
}
/* 如果发送节点的配置纪元小于从节点的配置纪元,说明不是最新的,返回*/
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(claimed_slots, j) == 0) continue;
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
{
continue;
}
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"slot %d epoch (%llu) > reqEpoch (%llu)",
node->name, j,
(unsigned long long) server.cluster->slots[j]->configEpoch,
(unsigned long long) requestConfigEpoch);
return;
}
/* We can vote for this slave. */
server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
node->slaveof->voted_time = mstime();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
clusterSendFailoverAuth(node);
serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
node->name, (unsigned long long) server.cluster->currentEpoch);
}
这段函数过程在代码中已经注释,lastVoteEpoch ==currentEpoch的判断中如果两个从节点同时发起投票,只会投票给一个节点,如果两个从节点属于不同的主节点也只会进行一次投票。
configEpoch纪元是每个主节点记录自身的一个版本,从节点是复制的主节点的,而currentEpoch是整个集群的一个全局版本。如果发送节点的配置纪元configEpoch小于请求的configEpoch,则说明发送请求的节点不是最新的,可能是一个长时间下线重新上线的节点,不进行投票。
最后进行投票,lastVoteEpoch 置为currentEpoch,更新投票时间。调用clusterSendFailoverAuth函数,在函数内部对发送选举节点发送消息CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK进行投票。
投票过程是一个选举过程,集群有N个持有槽的主节点则每个主节点只能投给一个从节点,所以只会有一个节点得到N/2+1的选票,才能保证有唯一从节点选举胜出。
Redis没有采用从节点进行选举,主要是因为如果使用从节点进行投票则从节点必须大于等于3个才能得到N/2+1个节点,会导致资源浪费。
替换主节点
从节点收到消息,判断是CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK后先校验下当前选举的配置纪元,通过后则进行计票,对failover_auth_count进行加1。
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
if (!sender) return 1; /* We don't know that node. */
/* 校验下配置纪元,通过则对投票计数 */
if (nodeIsMaster(sender) && sender->numslots > 0 &&
senderCurrentEpoch >= server.cluster->failover_auth_epoch)
{
server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure
* we check ASAP. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
}
int needed_quorum = (server.cluster->size / 2) + 1;
......
/* Check if we reached the quorum. */
if (server.cluster->failover_auth_count >= needed_quorum) {
serverLog(LL_WARNING,
"Failover election won: I'm the new master.");
/* 更新自身节点的纪元为选举的纪元 */
if (myself->configEpoch < server.cluster->failover_auth_epoch) {
myself->configEpoch = server.cluster->failover_auth_epoch;
serverLog(LL_WARNING,
"configEpoch set to %llu after successful failover",
(unsigned long long) myself->configEpoch);
}
/* Take responsibility for the cluster slots. */
clusterFailoverReplaceYourMaster();
} else {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
}
当failover_auth_count大于等于集群主节点N/2+1的时,从节点翻身做主人,选举胜出,调用clusterFailoverReplaceYourMaster进行主节点替换。
void clusterFailoverReplaceYourMaster(void) {
int j;
clusterNode *oldmaster = myself->slaveof;
if (nodeIsMaster(myself) || oldmaster == NULL) return;
/* 1) 成为主节点 */
clusterSetNodeAsMaster(myself);
replicationUnsetMaster();//取消主从复制,将当前节点升级为主节点
/* 2) Claim all the slots assigned to our master. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (clusterNodeGetSlotBit(oldmaster,j)) {
clusterDelSlot(j);//撤销故障节点负责的槽
clusterAddSlot(myself,j);//让自身负责这些槽位
}
}
/* 3) 更新状态和保存配置. */
clusterUpdateState();
clusterSaveConfigOrDie(1);
/* 4) 向集群广播. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
/* 5) 手动故障转移重置状态 */
resetManualFailover();
}