Reactor網路模型核心思想探秘

2023.12.11

Reactor網路模型核心思想探秘

reactor網路模型是網路程式設計中非常重要的一種程式設計思想,本文透過一個簡短的範例試圖講明白reactor網路程式設計模型的核心思想。當然,本文的實作還不是很完善,例如在呼叫回呼函數的時候還是傳入了fd,我們是否可以不需要這個參數,徹底和IO進行分離呢?

在網路程式設計系列文章中,我們實作了一個基於epoll的網路框架,並在此基礎上開發了一個簡單的HTTP服務,在那個系列文章中我們使用了讀、寫兩個buffer將網路IO和資料的讀寫進行了分離,它們之間的扭轉完全透過epoll事件通知,如果你認真研究過源碼,會發現,所有針對網路IO的操作都是由事件觸發的。這種基於事件觸發的網路模型通常我們叫做Reactor網路模型。

由於網路程式設計系列文章中程式碼實現相對比較複雜,所以不太好講清楚。所以,我決定單獨出幾篇文章對那個系列文章進行一些拓展,主要涉及網路程式設計思想和效能測試。

這篇文章我們透過實作一個簡單的網路框架,來說明Reactor網路模型實現的一般思路,其本質思想和x-net專案基本上是一樣的,只是在程式碼上做了非常大的精簡,理解起來會輕鬆很多。

首先,我們來看一段程式碼

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>




int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);


    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));


    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(2048);


    if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
        perror("bind fail");
        return -1;
    }


    listen(sockfd, 10);


    printf("sock-fd:%d\n", sockfd);


    int epfd = epoll_create(1);


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;


    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);


    struct epoll_event events[1024] = {0};


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                printf("clientfd: %d\n", clientfd);
            } else if (events[i].events & EPOLLIN) {


                char buffer[10] = {0};


                int count = recv(connfd, buffer, 10, 0);
                if (count == 0) {
                    printf("discounnect\n");


                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    close(i);


                    continue;
                }


                send(connfd, buffer, count, 0);
                printf("clientfd: %d, count: %d, buffer: %s\n", connfd, count, buffer);
            }
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.

熟悉epoll的人應該對上面的程式碼比較熟悉,這段程式碼的核心在下面的while主循環,如果是當前Server的Socket說明有新的連接進來,調用accept拿到客戶端的fd,將其放在epoll的events中,並註冊EPOLLIN事件,一般我們理解為可讀事件。

如果不是sockfd,表示是客戶端的fd可讀,我們將資料讀出來再原樣發送回去。

上面的程式碼存在的主要問題在於,套接字的accept和讀寫操作我們是直接寫在主循環裡了,這將會讓程式碼的邏輯變得難以琢磨。

對於一個套接字,最直接的操作就是讀寫。所以,最容易想到的就是將讀和寫分開離開。為了實現讀取和寫入分離我們封裝兩個回調函數,如下:

int recv_callback(int fd, char *buffer, int size);
int send_callback(int fd, char *buffer, int size);
  • 1.
  • 2.

你可以想一下,這兩個函數該怎麼寫?以下是根據原有的邏輯將讀取和寫入封裝在了recv_callback和send_callback兩個函數中,程式碼如下:

int recv_callback(int fd, char *buffer, int size) {
    int count = recv(fd, buffer, size, 0);


    send_callback(fd, buffer, count, 0);


    return count;
}
int send_callback(int fd, char *buffer, int size) {
    int count = send(fd, buffer, size, 0);


    return count;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.

然後,在主循環中就可以這樣使用

int main() {


    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                ...
            } else if (events[i].events & EPOLLIN) {
                char buffer[10] = {0};


                int count = recv_callback(fd, buffer, 10);
                if (count == 0) {
                    printf("disconnect\\n");
                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    clise(i);
                    continue;
                }
            }
        }
    } 
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.

雖然我們將讀取和寫入拆成了兩個方法,但讀和寫並沒有分離開,我們在recv_callback中每次收到資料之後調用send_callback將資料原樣又發回去,在這裡我們希望recv_callback和send_callback各管各的互不干擾,例如像下面這樣

int recv_callback(int fd, char *buffer, int size) {
    int count = recv(fd, buffer, size, 0);


    return count;
}
int send_callback(int fd, char *buffer, int size) {
    int count = send(fd, buffer, size, 0);


    return count;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

但這明顯也是有問題的,在recv_callback中讀完了之後,要如何傳送資料呢?這裡,我們可以想一下,圍繞著一個套接字有哪些部分呢?是不是可以設​​計出一個類似字典的結構,這個字典的key對應的就是套接字,而value對應的就是圍繞套接字相關的各個組件。

我們將recv_callback和send_callback放在了一個conn_channel結構體中,並且設計了兩個buffer,一個用來讀數據,另一個用來發數據,conn_channel便是這個字典對應的value,代碼如下:

#define BUF_LEN   1024


typedef int(*callback)(int fd);


struct conn_channel {
    int fd;


    callback recv_call;
    callback send_call;


    char wbuf[BUF_LEN];
    int wlen;
    char rbuf[BUF_LEN];
    int rlen;
};
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.

其中,fd表示的是目前客戶端套接字。然後我們定義一個陣列來表示套接字到套接字value的映射關係,程式碼如下:

struct conn_channel conn_map[1024] = {0};
  • 1.

這樣,我們在主循環中,就可以像下面這樣,往conn_map中加入對應的套接字了,程式碼如下:

int main() {
    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN;
                ev.data.fd = clientaddr;


                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                conn_map[clientfd].fd = clientfd;
                conn_map[clientfd].rlen = 0;
                conn_map[clientfd].wlen = 0;
                conn_map[clientfd].recv_call = recv_callback;
                conn_map[clientfd].send_call = send_callback;
                memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
                memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


                printf("clientfd:%d\\n", clientfd);
            } else if (events[i].events & EPOLLIN) {
                ...
            }
        }
    } 
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.

在上面的程式碼中,每當accept出來一個客戶端的套接字,我們就將它放到conn_map中,設定好讀寫buffer和回呼函數。但如果你細心會發現,recv_callback、send_callback和conn_channel中的回呼函數簽章是不一樣的。所以,我們要調整這兩個函數的實現,調整之後程式碼如下:

int recv_callback(int fd) {
    int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);
    // do something


    memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);
    conn_map[fd].wlen = conn_map[fd].rlen;
    conn_map[fd].rlen = 0;


    return count;
}
int send_callback(int fd) {
    int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);


    return count;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.

因為有了conn_map,所以原來傳進來的buffer和size都不需要了,在conn_channel中已經有記錄了。所以只需要一個fd參數就可以了。我們在recv_callback中模擬了回覆訊息,強行將讀到的資料寫到了wbuffer中。這裡補充一下,conn_channel中的rbuffer是用來從套接字中讀取資料的,wbuffer表示的是將要傳送到套接字的資料。

你可以試著把上面的程式碼跑起來,然後你會發現,並沒有照我們的預期執行,send_callback中的send似乎沒有運作。這是因為我們只是將資料從rbuffer寫到了wbuffer中,而send_callback並沒有機會呼叫。你可以想想send_callback放在哪裡呼叫比較合適呢?

在上面的例子中,顯然放在主循環中執行比較合適,在epoll中,EPOLLOUT表示可寫事件,我們可以利用這個事件。在recv_callback執行完之後我們註冊一個EPOLLOUT事件,然後在主循環中我們去監聽EPOLLOUT事件。這樣,當recv_callback將rbuffer的資料複製到wbuffer中之後,send_callback透過EPOLLOUT事件就可以在主循環中得以執行。

為了實現上面的效果我們要修改兩個地方,一個是recv_callback中我們要註冊一下EPOLLOUT事件,程式碼如下:

int recv_callback(int fd) {
    int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);
    // do something


    memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);
    conn_map[fd].wlen = conn_map[fd].rlen;
    conn_map[fd].rlen = 0;


    struct epoll_event ev;
    ev.events = EPOLLOUT;
    ev.data.fd = fd;


    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);


    return count;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.

我們在rbuf拷貝到wbuf之後,給當前fd註冊了EPOLLOUT事件,然後我們在主循環中要處理EPOLLOUT事件,代碼如下:

int main() {
    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN;
                ev.data.fd = clientaddr;


                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                conn_map[clientfd].fd = clientfd;
                conn_map[clientfd].rlen = 0;
                conn_map[clientfd].wlen = 0;
                conn_map[clientfd].recv_call = recv_callback;
                conn_map[clientfd].send_call = send_callback;
                memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
                memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


                printf("clientfd:%d\\n", clientfd);
            } else if (events[i].events & EPOLLIN) {
                int count = conn_map[connfd].recv_call(connfd);
                printf("recv-count:%d\\n", count);
            } else if (events[i].events & EPOLLOUT) { // 处理EPOLLOUT事件
                int count  = conn_map[connfd].send_call(connfd);
                printf("send-count:%d\\n", count);
            }
        }
    } 
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.

要注意的是,epfd是在main函數中定義的,而我們在recv_callback中有使用,所以我們可以暫時將epfd宣告成一個全域變量,放在外面。

上面的程式碼有一個問題,EPOLLOUT事件觸發之後你會發現再向當前fd發送數據,就沒響應了,這是因為epoll事件被我們修改了,為了解決這個問題我們可以在send_callback執行完之後再設置回去,如下:

int send_callback(int fd) {
    int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = fd;


    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);


    return count;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

這樣,我們就將IO操作給屏蔽了,在主循環中我們只專注於事件,不同的事件呼叫不同的回呼函數。在對應的回呼函數中只做自己該做的,做完後註冊事件通知其它的回呼函數。

但是,上面的程式碼還不夠優雅,對於accept和讀取事件來講在epoll中都是EPOLLIN事件,這兩個是不是可以合併在一起處理呢?答案是可以的,首先,我們要將accept相關的邏輯給拆出來,拆解之後的程式碼如下:

int accept_callback(int fd) {
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);


    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);


    ev.events = EPOLLIN;
    ev.data.fd = clientaddr;


    epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


    conn_map[clientfd].fd = clientfd;
    conn_map[clientfd].rlen = 0;
    conn_map[clientfd].wlen = 0;
    conn_map[clientfd].recv_call = recv_callback;
    conn_map[clientfd].send_call = send_callback;
    memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
    memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


    return clientfd;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.

我們發現,accept_callback和recv_callback以及send_callback的簽名是一樣的,這樣我們可以在conn_channel用一個union,將accept_callback也放到conn_channel中來。如下:

struct conn_channel {
    int fd;


    union {
        callback accept_call;
        callback recv_call;
    } call_t;
    callback send_call;


    char wbuf[BUF_LEN];
    int wlen;
    char rbuf[BUF_LEN];
    int rlen;
};
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

在主循環中,我們就可以先給sockfd註冊好accept回呼函數,然後我們只需要在主循環中保留兩個邏輯就可以了,程式碼如下:

int main() {
    int sockfd = create_serv(9000);
    if (sockfd == -1) {
        perror("create-server-fail");
        return -1;
    }


    make_nonblocking(sockfd);


    epfd = epoll_create1(1);


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;


    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);


    struct epoll_event events[1024] = {0}; 


    conn_map[sockfd].rlen = 0;
    conn_map[sockfd].wlen = 0;
    conn_map[sockfd].fd = sockfd;
    conn_map[sockfd].call_t.accept_call = accept_callback;
    conn_map[sodkfd].send_call = send_callback;
    memset(conn_map[sockfd].rbuf, 0, BUF_LEN);
    memset(conn_map[sockfd].wbuf, 0, BUF_LEN);


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN) {
                int count = conn_map[connfd].call_t.recv_call(connfd);
                printf("recv-count:%d\\n", count);
            } else if (events[i].events & EPOLLOUT) {
                int count  = conn_map[connfd].send_call(connfd);
                printf("send-count:%d\\n", count);
            }
        }
    } 
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.

你可以想一下,我們註冊的是call_t.accept_call,但在呼叫的時候確是call_t.recv_call,為什麼這樣可行?

我們在網頁程式設計系列文章中,單獨為accept抽象化了一個對象,你可以比較一下這兩種實作方式,看看它們有什麼不同?在系列文件中我們為什麼要單獨抽象化一個accepter物件呢?

可以看到,最後主循環中的邏輯,只有兩個分支,這兩個分支代表了兩種事件,這種透過事件驅動的網路模型就是Reactor網路模型。本文為了容易理解,程式碼進行了精簡。在實際的工程中我們還要考慮許多情況。例如,上面的程式碼只支援epoll,我們是不是可以將事件驅動相關的程式碼抽象化成單獨的元件,讓其可以支援其它的事件模型。

本文雖然程式碼簡單,但Reactor網路模型的實作基本上都逃脫不了這個套路,只是在此基礎上可能會將各個部分進行單獨的封裝,例如我們在網路程式設計系列文章中就將channel和map進行了抽象,讓它能適配各種場景。

總結

reactor網路模型是網路程式設計中非常重要的一種程式設計思想,本文透過一個簡短的範例試圖講明白reactor網路程式設計模型的核心思想。當然,本文的實作還不是很完善,例如在呼叫回呼函數的時候還是傳入了fd,我們是否可以不需要這個參數,徹底和IO進行分離呢?