Exploring the core ideas of the Reactor network model

2023.12.11

The Reactor network model is one of the central ideas in network programming. This article explains its core idea through a short example. The implementation here is not complete: for example, the callbacks still receive an fd as a parameter. Can the callbacks be separated from IO entirely, without that parameter?

In the network programming series, we implemented a network framework based on epoll and built a simple HTTP service on top of it. That framework uses two buffers, one for reading and one for writing, to connect network IO with the data. Reading and writing are separated, and switching between them is driven entirely by epoll event notifications. If you study the source code carefully, you will find that every operation on network IO is triggered by an event. This event-triggered network model is usually called the Reactor network model.

Since the code in the network programming series is fairly complex, it is hard to explain clearly there. I therefore decided to publish a few separate articles that expand on that series, mainly covering network programming ideas and performance testing.

In this article, we illustrate the general idea of the Reactor network model by implementing a simple network framework. The essential idea is the same as in the x-net project, but the code has been greatly simplified, which makes it much easier to understand.

First, let's look at a piece of code:

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>




int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);


    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));


    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(2048);


    if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
        perror("bind fail");
        return -1;
    }


    listen(sockfd, 10);


    printf("sock-fd:%d\n", sockfd);


    int epfd = epoll_create(1);


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;


    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);


    struct epoll_event events[1024] = {0};


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN;  // level-triggered, so unread data is reported again
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                printf("clientfd: %d\n", clientfd);
            } else if (events[i].events & EPOLLIN) {


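                char buffer[10] = {0};

                int count = recv(connfd, buffer, sizeof(buffer), 0);
                if (count <= 0) {
                    // 0 means the peer closed the connection, -1 means an error
                    printf("disconnect\n");

                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    close(connfd);

                    continue;
                }

                send(connfd, buffer, count, 0);
                // buffer is not NUL-terminated, so print exactly count bytes
                printf("clientfd: %d, count: %d, buffer: %.*s\n", connfd, count, count, buffer);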
            }
        }
    }
}

Anyone familiar with epoll will recognize the code above. Its core is the while loop at the end. If the ready descriptor is the server's listening socket, a new connection has arrived: we call accept to obtain the client's fd, add it to epoll, and register the EPOLLIN event, which is generally understood as the readable event.

If the descriptor is not sockfd, a client fd is readable: we read the data and send it back unchanged.

The main problem with this code is that the accept, read, and write operations are written directly in the main loop, which makes the logic hard to follow as the code grows.

For a socket, the most basic operations are reading and writing, so the most natural first step is to separate the two. To do that, we wrap them in two callback functions:

int recv_callback(int fd, char *buffer, int size);
int send_callback(int fd, char *buffer, int size);

Think for a moment about how these two functions should be written. As a first step, we simply move the original read and write logic into recv_callback and send_callback:

int recv_callback(int fd, char *buffer, int size) {
    int count = recv(fd, buffer, size, 0);


    // Echo the data back; send_callback is declared above with three parameters.
    if (count > 0) {
        send_callback(fd, buffer, count);
    }


    return count;
}
int send_callback(int fd, char *buffer, int size) {
    int count = send(fd, buffer, size, 0);


    return count;
}

Then the main loop can use them like this:

int main() {


    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                ...
            } else if (events[i].events & EPOLLIN) {
                char buffer[10] = {0};


                int count = recv_callback(connfd, buffer, sizeof(buffer));
                if (count <= 0) {
                    printf("disconnect\n");
                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    close(connfd);
                    continue;
                }
            }
        }
    } 
}

Although we have split reading and writing into two functions, the two are still not separated: every time recv_callback receives data, it calls send_callback to send the data straight back. What we want is for recv_callback and send_callback to each handle only their own job without interfering with the other, for example:

int recv_callback(int fd, char *buffer, int size) {
    int count = recv(fd, buffer, size, 0);


    return count;
}
int send_callback(int fd, char *buffer, int size) {
    int count = send(fd, buffer, size, 0);


    return count;
}

But this raises an obvious problem: after recv_callback has read the data, how does it get sent? It helps to ask what belongs to a socket. Could we design a dictionary-like structure in which the key is the socket and the value holds the components associated with that socket?

We put recv_callback and send_callback into a conn_channel structure, together with two buffers, one for data that has been read and one for data to be sent. conn_channel is the value type of that dictionary. The code is as follows:

#define BUF_LEN   1024


typedef int(*callback)(int fd);


struct conn_channel {
    int fd;


    callback recv_call;
    callback send_call;


    char wbuf[BUF_LEN];
    int wlen;
    char rbuf[BUF_LEN];
    int rlen;
};

Here fd is the current client socket. Next we define an array that maps each socket to its conn_channel, using the fd itself as the index. The code is as follows:

struct conn_channel conn_map[1024] = {0};
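Indexing conn_map directly by descriptor works only while every fd stays below the array size. A minimal sketch of a bounds-checked lookup follows; the helper name channel_for and the constant MAX_CONN are assumptions for illustration and do not appear in the original code.

```c
#include <stddef.h>

#define BUF_LEN  1024
#define MAX_CONN 1024   /* assumed table size, matching conn_map[1024] */

typedef int (*callback)(int fd);

struct conn_channel {
    int fd;
    callback recv_call;
    callback send_call;
    char wbuf[BUF_LEN];
    int wlen;
    char rbuf[BUF_LEN];
    int rlen;
};

static struct conn_channel conn_map[MAX_CONN];

/* Hypothetical helper: return the slot for fd, or NULL when fd cannot be
 * used as an index into conn_map. */
static struct conn_channel *channel_for(int fd) {
    if (fd < 0 || fd >= MAX_CONN) {
        return NULL;
    }
    return &conn_map[fd];
}
```

A caller would check the result for NULL and, for example, close a freshly accepted connection whose descriptor does not fit in the table.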

In the main loop, we can then fill in the conn_map entry for each newly accepted socket:

int main() {
    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN;
                ev.data.fd = clientfd;  // the descriptor, not the address struct


                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                conn_map[clientfd].fd = clientfd;
                conn_map[clientfd].rlen = 0;
                conn_map[clientfd].wlen = 0;
                conn_map[clientfd].recv_call = recv_callback;
                conn_map[clientfd].send_call = send_callback;
                memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
                memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


                printf("clientfd:%d\n", clientfd);
            } else if (events[i].events & EPOLLIN) {
                ...
            }
        }
    } 
}

In the code above, each accepted client socket gets an entry in conn_map with its read and write buffers and callbacks. A careful reader will notice, however, that recv_callback and send_callback do not match the callback signature declared for conn_channel, so both functions need to be adjusted. After adjustment, the code is as follows:

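int recv_callback(int fd) {
    int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);
    if (count <= 0) {
        return count;  // 0: peer closed the connection, -1: error
    }
    conn_map[fd].rlen += count;  // account for the bytes just read
    // do something

    // Simulate a reply: copy what was read into the write buffer.
    memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);
    conn_map[fd].wlen = conn_map[fd].rlen;
    conn_map[fd].rlen = 0;

    return count;
}
int send_callback(int fd) {
    int count = send(fd, conn_map[fd].wbuf, conn_map[fd].wlen, 0);

    return count;
}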

Thanks to conn_map, the buffer and size parameters are no longer needed, because both are already recorded in conn_channel; a single fd parameter is enough. In recv_callback we simulate a reply by copying the data just read into wbuf. To recap, rbuf in conn_channel holds data read from the socket, and wbuf holds data waiting to be sent to the socket.
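The hand-off between the two buffers can be exercised without any socket. The following is a minimal sketch, assuming a hypothetical helper stage_reply that performs the same copy as recv_callback once n bytes have landed in rbuf; the structure is trimmed to its buffer fields for brevity.

```c
#include <string.h>

#define BUF_LEN 1024

struct conn_channel {   /* trimmed for this sketch: buffer fields only */
    char wbuf[BUF_LEN];
    int wlen;
    char rbuf[BUF_LEN];
    int rlen;
};

/* Hypothetical helper: account for n freshly received bytes in rbuf, then
 * move everything read so far into wbuf as the pending reply.
 * Returns the number of bytes waiting to be sent, or -1 if n is invalid. */
static int stage_reply(struct conn_channel *c, int n) {
    if (n < 0 || n > BUF_LEN - c->rlen) {
        return -1;
    }
    c->rlen += n;
    memcpy(c->wbuf, c->rbuf, (size_t)c->rlen);
    c->wlen = c->rlen;
    c->rlen = 0;
    return c->wlen;
}
```

Keeping the lengths next to the buffers is what lets the callbacks drop their buffer and size parameters: everything they need travels with the channel.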

If you run the code above, you will find that it does not behave as expected: the send in send_callback never seems to happen. That is because we only copied the data from rbuf to wbuf, and nothing ever calls send_callback. Where would be the right place to call it?

In this example, the main loop is clearly the right place. In epoll, EPOLLOUT is the writable event, and we can make use of it: after recv_callback has run, we register EPOLLOUT for the fd and handle that event in the main loop. That way, once recv_callback has copied the data from rbuf to wbuf, the main loop runs send_callback in response to the EPOLLOUT event.
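To see how EPOLLOUT behaves in isolation, here is a small probe built on a socket pair instead of a TCP connection. It is only a sketch: set_interest and probe_idle_socket are hypothetical names introduced for this illustration.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: replace the set of events epoll watches for fd. */
static int set_interest(int epfd, int fd, uint32_t events) {
    struct epoll_event ev = {0};
    ev.events = events;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

/* Register one end of an idle socket pair for EPOLLIN, switch its interest
 * to `want`, and return the event mask epoll reports (0 if nothing is ready). */
static uint32_t probe_idle_socket(uint32_t want) {
    int sv[2];
    uint32_t seen = 0;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
        return 0;
    }

    int epfd = epoll_create1(0);
    if (epfd != -1) {
        struct epoll_event ev = {0};
        struct epoll_event out;
        ev.events = EPOLLIN;
        ev.data.fd = sv[0];

        if (epoll_ctl(epfd, EPOLL_CTL_ADD, sv[0], &ev) == 0 &&
            set_interest(epfd, sv[0], want) == 0 &&
            epoll_wait(epfd, &out, 1, 0) == 1) {
            seen = out.events;
        }
        close(epfd);
    }

    close(sv[0]);
    close(sv[1]);
    return seen;
}
```

Called with EPOLLIN, the probe reports nothing because no data is pending; called with EPOLLOUT, it reports the socket as writable at once. An idle socket is almost always writable, so a level-triggered EPOLLOUT left registered permanently would make epoll_wait return on every iteration. Registering it only when wbuf holds data avoids that.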

This requires two changes. The first is to register the EPOLLOUT event in recv_callback. The code is as follows:

int recv_callback(int fd) {
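    int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);
    if (count <= 0) {
        // 0: peer closed the connection, -1: error; stop watching the fd
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        close(fd);
        return count;
    }
    conn_map[fd].rlen += count;  // account for the bytes just read
    // do something


    memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);
    conn_map[fd].wlen = conn_map[fd].rlen;
    conn_map[fd].rlen = 0;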


    struct epoll_event ev;
    ev.events = EPOLLOUT;
    ev.data.fd = fd;


    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);


    return count;
}

After copying rbuf to wbuf, recv_callback registers the EPOLLOUT event for the current fd. The second change is to handle that event in the main loop. The code is as follows:

int main() {
    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN;
                ev.data.fd = clientfd;  // the descriptor, not the address struct


                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                conn_map[clientfd].fd = clientfd;
                conn_map[clientfd].rlen = 0;
                conn_map[clientfd].wlen = 0;
                conn_map[clientfd].recv_call = recv_callback;
                conn_map[clientfd].send_call = send_callback;
                memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
                memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


                printf("clientfd:%d\n", clientfd);
            } else if (events[i].events & EPOLLIN) {
                int count = conn_map[connfd].recv_call(connfd);
                printf("recv-count:%d\n", count);
            } else if (events[i].events & EPOLLOUT) { // handle the EPOLLOUT event
                int count = conn_map[connfd].send_call(connfd);
                printf("send-count:%d\n", count);
            }
        }
    } 
}

Note that epfd is defined in main but is also used in recv_callback, so for now we declare epfd as a global variable outside main.

The code above still has a problem. After the EPOLLOUT event has fired, further data sent to that fd gets no response, because recv_callback replaced the fd's registered event with EPOLLOUT. To fix this, we switch the registration back to EPOLLIN once send_callback has run, as follows:

int send_callback(int fd) {
    int count = send(fd, conn_map[fd].wbuf, conn_map[fd].wlen, 0);


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = fd;


    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);


    return count;
}

In this way, the IO operations are hidden from the main loop, which now deals only with events: each event invokes its own callback. A callback does only its own job and, when finished, registers the event that hands control to the next callback.
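The dispatch step on its own can be sketched without any IO. The struct handlers type, the dispatch function, and the two stand-in callbacks below are hypothetical names used only for this illustration.

```c
#include <stdint.h>
#include <sys/epoll.h>

typedef int (*callback)(int fd);

struct handlers {           /* hypothetical, trimmed-down conn_channel */
    callback on_readable;
    callback on_writable;
};

/* Pick the callback for one reported event, mirroring the if/else chain of
 * the main loop. Returns the callback's result, or -1 if no handler applies. */
static int dispatch(const struct handlers *h, uint32_t events, int fd) {
    if ((events & EPOLLIN) && h->on_readable) {
        return h->on_readable(fd);
    }
    if ((events & EPOLLOUT) && h->on_writable) {
        return h->on_writable(fd);
    }
    return -1;
}

/* Stand-in callbacks used only for the demonstration. */
static int fake_recv(int fd) { return fd + 100; }
static int fake_send(int fd) { return fd + 200; }
```

Because dispatch never touches a socket, swapping fake_recv for a real recv_callback changes nothing in the loop itself, which is the separation the article is working toward.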

However, the code is still not elegant enough. In epoll, accepting a connection and reading data are both EPOLLIN events. Can the two be handled by the same branch? Yes. First we need to pull the accept logic out into its own function. The extracted code is as follows:

int accept_callback(int fd) {
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);


    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    if (clientfd == -1) {
        return -1;
    }


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = clientfd;


    epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


    conn_map[clientfd].fd = clientfd;
    conn_map[clientfd].rlen = 0;
    conn_map[clientfd].wlen = 0;
    conn_map[clientfd].recv_call = recv_callback;
    conn_map[clientfd].send_call = send_callback;
    memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
    memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


    return clientfd;
}

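The signatures of accept_callback, recv_callback and send_callback are identical, so we can use a union in conn_channel and store accept_callback there as well. With this change, accept_callback has to assign conn_map[clientfd].call_t.recv_call instead of conn_map[clientfd].recv_call. The structure becomes: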

struct conn_channel {
    int fd;


    union {
        callback accept_call;
        callback recv_call;
    } call_t;
    callback send_call;


    char wbuf[BUF_LEN];
    int wlen;
    char rbuf[BUF_LEN];
    int rlen;
};

Before entering the main loop, we register the accept callback for sockfd. The main loop then needs only two branches. The code is as follows:

int main() {
    int sockfd = create_serv(9000);
    if (sockfd == -1) {
        perror("create-server-fail");
        return -1;
    }


    make_nonblocking(sockfd);


    epfd = epoll_create1(0);  // flags must be 0 or EPOLL_CLOEXEC


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;


    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);


    struct epoll_event events[1024] = {0}; 


    conn_map[sockfd].rlen = 0;
    conn_map[sockfd].wlen = 0;
    conn_map[sockfd].fd = sockfd;
    conn_map[sockfd].call_t.accept_call = accept_callback;
    conn_map[sockfd].send_call = send_callback;
    memset(conn_map[sockfd].rbuf, 0, BUF_LEN);
    memset(conn_map[sockfd].wbuf, 0, BUF_LEN);


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN) {
                int count = conn_map[connfd].call_t.recv_call(connfd);
                printf("recv-count:%d\n", count);
            } else if (events[i].events & EPOLLOUT) {
                int count = conn_map[connfd].send_call(connfd);
                printf("send-count:%d\n", count);
            }
        }
    } 
}
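The listing above calls create_serv and make_nonblocking, which this article does not show. The following sketch is one plausible shape for them, not the author's implementation: the return conventions, the SO_REUSEADDR option, and the backlog of 10 (taken from the first listing) are all assumptions.

```c
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Assumed behaviour: set O_NONBLOCK on fd; returns 0 on success, -1 on error. */
static int make_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Assumed behaviour: create a TCP socket listening on all interfaces on
 * the given port; returns the listening fd, or -1 on error. */
static int create_serv(unsigned short port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        return -1;
    }

    int on = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1 ||
        listen(fd, 10) == -1) {
        close(fd);
        return -1;
    }
    return fd;
}
```

With a non-blocking listening socket, accept can return -1 when no connection is pending, so accept_callback should check its result before registering the new descriptor.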

Here is something to think about: we registered the callback as call_t.accept_call, yet the main loop invokes call_t.recv_call. Why does this work?
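One way to see why: both members of the union have the same type and occupy the same storage, so a pointer stored through accept_call is the very pointer read back through recv_call. A minimal sketch, in which union call_slot, fake_accept and call_through_other_member are hypothetical names for illustration:

```c
typedef int (*callback)(int fd);

union call_slot {            /* same shape as call_t in conn_channel */
    callback accept_call;
    callback recv_call;
};

/* Stand-in callback used only for the demonstration. */
static int fake_accept(int fd) { return fd + 1; }

/* Store through one member, invoke through the other. */
static int call_through_other_member(int fd) {
    union call_slot slot;
    slot.accept_call = fake_accept;
    return slot.recv_call(fd);
}
```

The union therefore adds no behaviour of its own; it only gives one slot two names, so that the listening socket and the client sockets can share the EPOLLIN branch.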

In the network programming series, we abstracted a separate object for accept. Compare the two implementations: how do they differ, and why did the series introduce a separate acceptor object?

As you can see, the final main loop has only two branches, one per event. This event-driven network model is the Reactor network model. The code in this article has been simplified for ease of understanding; a real project has to handle many more cases. For example, the code above supports only epoll. Can the event-handling code be abstracted into a separate component so that other event mechanisms can be supported as well?
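One possible direction, sketched under assumptions: hide the event mechanism behind a small table of function pointers and let the loop talk only to that table. The struct poller_ops interface and the ep_* functions below are hypothetical; only an epoll backend is shown, and a backend for another mechanism would fill in the same three slots.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical interface: the reactor loop uses only these three functions. */
struct poller_ops {
    int (*create)(void);
    int (*watch)(int handle, int fd, int want_read, int want_write);
    int (*wait_one)(int handle, int *fd, int *readable, int *writable,
                    int timeout_ms);
};

static int ep_create(void) {
    return epoll_create1(0);
}

static int ep_watch(int handle, int fd, int want_read, int want_write) {
    struct epoll_event ev = {0};
    ev.events = (want_read ? EPOLLIN : 0) | (want_write ? EPOLLOUT : 0);
    ev.data.fd = fd;
    /* Modify an existing registration, or add the fd if it is not known yet. */
    if (epoll_ctl(handle, EPOLL_CTL_MOD, fd, &ev) == 0) {
        return 0;
    }
    return epoll_ctl(handle, EPOLL_CTL_ADD, fd, &ev);
}

/* Wait for at most one event; returns 1 and fills the outputs when an event
 * arrived, 0 on timeout, -1 on error. */
static int ep_wait_one(int handle, int *fd, int *readable, int *writable,
                       int timeout_ms) {
    struct epoll_event ev;
    int n = epoll_wait(handle, &ev, 1, timeout_ms);
    if (n == 1) {
        *fd = ev.data.fd;
        *readable = (ev.events & EPOLLIN) != 0;
        *writable = (ev.events & EPOLLOUT) != 0;
    }
    return n;
}

static const struct poller_ops epoll_poller = { ep_create, ep_watch, ep_wait_one };
```

The callbacks would then ask the poller to watch for readability or writability instead of calling epoll_ctl themselves, which also removes the need for a global epfd.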

Although the code in this article is simple, implementations of the Reactor network model generally follow this same pattern, with each part encapsulated separately on top of it. In the network programming series, for example, we abstracted the channel and the map so that they can adapt to various scenarios.

Summary

The Reactor network model is one of the central ideas in network programming, and this article has explained its core idea through a short example. The implementation is not complete: for example, the callbacks still receive an fd as a parameter. Can the callbacks be separated from IO entirely, without that parameter?
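As one possible starting point for that question, epoll can hand back a pointer instead of a descriptor through the data.ptr field of struct epoll_event, so a callback can receive the channel itself. This is only a sketch of the idea, not the approach taken in the series; struct channel, channel_cb, read_into_channel, watch_channel and run_once are hypothetical names.

```c
#include <sys/epoll.h>
#include <unistd.h>

struct channel;                                   /* hypothetical type */
typedef int (*channel_cb)(struct channel *ch);

struct channel {
    int fd;                 /* kept inside the channel, not passed separately */
    channel_cb on_readable;
    char rbuf[64];
    int rlen;
};

/* Read callback: appends to the channel's own buffer. */
static int read_into_channel(struct channel *ch) {
    ssize_t n = read(ch->fd, ch->rbuf + ch->rlen,
                     sizeof(ch->rbuf) - (size_t)ch->rlen);
    if (n > 0) {
        ch->rlen += (int)n;
    }
    return (int)n;
}

/* Register ch so that epoll reports the channel pointer, not the fd. */
static int watch_channel(int epfd, struct channel *ch) {
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.ptr = ch;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, ch->fd, &ev);
}

/* Wait for one event and run the callback stored in the reported channel.
 * Returns the callback's result, or -1 if no event arrived. */
static int run_once(int epfd, int timeout_ms) {
    struct epoll_event ev;
    if (epoll_wait(epfd, &ev, 1, timeout_ms) != 1) {
        return -1;
    }
    struct channel *ch = ev.data.ptr;
    return ch->on_readable(ch);
}
```

The loop in run_once never sees a descriptor; whether the callbacks themselves can also avoid it depends on how far the read and write calls are pushed behind the channel.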