redis 源码解析:主从同步
这篇文章介绍主从同步的源码实现。
阅读本篇文章需要对 redis 事件处理有所了解,各位读者可自行查找相关资料。
本篇文章中,源码采用 redis 5.0.8 版本。
概述
主从同步包括:
增量同步
部分同步
全量同步
状态
节点状态是很重要的一个变量,在整个同步过程中,会涉及到各种各样的状态迁移,这里贴出节点的全部状态,以备后面对照之用。
/* Slave replication state. Used in server.repl_state for slaves to remember
* what to do next. */
/* --- Handshake states, must be ordered --- */
/* --- End of handshake states --- */
REPL_STATE_NONE
服务器初始化时的默认状态。
void initServerConfig(void)
server.repl_state = REPL_STATE_NONE;
REPL_STATE_CONNECT
此状态表示从节点可以连接到主节点。
达到此状态有两种方式:
1、配置文件中配置 replicaof 指令;
2、执行 replicaof 命令。
void loadServerConfigFromString(char *config)
else if ((!strcasecmp(argv[0],"slaveof") ||
!strcasecmp(argv[0],"replicaof")) && argc == 3) {
slaveof_linenum = linenum;
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.repl_state = REPL_STATE_CONNECT;
}
void replicaofCommand(client *c)
void replicationSetMaster(char *ip, int port)
server.masterhost = sdsnew(ip);
server.masterport = port;
server.repl_state = REPL_STATE_CONNECT;
接下来,我们开始介绍增量同步。
增量同步是指当从节点与主节点数据同步完成以后,主节点每一次数据变动,都同步到从节点的过程。
这里,我们没有按照先全量再增量的顺序来介绍,作者有自己的考虑。
增量同步
/* Command call flags, see call() function */
/* Command propagation flags, see propagate() function */
redis 命令的执行,最终都要通过调用 call() 函数来实现。
int processCommand(client *c)
call(c,CMD_CALL_FULL);
但这篇文章,我们关注的显然不是命令如何执行,而是命令执行后,如何同步到从节点?
void call(client *c, int flags)
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation. */
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set. */
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command
* implementations called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
* flags are an xor between:
* + PROPAGATE_NONE (no propagation of command at all)
* + PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + PROPAGATE_REPL (propagate into the replication link)
*
* This should not be used inside commands implementation. Use instead
* alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
/* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to
* propagate *identical* replication stream. In this way this slave can
* advertise the same replication ID as the master (since it shares the
* master replication history and has the same backlog and offsets). */
if (server.masterhost != NULL) return;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
robj *selectcmd;
/* For a few DBs we have pre-computed SELECT command. */
if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
/* Add the SELECT command into the backlog. */
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
/* Send it to slaves. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReply(slave,selectcmd);
}
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyMultiBulkLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
上面这几段代码,其调用链如下:
processCommand()->call()->propagate()->replicationFeedSlaves()
命令执行后,同步到从节点,定位到以下代码:
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyMultiBulkLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
即命令在主节点执行后,发送给从节点再执行一遍。
到这里,我们有两个疑问:
server.slaves 列表是何时添加的?
SLAVE_STATE_WAIT_BGSAVE_START 状态是何时设置,又是何时取消的?
带着这两个疑问,我们开始研究全量同步的执行流程:
全量同步
一切从头开始。
void initServer(void)
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
初始化服务器时,创建一个时间事件,事件处理函数为:serverCron(),每100毫秒执行一次。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
run_with_period(1000) replicationCron();
serverCron() 函数调用 replicationCron() 函数。replicationCron() 函数是主从同步的入口函数,每秒执行一次。
主从同步涉及两个方面:主节点、从节点。下面,我们分别加以讨论:
从节点
上面已经提到,replicationCron() 函数是主从同步的入口函数。那么,在 replicationCron() 函数中,从节点都做了哪些操作呢?
void replicationCron(void)
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
replicationCron() 函数虽然很长,但关键只有上面这几行。我们前面已经提到,当从节点状态为 REPL_STATE_CONNECT 时,从节点连接到主节点。
int connectWithMaster(void) {
int fd;
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
serverLog(LL_WARNING,"Can't create readable event for SYNC");
return C_ERR;
}
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}
connectWithMaster() 函数连接到主节点,创建一个文件事件(可读写),事件处理函数为:syncWithMaster(),从节点状态变为 REPL_STATE_CONNECTING。
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask)
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}
psync_result = slaveTryPartialResynchronization(fd,1);
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
syncWithMaster() 函数很长,我们分两部分讨论:
先说第一部分:经过一系列状态迁移(即握手过程),从节点状态从 REPL_STATE_CONNECTING 变为 REPL_STATE_RECEIVE_PSYNC,同时,从节点向主节点发送 PSYNC 命令,读取 PSYNC 响应。
int slaveTryPartialResynchronization(int fd, int read_reply)
aeDeleteFileEvent(server.el,fd,AE_READABLE);
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
读取 PSYNC 响应后,先删除文件事件的读操作,然后初始化 server.master_replid 和 server.master_initial_offset 两个变量。
再说第二部分:删除文件事件的读操作之后,又创建一个新的读文件事件,事件处理函数为:readSyncBulkPayload(),同时,从节点状态变为 REPL_STATE_TRANSFER。
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask)
/* Check if the transfer is now complete */
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
if (eof_reached) {
int aof_is_enabled = server.aof_state != AOF_OFF;
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid "
"any race",
(long) server.rdb_child_pid);
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
cancelReplicationHandshake();
return;
}
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOFRW fork before flusing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
if(aof_is_enabled) stopAppendOnly();
signalFlushedDb(-1);
emptyDb(
-1,
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
/* Re-enable the AOF if we disabled it earlier, in order to restore
* the original configuration. */
if (aof_is_enabled) restartAOFAfterSYNC();
return;
}
/* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (aof_is_enabled) restartAOFAfterSYNC();
}
readSyncBulkPayload() 函数读取主节点发送的内存快照(即 dump.rdb 文件),保存到本地磁盘,保存完成以后,先清空内存,再加载内存快照到内存,完成数据同步,同时删除读文件事件,将从节点状态变为 REPL_STATE_CONNECTED。
/* Once we have a link with the master and the synchroniziation was
* performed, this function materializes the master client we store
* at server.master, starting from the specified file descriptor. */
void replicationCreateMasterClient(int fd, int dbid) {
server.master = createClient(fd);
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
* PSYNC capable, so we flag it accordingly. */
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
if (dbid != -1) selectDb(server.master,dbid);
}
client *createClient(int fd)
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
数据同步完成以后,调用 createClient() 函数,createClient() 函数会创建一个读文件事件,事件处理函数为:readQueryFromClient(),此后,从节点就可以正常读取主节点发送的命令了。
主节点
对于主节点来说,replicationCron() 函数并没有做什么关键操作,那么,在主从同步过程中,主节点是如何开始的呢?
void initServer(void)
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
初始化服务器时,创建一个读文件事件,事件处理函数为:acceptTcpHandler(),用于处理连接请求。当从节点连接主节点时,即从 acceptTcpHandler() 函数开始处理,其调用链如下:
acceptTcpHandler()->acceptCommonHandler()->createClient()
client *createClient(int fd)
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
createClient() 函数创建一个读文件事件,事件处理函数为:readQueryFromClient(),用于读取客户端命令。
我们前面提到,在 syncWithMaster() 函数中,从节点向主节点发送 PSYNC 命令,主节点收到 PSYNC 命令后,调用 syncCommand() 函数处理。
void syncCommand(client *c)
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <replid> <offset>
*
* So the slave knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
}
/* Full resynchronization. */
server.stat_sync_full++;
/* Setup the slave as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
}
/* CASE 3: There is no BGSAVE is progress. */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
if (server.aof_child_pid == -1) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. "
"BGSAVE for replication delayed");
}
}
}
syncCommand() 函数,我们也分两部分讨论:
第一部分:masterTryPartialResynchronization() 函数返回 C_ERR(此处不再展开),然后将从节点状态设置为 SLAVE_STATE_WAIT_BGSAVE_START,同时将 c(代表从节点)添加到 server.slaves 列表中。此时,我们可以回答增量同步的第一个问题,即:“server.slaves 列表是何时添加的?”,以及第二个问题的前半部分:“SLAVE_STATE_WAIT_BGSAVE_START 状态是何时设置的?”
第二部分:调用 startBgsaveForReplication() 函数。
int startBgsaveForReplication(int mincapa) {
int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
/* Only do rdbSave* when rsiptr is not NULL,
* otherwise slave will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr);
else
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
}
/* If we failed to BGSAVE, remove the slaves waiting for a full
* resynchorinization from the list of salves, inform them with
* an error about what happened, close the connection ASAP. */
if (retval == C_ERR) {
serverLog(LL_WARNING,"BGSAVE for replication failed");
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = REPL_STATE_NONE;
slave->flags &= ~CLIENT_SLAVE;
listDelNode(server.slaves,ln);
addReplyError(slave,
"BGSAVE failed, replication can't continue");
slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
return retval;
}
/* If the target is socket, rdbSaveToSlavesSockets() already setup
* the salves for a full resync. Otherwise for disk target do it now.*/
if (!socket_target) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
}
}
}
/* Flush the script cache, since we need that slave differences are
* accumulated without requiring slaves to match our cached scripts. */
if (retval == C_OK) replicationScriptCacheFlush();
return retval;
}
startBgsaveForReplication() 函数先调用 rdbSaveBackground() 函数,然后调用 replicationSetupSlaveForFullResync() 函数。
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
}
rdbSaveBackground() 函数创建一个子进程,子进程调用 rdbSave() 函数保存内存快照到本地磁盘。
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;
slave->psync_initial_offset = offset;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
* a SELECT statement in the replication stream. */
server.slaveseldb = -1;
/* Don't send this reply to slaves that approached us with
* the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.replid,offset);
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}
replicationSetupSlaveForFullResync() 函数将从节点状态设置为 SLAVE_STATE_WAIT_BGSAVE_END,同时,向从节点发送响应(即:PSYNC 响应),响应格式为:+FULLRESYNC <replid> <offset>。此时,我们可以回答增量同步的第二个问题的后半部分,即:“SLAVE_STATE_WAIT_BGSAVE_START 状态是何时取消的?”。
至此,syncCommand() 函数已经执行结束了,但是全量同步还没有完成,那么下一步又从哪里开始呢?
在 serverCron() 函数中有如下代码:
/* Check if a background saving or AOF rewrite in progress terminated. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
updateDictResizePolicy();
closeChildInfoPipe();
}
}
我们上面提到,rdbSaveBackground() 函数创建一个子进程,子进程调用 rdbSave() 函数保存内存快照到本地磁盘,保存完成以后,即被 wait3() 函数捕获,接着调用 backgroundSaveDoneHandler() 函数进行处理。
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
switch(server.rdb_child_type) {
case RDB_CHILD_TYPE_DISK:
backgroundSaveDoneHandlerDisk(exitcode,bysignal);
break;
case RDB_CHILD_TYPE_SOCKET:
backgroundSaveDoneHandlerSocket(exitcode,bysignal);
break;
default:
serverPanic("Unknown RDB child type.");
break;
}
}
backgroundSaveDoneHandler() 函数调用 backgroundSaveDoneHandlerDisk() 函数,backgroundSaveDoneHandlerDisk() 函数调用 updateSlavesWaitingBgsave() 函数。
void updateSlavesWaitingBgsave(int bgsaveerr, int type)
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
...
else {
if (bgsaveerr != C_OK) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
updateSlavesWaitingBgsave() 函数遍历所有的从节点,删除文件事件的写操作,然后创建一个新的写文件事件,事件处理函数为:sendBulkToSlave()。
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
client *slave = privdata;
UNUSED(el);
UNUSED(mask);
char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;
/* Before sending the RDB file, we send the preamble as configured by the
* replication process. Currently the preamble is just the bulk count of
* the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
strerror(errno));
freeClient(slave);
return;
}
server.stat_net_output_bytes += nwritten;
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
/* fall through sending data. */
} else {
return;
}
}
/* If the preamble was already transferred, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) {
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
if (errno != EAGAIN) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
strerror(errno));
freeClient(slave);
}
return;
}
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
putSlaveOnline(slave);
}
}
sendBulkToSlave() 函数向从节点发送本地磁盘上的内存快照(即 dump.rdb 文件),发送完成以后,删除写文件事件,然后调用 putSlaveOnline() 函数。
/* This function puts a replica in the online state, and should be called just
* after a replica received the RDB file for the initial synchronization, and
* we are finally ready to send the incremental stream of commands.
*
* It does a few things:
*
* 1) Put the slave in ONLINE state. Note that the function may also be called
* for a replicas that are already in ONLINE state, but having the flag
* repl_put_online_on_ack set to true: we still have to install the write
* handler in that case. This function will take care of that.
* 2) Make sure the writable event is re-installed, since calling the SYNC
* command disables it, so that we can accumulate output buffer without
* sending it to the replica.
* 3) Update the count of "good replicas". */
void putSlaveOnline(client *slave) {
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 0;
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
refreshGoodSlavesCount();
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
}
putSlaveOnline() 函数将从节点状态设置为 SLAVE_STATE_ONLINE,然后创建一个新的写文件事件,事件处理函数为:sendReplyToClient()。此后,主节点就可以向从节点发送增量同步的数据了。
部分同步
当从节点与主节点断开连接时,从节点会重新连接到主节点,此时,从节点会尝试部分同步。
void replicationCron(void)
/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
freeClient(server.master);
}
从节点与主节点连接中断,调用 freeClient() 函数。
void freeClient(client *c)
/* If it is our master that's beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.master && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING,"Connection with master lost.");
if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
CLIENT_CLOSE_ASAP|
CLIENT_BLOCKED)))
{
replicationCacheMaster(c);
return;
}
}
void replicationCacheMaster(client *c) {
serverAssert(server.master != NULL && server.cached_master == NULL);
serverLog(LL_NOTICE,"Caching the disconnected master state.");
/* Unlink the client from the server structures. */
unlinkClient(c);
/* Reset the master client so that's ready to accept new commands:
* we want to discard te non processed query buffers and non processed
* offsets, including pending transactions, already populated arguments,
* pending outputs to the master. */
sdsclear(server.master->querybuf);
sdsclear(server.master->pending_querybuf);
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
c->sentlen = 0;
c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.master;
/* Invalidate the Peer ID cache. */
if (c->peerid) {
sdsfree(c->peerid);
c->peerid = NULL;
}
/* Caching the master happens instead of the actual freeClient() call,
* so make sure to adjust the replication state. This function will
* also set server.master to NULL. */
replicationHandleMasterDisconnection();
}
/* This function is called when the slave lose the connection with the
* master into an unexpected way. */
void replicationHandleMasterDisconnection(void) {
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;
/* We lost connection with our master, don't disconnect slaves yet,
* maybe we'll be able to PSYNC with our master later. We'll disconnect
* the slaves only if we'll have to do a full resync with our master. */
}
freeClient() 函数将从节点的主节点保存到 server.cached_master 变量中,同时将从节点状态设置为 REPL_STATE_CONNECT,这样,从节点就可以重新连接到主节点了。
接下来,就是部分同步过程,不过,由于部分同步和全量同步的执行流程大致相同,且部分同步比全量同步简单得多,这里我们不再分析。
总结
经过前面的分析,我们对同步流程总结如下:
全量同步时,从节点向主节点发送 PSYNC 命令,主节点收到 PSYNC 命令后,调用 syncCommand() 函数处理。syncCommand() 函数创建一个子进程,子进程保存内存快照到本地磁盘,保存完成以后,主节点调用 sendBulkToSlave() 函数将内存快照发送给从节点,然后调用 putSlaveOnline() 函数,将从节点状态设置为 SLAVE_STATE_ONLINE,此后,主节点进入增量同步流程。syncCommand() 函数执行完成以后,主节点会把之后所有的写命令都记录在缓冲区里,增量同步时,主节点将缓冲区里的命令发送给从节点。
另一方面,从节点调用 readSyncBulkPayload() 函数读取主节点发送的内存快照(即 dump.rdb 文件),保存到本地磁盘,保存完成以后,先清空内存,再加载内存快照到内存,完成全量同步,同时,将从节点状态设置为 REPL_STATE_CONNECTED,此后,从节点进入增量同步流程。从节点调用 createClient() 函数创建一个读文件事件,事件处理函数为:readQueryFromClient(),读取主节点发送的命令。
关于全量同步,官方文档有一段描述,摘录如下:
The master starts a background saving process in order to produce an RDB file. At the same time it starts to buffer all new write commands received from the clients. When the background saving is complete, the master transfers the database file to the replica, which saves it on disk, and then loads it into memory. The master will then send all buffered commands to the replica. This is done as a stream of commands and is in the same format of the Redis protocol itself.
缓存淘汰与过期删除
缓存淘汰与过期删除并不是这篇文章的主题,但键的删除必然关系着主从同步的数据一致性。接下来,我们将介绍在主从同步中,键的删除是如何处理的?
缓存淘汰是指当 redis 配置了 maxmemory 指令时,如果内存使用超过了配置的值,则启动缓存淘汰策略,删除一定的键;
过期删除是指键设置了有效期,如果键过期,则删除。过期删除包括:惰性删除和定期删除。
无论是缓存淘汰还是过期删除,从节点都不直接进行相关操作,而是通过同步主节点的操作来实现。接下来,我们详细介绍:
缓存淘汰
int processCommand(client *c)
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction. */
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;
/* It was impossible to free enough memory, and the command the client
* is trying to execute is denied during OOM conditions or the client
* is in MULTI/EXEC context? Error. */
if (out_of_memory &&
(c->cmd->flags & CMD_DENYOOM ||
(c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
}
int freeMemoryIfNeededAndSafe(void) {
if (server.lua_timedout || server.loading) return C_OK;
return freeMemoryIfNeeded();
}
int freeMemoryIfNeeded(void)
/* By default replicas should ignore maxmemory
* and just be masters exact copies. */
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
/* Finally remove the selected key. */
if (bestkey) {
db = server.db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
latencyRemoveNestedEvent(latency,eviction_latency);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
server.stat_evictedkeys++;
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj);
keys_freed++;
/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop. */
if (slaves) flushSlavesOutputBuffers();
/* Normally our stop condition is the ability to release
* a fixed, pre-computed amount of memory. However when we
* are deleting objects in another thread, it's better to
* check, from time to time, if we already reached our target
* memory, since the "mem_freed" amount is computed only
* across the dbAsyncDelete() call, while the thread can
* release the memory all the time. */
if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
/* Let's satisfy our stop condition. */
mem_freed = mem_tofree;
}
}
}
freeMemoryIfNeeded() 函数中,如果是从节点,则直接返回 C_OK;如果是主节点,则调用 dbAsyncDelete() 或 dbSyncDelete() 函数删除指定的键,同时调用 propagateExpire() 函数:
/* Propagate expires into slaves and the AOF file.
* When a key expires in the master, a DEL operation for this key is sent
* to all the slaves and the AOF file if enabled.
*
* This way the key expiry is centralized in one place, and since both
* AOF and the master->slave link guarantee operation ordering, everything
* will be consistent even if we allow write operations against expiring
* keys. */
void propagateExpire(redisDb *db, robj *key, int lazy) {
robj *argv[2];
argv[0] = lazy ? shared.unlink : shared.del;
argv[1] = key;
incrRefCount(argv[0]);
incrRefCount(argv[1]);
if (server.aof_state != AOF_OFF)
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
replicationFeedSlaves(server.slaves,db->id,argv,2);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
propagateExpire() 函数调用 replicationFeedSlaves() 函数向从节点发送 unlink 或 del 命令,对应于主节点调用 dbAsyncDelete() 或 dbSyncDelete() 函数,进行删除操作。
惰性删除
惰性删除是指访问键时,先检查键是否过期,如果过期,则直接删除。
int expireIfNeeded(redisDb *db, robj *key) {
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a slave, instead of
* evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return 1;
/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}
如果是从节点,则返回 1;如果是主节点,则调用 dbAsyncDelete() 或 dbSyncDelete() 函数删除指定的键,同时调用 propagateExpire() 函数向从节点发送 unlink 或 del 命令。
定期删除
定期删除是指周期性(每100毫秒执行一次)的随机检查键是否过期,如果过期,则删除。
void databasesCron(void)
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (server.masterhost == NULL) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
void beforeSleep(struct aeEventLoop *eventLoop)
/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
如果是主节点,调用 activeExpireCycle() 函数处理;如果是从节点,调用 expireSlaveKeys() 函数处理。
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
long long t = dictGetSignedIntegerVal(de);
if (now > t) {
sds key = dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
if (server.lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
decrRefCount(keyobj);
server.stat_expiredkeys++;
return 1;
} else {
return 0;
}
}
activeExpireCycle() 函数调用 activeExpireCycleTryExpire() 函数,activeExpireCycleTryExpire() 函数调用 dbAsyncDelete() 或 dbSyncDelete() 函数删除指定的键,同时调用 propagateExpire() 函数向从节点发送 unlink 或 del 命令。
经过上面的分析,我们可以知道,主节点通过缓存淘汰与过期删除策略删除的键,通过调用 propagateExpire() 函数向从节点发送 unlink 或 del 命令的方式,实现了数据同步。