vlambda博客
学习文章列表

redis0.1源码解析之基本原理

本文分析redis的基础原理,暂不做深入分析,后续再详细分析。我们从main函数开始。

int main(int argc, char **argv) {
    initServerConfig();
    initServer();
    aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULLNULL)
    aeMain(server.el);
    return 0;
}

下面就四个函数进行分析。

1 initServerConfig

static void initServerConfig() {
    server.dbnum = REDIS_DEFAULT_DBNUM;
    server.port = REDIS_SERVERPORT;
    server.verbosity = REDIS_DEBUG;
    server.maxidletime = REDIS_MAXIDLETIME;
    server.saveparams = NULL;
    server.logfile = NULL/* NULL = log on standard output */
    server.bindaddr = NULL;
    server.glueoutputbuf = 1;
    server.daemonize = 0;
    server.pidfile = "/var/run/redis.pid";
    server.dbfilename = "dump.rdb";
    server.requirepass = NULL;
    server.shareobjects = 0;
    server.maxclients = 0;
    ResetServerSaveParams();

    appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
    /* Replication related */
    server.isslave = 0;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.replstate = REDIS_REPL_NONE;
}

该函数是对redis核心结构体server(struct redisServer server)进行初始化。

2 initServer

static void initServer() {
    int j;
    // 修改这两个信号的处理函数为忽略,即不处理,如果不设置但又收到这个信号,会导致进程退出
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    // clients列表,每次收到请求的时候会初始化一个client
    server.clients = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.objfreelist = listCreate();
    createSharedObjects();
    // 创建一个事件循环的结构体
    server.el = aeCreateEventLoop();
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    server.sharingpool = dictCreate(&setDictType,NULL);
    server.sharingpoolsize = 1024;
     // 启动服务器
    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);

    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&hashDictType,NULL);
        server.db[j].expires = dictCreate(&setDictType,NULL);
        server.db[j].id = j;
    }
    server.cronloops = 0;
    server.bgsaveinprogress = 0;
    server.lastsave = time(NULL);
    server.dirty = 0;
    server.usedmemory = 0;
    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_starttime = time(NULL);
    // 增加一个定时器
    aeCreateTimeEvent(server.el, 1000, serverCron, NULLNULL);
}

我们主要看initServer函数中的anetTcpServer这个函数。

int anetTcpServer(char *err, int port, char *bindaddr)
{
    int s, on = 1;
    struct sockaddr_in sa;

    if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        anetSetError(err, "socket: %s\n", strerror(errno));
        return ANET_ERR;
    }
    if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
        anetSetError(err, "setsockopt SO_REUSEADDR: %s\n", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    memset(&sa,0,sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bindaddr) {
        if (inet_aton(bindaddr, &sa.sin_addr) == 0) {
            anetSetError(err, "Invalid bind address\n");
            close(s);
            return ANET_ERR;
        }
    }
    if (bind(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
        anetSetError(err, "bind: %s\n", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    if (listen(s, 64) == -1) {
        anetSetError(err, "listen: %s\n", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return s;
}

这个函数的逻辑就是经典的网络socket编程流程。最后通过listen函数,启动服务器。。

3 aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL, NULL)

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)

{
    aeFileEvent *fe;

    fe = zmalloc(sizeof(*fe));
    if (fe == NULLreturn AE_ERR;
    fe->fd = fd;
    fe->mask = mask;
    fe->fileProc = proc;
    fe->finalizerProc = finalizerProc;
    fe->clientData = clientData;
    fe->next = eventLoop->fileEventHead;
    eventLoop->fileEventHead = fe;
    return AE_OK;
}

aeCreateFileEvent函数在事件驱动那篇文章分析过,他就封装一个结构体,然后在事件循环的时候注册到事件驱动模块(本版本是使用select函数)。这里是注册可读事件,回调函数是acceptHandler。

4 aeMain

void aeMain(aeEventLoop *eventLoop)
{
    eventLoop->stop = 0;
    while (!eventLoop->stop)
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}

该函数在事件驱动文章也分析过,就是注册事件到事件驱动模块,然后开始等待事件发生,接着处理回调,根据3的分析,当有连接到来,会执行acceptHandler。

static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[128];
    redisClient *c;
    // 调用accept摘下一个请求
    cfd = anetAccept(server.neterr, fd, cip, &cport);
    // 创建一个client
    if ((c = createClient(cfd)) == NULL) {
        //
    }
}

该函数摘下一个请求的节点,然后创建一个client处理该请求。

static redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(*c));

    anetNonBlock(NULL,fd);
    anetTcpNoDelay(NULL,fd);
    if (!c) return NULL;
    selectDb(c,0);
    c->fd = fd;
    c->querybuf = sdsempty();
    c->argc = 0;
    c->argv = NULL;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->lastinteraction = time(NULL);
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    if ((c->reply = listCreate()) == NULL) oom("listCreate");
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
        readQueryFromClient, c, NULL) == AE_ERR) {
        freeClient(c);
        return NULL;
    }
    if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
    return c;
}

我们接着看aeCreateFileEvent(server.el, c->fd, AE_READABLE,readQueryFromClient, c, NULL)这一句,在accept的fd(和客户端通信的fd)上注册可读事件(即等待客户端发送数据过来),然后执行readQueryFromClient。readQueryFromClient函数主要是读取客户端发送过来的数据,然后调processCommand。

static int processCommand(redisClient *c) {
    struct redisCommand *cmd;
    long long dirty;
    // 退出命令
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    // 根据客户端的命令找到对应的命令配置
    cmd = lookupCommand(c->argv[0]->ptr);
    // ...
    // 执行对应的处理函数
    cmd->proc(c);
    return 1;
}

上面函数根据用户发送的命令执行对应的函数。redis的命令配置表为。

static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    ...
};

假设客户端发送的是get命令,则执行的函数为getCommand。

static void getCommand(redisClient *c) {
    // 读取客户端发送的数据
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    addReply(c,o);
}

static void addReply(redisClient *c, robj *obj) {
    aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c, NULL)
}

给客户端fd注册可写事件,准备给客户端回复信息。事件触发时执行的函数是sendReplyToClient。

static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    //...
    nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
}

发送回包给客户端。
这就是redis启动到处理客户端信息的整体流程。