當(dāng)前位置:首頁 > IT技術(shù) > 其他 > 正文

【Redis】事件驅(qū)動(dòng)框架源碼分析
2022-04-25 22:59:32

aeEventLoop初始化

在server.c文件的initServer函數(shù)中,對(duì)aeEventLoop進(jìn)行了初始化:

  1. 調(diào)用aeCreateEventLoop函數(shù)創(chuàng)建aeEventLoop結(jié)構(gòu)體,對(duì)aeEventLoop結(jié)構(gòu)體中的變量進(jìn)行了初始化,之后調(diào)用了aeApiCreate函數(shù)創(chuàng)建epoll實(shí)例
  2. 調(diào)用aeCreateFileEvent函數(shù)向內(nèi)核注冊(cè)監(jiān)聽事件,由參數(shù)可知,注冊(cè)的是對(duì)TCP文件描述符的可讀事件監(jiān)聽,回調(diào)函數(shù)是acceptTcpHandler,當(dāng)內(nèi)核監(jiān)聽到TCP文件描述符有可讀事件時(shí),Redis將調(diào)用acceptTcpHandler函數(shù)對(duì)事件進(jìn)行處理
  void initServer(void) {
        // 創(chuàng)建aeEventLoop結(jié)構(gòu)體
        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
        if (server.el == NULL) {
            serverLog(LL_WARNING,
                    "Failed creating the event loop. Error message: '%s'",
                    strerror(errno));
            exit(1);
        }
        // 省略其他代碼...
        for (j = 0; j < server.ipfd_count; j++) {
            // 注冊(cè)監(jiān)聽事件,server.ipfd是TCP文件描述符,AE_READABLE可讀事件,acceptTcpHandler事件處理回調(diào)函數(shù)
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                    acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                        "Unrecoverable error creating server.ipfd file event.");
            }
        }
        // 省略其他代碼...
    }

在aeCreateEventLoop函數(shù)調(diào)用時(shí),傳入的最大文件描述符個(gè)數(shù)為客戶端最大連接數(shù)+宏定義CONFIG_FDSET_INCR的大小,CONFIG_FDSET_INCR的定義在server.h中:

#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)
#define CONFIG_MIN_RESERVED_FDS 32

aeEventLoop結(jié)構(gòu)體創(chuàng)建

aeEventLoop結(jié)構(gòu)體定義,在ae.h中:

typedef struct aeEventLoop {
    int maxfd;   /* 記錄最大的文件描述符 */
    int setsize; /* 最大文件描述符個(gè)數(shù) */
    long long timeEventNextId;
    time_t lastTime;     
    aeFileEvent *events; /* IO事件集合,記錄了每個(gè)文件描述符產(chǎn)生事件時(shí)的回調(diào)函數(shù) */
    aeFiredEvent *fired; /* 記錄已觸發(fā)的事件 */
    aeTimeEvent *timeEventHead; /* 時(shí)間事件 */
    int stop;
    void *apidata; /* IO多路復(fù)用API接口相關(guān)數(shù)據(jù) */
    aeBeforeSleepProc *beforesleep;/* 進(jìn)入事件循環(huán)流程前的執(zhí)行函數(shù) */
    aeBeforeSleepProc *aftersleep;/* 退出事件循環(huán)流程后的執(zhí)行函數(shù) */
} aeEventLoop;

aeCreateEventLoop

aeEventLoop結(jié)構(gòu)體創(chuàng)建在aeCreateEventLoop函數(shù)中(ae.c文件):

  1. 分配aeEventLoop結(jié)構(gòu)體所需內(nèi)存
  2. 分配aeEventLoop結(jié)構(gòu)體中其他變量所需內(nèi)存
  3. 調(diào)用aeApiCreate函數(shù)創(chuàng)建epoll實(shí)例
  4. 對(duì)IO事件集合events的mask掩碼初始化為AE_NONE,表示當(dāng)前沒有事件監(jiān)聽
aeEventLoop *aeCreateEventLoop(int setsize) {
    // aeEventLoop結(jié)構(gòu)體
    aeEventLoop *eventLoop;
    int i;
    // 分配eventLoop內(nèi)存
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // 分配IO事件內(nèi)存
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    // 創(chuàng)建poll實(shí)例
    if (aeApiCreate(eventLoop) == -1) goto err;
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE; // 初始化為空事件
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

創(chuàng)建epoll實(shí)例

aeApiState結(jié)構(gòu)體定義,在ae_epoll.c中:

  • epfd:創(chuàng)建的epoll實(shí)例文件描述符

  • events:記錄文件描述符產(chǎn)生的事件

typedef struct aeApiState {
    int epfd; // epoll實(shí)例文件描述符
    struct epoll_event *events; // 記錄就緒的事件
} aeApiState;

aeApiCreate

epoll實(shí)例的的創(chuàng)建在aeApiCreate函數(shù)(ae_epoll.c文件)中,處理邏輯如下:

  1. 為aeApiState結(jié)構(gòu)體分配內(nèi)存空間

  2. 為aeApiState中的events分配內(nèi)存空間,events數(shù)組個(gè)數(shù)為eventLoop中的最大文件描述個(gè)數(shù)

  3. 調(diào)用epoll_create函數(shù)創(chuàng)建epoll實(shí)例,將返回的epoll文件描述符保存在epfd中

  4. 將eventLoop的apidata指向創(chuàng)建的aeApiState,之后就可以通過eventLoop獲取到epoll實(shí)例并且注冊(cè)監(jiān)聽事件了

static int aeApiCreate(aeEventLoop *eventLoop) {
    // 分配內(nèi)存
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    // 為epoll事件分配內(nèi)存
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    // epoll_create創(chuàng)建epoll實(shí)例,返回文件描述符,保存在state的epfd中
    state->epfd = epoll_create(1024); 
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    // 將aeApiState設(shè)置到eventLoop的apidata
    eventLoop->apidata = state;
    return 0;
}

注冊(cè)事件

IO 事件的數(shù)據(jù)結(jié)構(gòu)是 aeFileEvent 結(jié)構(gòu)體,在ae.c中定義:

  • mask:事件類型掩碼,共有READABLE、WRITABLE、BARRIER三種事件,分別為可讀事件、可寫事件和屏障事件

  • rfileProc:寫事件回調(diào)函數(shù)

  • wfileProc:讀事件回調(diào)函數(shù)

typedef struct aeFileEvent {
    int mask; /* 事件類型掩碼 READABLE|WRITABLE|BARRIER  */
    aeFileProc *rfileProc;  /* 寫事件回調(diào)函數(shù)  */
    aeFileProc *wfileProc;  /* 讀事件回調(diào)函數(shù)  */
    void *clientData; /* 客戶端數(shù)據(jù) */
} aeFileEvent;  

aeCreateFileEvent

aeCreateFileEvent函數(shù)在ae.c文件中,主要處理邏輯如下:

  1. 根據(jù)傳入的文件描述符,在eventLoop中獲取對(duì)應(yīng)的IO事件aeFileEvent fe
  2. 調(diào)用aeApiAddEvent方法注冊(cè)要監(jiān)聽的事件
  3. 設(shè)置讀寫事件的回調(diào)函數(shù)
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // 根據(jù)傳入的文件描述符獲取對(duì)應(yīng)的IO事件
    aeFileEvent *fe = &eventLoop->events[fd];
    // 注冊(cè)要監(jiān)聽的事件,讓內(nèi)核可以監(jiān)聽到當(dāng)前文件描述符上的IO事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc; // 設(shè)置寫事件的回調(diào)函數(shù)
    if (mask & AE_WRITABLE) fe->wfileProc = proc; // 設(shè)置讀事件的回調(diào)函數(shù)
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

aeApiAddEvent

aeApiAddEvent用于注冊(cè)事件(ae_epoll.c文件中):

  1. 從eventLoop獲取aeApiState,因?yàn)閍eApiState中的epfd記錄了epoll實(shí)例
  2. 創(chuàng)建了epoll_event類型的變量ee,用于記錄操作類型、要監(jiān)聽的文件描述符以及事件類型,在調(diào)用函數(shù)時(shí)使用
  3. 根據(jù)掩碼mask判斷操作類型,如果文件描述符還未設(shè)置監(jiān)聽事件mask掩碼為AE_NONE, 類型設(shè)置為添加,否則設(shè)置為修改,操作類型有如下三種:
    • EPOLL_CTL_ADD:用于向epoll添加監(jiān)聽事件
    • EPOLL_CTL_MOD:用于修改已經(jīng)注冊(cè)過的監(jiān)聽事件
    • EPOLL_CTL_ADD:用于刪除監(jiān)聽事件
  4. 將redis的可讀、可寫事件類型轉(zhuǎn)換為epoll的類型,讀事件類型為EPOLLIN,寫事件為EPOLLOUT,并設(shè)置到ee的events中
  5. 調(diào)用epoll_ctl函數(shù)添加文件描述符的監(jiān)聽事件,參數(shù)分別為epoll實(shí)例、操作類型、要監(jiān)聽的文件描述符、epoll_event類型變量ee
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    // 獲取aeApiState
    aeApiState *state = eventLoop->apidata;
    // 創(chuàng)建epoll_event類型的變量ee,添加監(jiān)聽事件的時(shí)候使用
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* 如果fd文件描述符還未設(shè)置監(jiān)聽事件, 類型設(shè)置為添加,否則設(shè)置為修改,簡言之就是根據(jù)掩碼判斷是添加還是修改監(jiān)聽事件 */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    ee.events = 0;
 
    mask |= eventLoop->events[fd].mask; 
    // 如果是可讀事件,轉(zhuǎn)換為epoll的讀事件監(jiān)聽類型EPOLLIN,并設(shè)置到ee的events中
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    // 如果是可寫事件,轉(zhuǎn)換為epoll的寫事件監(jiān)聽類型EPOLLOUT,并設(shè)置到ee的events中
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    // 記錄要監(jiān)聽的文件描述符
    ee.data.fd = fd;
    // 調(diào)用epoll_ctl函數(shù)向epoll添加監(jiān)聽事件,參數(shù)分別為epoll實(shí)例、操作類型、要監(jiān)聽的文件描述符、epoll_event類型變量ee
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

總結(jié)

Redis在啟動(dòng)時(shí),調(diào)用aeCreateEventLoop創(chuàng)建aeEventLoop結(jié)構(gòu)體和epoll實(shí)例,之后調(diào)用aeCreateFileEvent函數(shù)向內(nèi)核注冊(cè)TCP文件描述符的監(jiān)聽事件,當(dāng)有客戶端連接Redis服務(wù)時(shí),TCP文件描述符產(chǎn)生可讀事件,通過epoll可以獲取產(chǎn)生事件的文件描述符,Redis就可以對(duì)連接請(qǐng)求進(jìn)行處理。

// server.el是eventLoop
// server.ipfd[j]是監(jiān)聽socket的文件描述符
// AE_READABLE是讀事件
// acceptTcpHandler是事件產(chǎn)生時(shí)的回調(diào)函數(shù)
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL)

事件處理

aeMain函數(shù)在ae.c文件中,里面是一個(gè)while循環(huán),它的處理邏輯如下:

  1. 通過eventLoop的stop判斷是否處于停止?fàn)顟B(tài),如果非停止?fàn)顟B(tài)進(jìn)入第2步
  2. 判斷eventLoop的beforesleep是否為空,如果不為空,調(diào)用beforesleep函數(shù)
  3. 調(diào)用了aeProcessEvents函數(shù)處理IO事件
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 調(diào)用了aeProcessEvents處理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

aeProcessEvents

aeProcessEvents函數(shù)在ae.c文件中,處理邏輯如下:

  1. 調(diào)用aeApiPoll函數(shù)等待就緒的事件,如果有事件產(chǎn)生,返回就緒的文件描述符個(gè)數(shù),aeApiPoll函數(shù)中對(duì)就緒文件描述符處理時(shí)將其放在了fired中
  2. for循環(huán)中處理就緒的事件,通過fired可以獲取到每一個(gè)產(chǎn)生事件的文件描述符fd,根據(jù)文件描述符fd可以在eventLoop的events中獲取對(duì)應(yīng)的事件aeFileEvent,aeFileEvent中記錄了事件的回調(diào)函數(shù),之后根據(jù)事件類型,調(diào)用對(duì)應(yīng)的回調(diào)函數(shù),調(diào)用回調(diào)函數(shù)的入?yún)⒎謩e為eventLoop、文件描述符、aeFileEvent的clientData、事件類型掩碼
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* 如果沒有事件 */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* 如果有IO事件或者時(shí)間事件 */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        
        // 省略代碼...
        
        // 等待事件,返回就緒文件描述符的數(shù)量
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // 處理就緒的事件
        for (j = 0; j < numevents; j++) {
            // aeApiPoll中已將就緒的事件放在了fired中,通過fired可以獲取到產(chǎn)生事件的文件描述符fd
            // 根據(jù)文件描述符fd獲取對(duì)應(yīng)的事件aeFileEvent,aeFileEvent中記錄了事件的回調(diào)函數(shù)
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            // 獲取文件描述符
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* 判斷屏障 */
            int invert = fe->mask & AE_BARRIER;
            
            /* 處理可讀事件 */
            if (!invert && fe->mask & mask & AE_READABLE) {
                // 如果是可讀事件,調(diào)用可讀事件的回調(diào)函數(shù),參數(shù)分別為eventLoop、文件描述符、aeFileEvent的clientData、事件類型掩碼
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* 處理可寫事件 */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    // 如果是寫事件,調(diào)用寫事件的回調(diào)函數(shù),參數(shù)分別為eventLoop、文件描述符、aeFileEvent的clientData、事件類型掩碼
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* 如果有時(shí)間事件 */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

aeApiPoll

aeApiPoll處理就緒的事件:

  1. 調(diào)用IO多路復(fù)用epoll_wait函數(shù)等待事件的產(chǎn)生,epoll_wait函數(shù)需要傳入epoll實(shí)例、記錄就緒事件集合的epoll_event,這兩個(gè)參數(shù)分別在aeApiState的epfd和events中,當(dāng)監(jiān)聽的文件描述符有事件產(chǎn)生時(shí),epoll_wait返回就緒的文件描述符個(gè)數(shù)

  2. 對(duì)epoll_wait返回的就緒事件進(jìn)行處理,事件記錄在events變量中,遍歷每一個(gè)就緒的事件,將事件對(duì)應(yīng)的文件描述符設(shè)置在eventLoop的fire中,后續(xù)通過fire對(duì)事件進(jìn)行處理

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    // 獲取aeApiState,aeApiState記錄了epoll實(shí)例,events記錄了產(chǎn)生的事件
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 等待事件的產(chǎn)生,epoll_wait返回就緒的文件描述符個(gè)數(shù),就緒的事件記錄在state->events中
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        // 處理返回的就緒事件
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            // 獲取每一個(gè)就緒的事件
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            // 將就緒事件的文件描述符設(shè)置到已觸發(fā)的事件fired的fd中
            eventLoop->fired[j].fd = e->data.fd;
            // 設(shè)置事件類型掩碼
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

處理客戶端連接

acceptTcpHandler

由上面的調(diào)用可知,Redis在啟動(dòng)時(shí),注冊(cè)了AE_READABLE讀事件,回調(diào)函數(shù)為acceptTcpHandler(network.c文件中)用于處理客戶端連接,當(dāng)有客戶端與Redis連接時(shí),epoll返回就緒的文件描述符,Redis在處理就緒的事件時(shí)調(diào)用acceptTcpHandler進(jìn)行處理:

  1. 調(diào)用anetTcpAccept建立連接,并返回已連接的套接字文件描述符cfd
  2. 調(diào)用acceptCommonHandler(network.c文件中)函數(shù),它又調(diào)用了createClient函數(shù),在createClient函數(shù)中調(diào)用了aeCreateFileEvent,向內(nèi)核注冊(cè)已連接套接字的可讀監(jiān)聽事件
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
    while(max--) {
        // 建立連接,返回已連接的套接字文件描述符cfd
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // 調(diào)用acceptCommonHandler處理連接,這里傳入的文件描述符為已連接的套接字
        acceptCommonHandler(cfd,0,cip);
    }
}

static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // 調(diào)用createClient
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    // ..
}

createClient

createClient函數(shù)中調(diào)用了aeCreateFileEvent方法向內(nèi)核中注冊(cè)可讀事件,上文可知傳入的描述符是已連接套接字cfd,回調(diào)函數(shù)為readQueryFromClient,此時(shí)事件驅(qū)動(dòng)框架增加了對(duì)客戶端已連接套接字的監(jiān)聽,當(dāng)客戶端有數(shù)據(jù)發(fā)送到服務(wù)端時(shí),Redis調(diào)用readQueryFromClient函數(shù)處理讀事件:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // 注冊(cè)已連接套接字的可讀事件,回調(diào)函數(shù)為readQueryFromClient
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    // ...
}

處理讀事件

readQueryFromClient

readQueryFromClient函數(shù)在network.c文件中,是可讀事件的回調(diào)函數(shù),用于處理已連接套接字上的讀事件,處理邏輯如下:

  1. 從已連接的套接字中讀取客戶端的請(qǐng)求數(shù)據(jù)到輸入緩沖區(qū)
  2. 調(diào)用processInputBufferAndReplicate函數(shù)處理輸入緩沖區(qū)的數(shù)據(jù)
// aeProcessEvents中調(diào)用回調(diào)函數(shù)時(shí),傳入的參數(shù)分別為aeEventLoop、已連接套接字的文件描述符、aeFileEvent的clientData私有數(shù)據(jù)、事件類型掩碼
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // 從已連接的套接字中讀取客戶端的請(qǐng)求數(shù)據(jù)到輸入緩沖區(qū)
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    // 省略...

    /* 處理輸入緩沖區(qū)數(shù)據(jù) */
    processInputBufferAndReplicate(c);
}

處理寫事件

在aeMain調(diào)用aeProcessEvents之前,先調(diào)用了beforeSleep方法,beforeSleep中又調(diào)用了handleClientsWithPendingWrites,它會(huì)將Redis Server緩沖區(qū)的數(shù)據(jù)寫回到客戶端:

void beforeSleep(struct aeEventLoop *eventLoop) {
    
    // 省略...

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    // 省略...
}.

handleClientsWithPendingWrites

Redis Server收到客戶端的請(qǐng)求命令后,需要處理請(qǐng)求,然后將要返回的數(shù)據(jù)寫回到客戶端,寫回到客戶端的邏輯在handleClientsWithPendingWrites函數(shù)中,處理邏輯如下:

  1. 獲取待寫回?cái)?shù)據(jù)的客戶端列表
  2. 遍歷每一個(gè)待寫回?cái)?shù)據(jù)的客戶端,調(diào)用writeToClient方法將緩沖區(qū)的數(shù)據(jù)寫到客戶端socket中,然后調(diào)用clientHasPendingReplies方法判斷數(shù)據(jù)是否全部寫回,如果為否,則調(diào)用aeCreateFileEvent向內(nèi)核注冊(cè)客戶端文件描述符的可寫事件監(jiān)聽,交由回調(diào)函數(shù)sendReplyToClient處理
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);
    // 獲取待寫回?cái)?shù)據(jù)的客戶端列表
    listRewind(server.clients_pending_write,&li);
    // 遍歷每一個(gè)待寫回?cái)?shù)據(jù)的客戶端
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        if (c->flags & CLIENT_PROTECTED) continue;

        /* 將緩沖區(qū)的數(shù)據(jù)寫到客戶端socket中 */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* 如果數(shù)據(jù)未全部寫回到客戶端 */
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            // 調(diào)用aeCreateFileEvent方法,向內(nèi)核注冊(cè)客戶端文件描述符的可寫事件監(jiān)聽,交由回調(diào)函數(shù)sendReplyToClient處理
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}

clientHasPendingReplies

有時(shí)由于網(wǎng)絡(luò)原因或者其他原因,可能只發(fā)出去了部分?jǐn)?shù)據(jù),客戶端如果一直未從緩沖區(qū)讀取數(shù)據(jù),在緩沖區(qū)已滿的情況,服務(wù)端將無法往客戶端發(fā)送數(shù)據(jù),所以調(diào)用clientHasPendingReplies函數(shù)判斷數(shù)據(jù)是否寫回完畢,如果未寫回完畢交由事件循環(huán)驅(qū)動(dòng)處理,提高處理效率。

整體流程圖

總結(jié)

參考

極客時(shí)間 - Redis源碼剖析與實(shí)戰(zhàn)(蔣德鈞)

【osc_avxkth26】Redis 網(wǎng)絡(luò)通信模塊源碼分析(3)

網(wǎng)絡(luò)通信 --> epoll用法

Redis版本:redis-5.0.8

本文摘自 :https://www.cnblogs.com/

開通會(huì)員,享受整站包年服務(wù)立即開通 >