全I / O裸C反应器


引言


I / O反应器 (单线程事件循环 )是一种模式,用于编写在许多流行的解决方案中使用的高负载软件:



在本文中,我们将考虑I / O反应器的来龙去脉及其操作原理,编写少于200行代码的实现,并强制一个简单的HTTP服务器每分钟处理超过4000万个请求。


前言


  • 撰写本文的目的是帮助理解I / O反应器的功能,从而认识使用I / O反应器的风险。
  • 要掌握本文,您需要了解C语言的基础知识并且需要很少的开发网络应用程序的经验。
  • 所有代码均由C(严格地为长PDF )严格地用C编写的Linux C11标准 ,并且可以在GitHub上获得

为什么需要这个?


随着Internet的日益普及,Web服务器需要同时处理大量连接,因此尝试了两种方法:阻止大量OS线程上的I / O和与事件通知系统(也称为“系统”)组合使用非阻止I / O。选择器”( epoll / kqueue / IOCP等)。


第一种方法涉及为每个传入连接创建一个新的OS线程。 它的缺点是可伸缩性差:操作系统将不得不进行许多上下文转换系统调用 。 它们是昂贵的操作,并且会导致大量连接导致缺少可用RAM。


修改后的版本分配了固定数量的线程 (线程池),从而防止了系统异常停止执行,但是同时引入了一个新的问题:如果在给定的时刻线程池被长时间的读取操作阻塞,那么其他已经能够接收数据的套接字将无法执行此操作。


第二种方法使用操作系统提供的事件通知系统 (系统选择器)。 本文根据有关I / O操作准备就绪的警报(事件,通知),而不是有关其完成情况的警报 ,讨论最常见的系统选择器类型。 下面的流程图可以表示其用法的简化示例:



这些方法之间的区别如下:


  • 阻塞的I / O操作会挂起用户流, 直到操作系统进入的IP数据包正确整理到字节流( TCP ,接收数据)或释放内部写缓冲区中的足够空间,以便随后通过NIC发送(发送数据)为止。
  • 稍后,系统选择器会通知程序OS 已对 IP数据包进行碎片整理(TCP,正在接收数据)或内部记录缓冲区中的足够空间(发送数据)。

总而言之,为每个I / O保留OS线程是在浪费计算能力,因为实际上,线程并不忙于有用的工作(术语“软件中断”植根于其中 )。 系统选择器通过允许用户程序更经济地消耗CPU资源来解决此问题。


反应堆I / O型号


I / O反应器充当系统选择器和用户代码之间的一层。 下面的流程图描述了其操作原理:



  • 让我提醒您,事件是对某个套接字能够执行非阻塞I / O操作的通知。
  • 事件处理程序是接收到事件后由I / O反应器调用的函数,然后该函数执行无阻塞的I / O操作。

重要的是要注意,根据定义,I / O反应器是单线程的,但是没有什么可以阻止在多线程环境中针对1个流:1个反应器使用此概念,从而利用了所有CPU内核。


实作


我们将公共接口放在reactor.h文件中,并将实现放在反应器reactor.creactor.h将包含以下声明:


在reactor.h中显示广告
 typedef struct reactor Reactor; /* *   ,    I/O    *    . */ typedef void (*Callback)(void *arg, int fd, uint32_t events); /* *  `NULL`   , -`NULL`   `Reactor`  *  . */ Reactor *reactor_new(void); /* *   ,       *    I/O . * *    -1   , 0   . */ int reactor_destroy(Reactor *reactor); int reactor_register(const Reactor *reactor, int fd, uint32_t interest, Callback callback, void *callback_arg); int reactor_deregister(const Reactor *reactor, int fd); int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest, Callback callback, void *callback_arg); /* *     - `timeout`. * *           * /    . */ int reactor_run(const Reactor *reactor, time_t timeout); 

反应器的I / O结构由epoll选择器文件描述符GHashTable 哈希表组成 ,每个套接字都将其映射到CallbackData (事件处理程序的结构和用户参数)。


显示Reactor和CallbackData
 struct reactor { int epoll_fd; GHashTable *table; // (int, CallbackData) }; typedef struct { Callback callback; void *arg; } CallbackData; 

请注意,我们使用了通过指针处理不完整类型的功能。 在reactor.h我们声明了reactor的结构,在reactor.c定义了它的结构,从而防止了用户显式更改其字段。 这是有机地适合C语义的数据隐藏模式之一。


reactor_registerreactor_deregisterreactor_reregister更新目标选择器的列表以及系统选择器和哈希表中的相应事件处理程序。


显示注册功能
 #define REACTOR_CTL(reactor, op, fd, interest) \ if (epoll_ctl(reactor->epoll_fd, op, fd, \ &(struct epoll_event){.events = interest, \ .data = {.fd = fd}}) == -1) { \ perror("epoll_ctl"); \ return -1; \ } int reactor_register(const Reactor *reactor, int fd, uint32_t interest, Callback callback, void *callback_arg) { REACTOR_CTL(reactor, EPOLL_CTL_ADD, fd, interest) g_hash_table_insert(reactor->table, int_in_heap(fd), callback_data_new(callback, callback_arg)); return 0; } int reactor_deregister(const Reactor *reactor, int fd) { REACTOR_CTL(reactor, EPOLL_CTL_DEL, fd, 0) g_hash_table_remove(reactor->table, &fd); return 0; } int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest, Callback callback, void *callback_arg) { REACTOR_CTL(reactor, EPOLL_CTL_MOD, fd, interest) g_hash_table_insert(reactor->table, int_in_heap(fd), callback_data_new(callback, callback_arg)); return 0; } 

I / O反应器使用fd描述符拦截事件后,将调用相应的事件处理程序,将fd ,生成的事件的位掩码以及指向void的用户指针传递到该事件处理程序中。


显示reactor_run()函数
 int reactor_run(const Reactor *reactor, time_t timeout) { int result; struct epoll_event *events; if ((events = calloc(MAX_EVENTS, sizeof(*events))) == NULL) abort(); time_t start = time(NULL); while (true) { time_t passed = time(NULL) - start; int nfds = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, timeout - passed); switch (nfds) { //  case -1: perror("epoll_wait"); result = -1; goto cleanup; //   case 0: result = 0; goto cleanup; //   default: //    for (int i = 0; i < nfds; i++) { int fd = events[i].data.fd; CallbackData *callback = g_hash_table_lookup(reactor->table, &fd); callback->callback(callback->arg, fd, events[i].events); } } } cleanup: free(events); return result; } 

总而言之,用户代码中的函数调用链将采用以下形式:



单线程服务器


为了测试高负载下的I / O反应器,我们将编写一个简单的HTTP Web服务器来响应带有图像的任何请求。


HTTP协议快速参考

HTTP是一种应用程序级协议,主要用于服务器与浏览器的交互。


HTTP可以很容易地在TCP 传输协议之上使用,发送和接收规范定义的格式的消息。


要求格式


 <> <URI> < HTTP>CRLF < 1>CRLF < 2>CRLF < N>CRLF CRLF <> 

  • CRLF是两个字符的序列: \r\n ,分隔查询的第一行,标头和数据。
  • <>CONNECTDELETEGETHEADOPTIONSPATCHPOSTPUTTRACE 。 浏览器将向我们的服务器发送GET命令,意思是“向我发送文件内容”。
  • <URI>统一资源标识符 。 例如,如果URI = /index.html ,则客户端将请求网站的主页。
  • < HTTP> -HTTP协议版本, HTTP/XY格式。 迄今为止,最常用的版本是HTTP/1.1
  • < N>是格式为<>: <>的键值对,已发送到服务器以进行进一步分析。
  • <> -服务器完成操作所需的数据。 通常它只是JSON或任何其他格式。

回应格式


 < HTTP> < > < >CRLF < 1>CRLF < 2>CRLF < N>CRLF CRLF <> 

  • < >是表示操作结果的数字。 我们的服务器将始终返回状态200(成功操作)。
  • < > -状态代码的字符串表示形式。 对于状态码200,这OK
  • < N> -与请求中的格式相同的标头。 我们将返回Content-Length (文件大小)和Content-Type: text/html (返回类型数据)标题。
  • <> -用户请求的数据。 在我们的例子中,这是HTML中图像的路径。

http_server.c (单线程服务器)文件包括common.h文件,该文件包含以下函数原型:


显示common.h中的函数原型
 /* *  ,    ,    *    . */ static void on_accept(void *arg, int fd, uint32_t events); /* *  ,    ,    *   HTTP . */ static void on_send(void *arg, int fd, uint32_t events); /* *  ,    ,    *    HTTP . */ static void on_recv(void *arg, int fd, uint32_t events); /* *      . */ static void set_nonblocking(int fd); /* *     stderr      *  `EXIT_FAILURE`. */ static noreturn void fail(const char *format, ...); /* *    ,    * TCP . */ static int new_server(bool reuse_port); 

还描述了函数宏SAFE_CALL()并定义了fail()函数。 宏将表达式的值与错误进行比较,如果满足条件,它将调用fail()函数:


 #define SAFE_CALL(call, error) \ do { \ if ((call) == error) { \ fail("%s", #call); \ } \ } while (false) 

fail()函数将传递的参数打印到终端(例如printf() ),并使用EXIT_FAILURE代码终止程序:


 static noreturn void fail(const char *format, ...) { va_list args; va_start(args, format); vfprintf(stderr, format, args); va_end(args); fprintf(stderr, ": %s\n", strerror(errno)); exit(EXIT_FAILURE); } 

new_server()函数返回系统调用socket()bind()listen()创建的“服务器”套接字的文件描述符,并能够以非阻塞模式接受传入的连接。


显示函数new_server()
 static int new_server(bool reuse_port) { int fd; SAFE_CALL((fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP)), -1); if (reuse_port) { SAFE_CALL( setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int)), -1); } struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(SERVER_PORT), .sin_addr = {.s_addr = inet_addr(SERVER_IPV4)}, .sin_zero = {0}}; SAFE_CALL(bind(fd, (struct sockaddr *)&addr, sizeof(addr)), -1); SAFE_CALL(listen(fd, SERVER_BACKLOG), -1); return fd; } 

  • 请注意,套接字最初是使用SOCK_NONBLOCK标志以非阻塞模式创建的,因此在on_accept()函数(了解更多)中, accept()系统调用不会停止流的执行。
  • 如果reuse_porttrue ,则此函数将使用setsockopt()使用SO_REUSEPORT选项配置套接字,以在多线程环境中使用相同的端口(请参阅“多线程服务器”一节)。

在操作系统生成EPOLLIN事件之后,将调用on_accept()事件处理程序,在这种情况下,这意味着可以接受新的连接。 on_accept()接受一个新的连接,将其切换为非阻塞模式,并在I / O反应器中向on_recv()事件处理程序注册。


显示on_accept()函数
 static void on_accept(void *arg, int fd, uint32_t events) { int incoming_conn; SAFE_CALL((incoming_conn = accept(fd, NULL, NULL)), -1); set_nonblocking(incoming_conn); SAFE_CALL(reactor_register(reactor, incoming_conn, EPOLLIN, on_recv, request_buffer_new()), -1); } 

在操作系统生成EPOLLIN事件之后,将调用on_recv()事件处理程序,在这种情况下,这意味着注册的on_accept()连接已准备好接收数据。


on_recv()从连接中读取数据,直到收到完整的HTTP请求为止,然后它注册on_send()处理函数以发送HTTP响应。 如果客户端断开连接,套接字将注销并使用close()关闭。


显示on_recv()函数
 static void on_recv(void *arg, int fd, uint32_t events) { RequestBuffer *buffer = arg; //      ,  recv  0   ssize_t nread; while ((nread = recv(fd, buffer->data + buffer->size, REQUEST_BUFFER_CAPACITY - buffer->size, 0)) > 0) buffer->size += nread; //    if (nread == 0) { SAFE_CALL(reactor_deregister(reactor, fd), -1); SAFE_CALL(close(fd), -1); request_buffer_destroy(buffer); return; } // read  ,   ,     //  if (errno != EAGAIN && errno != EWOULDBLOCK) { request_buffer_destroy(buffer); fail("read"); } //   HTTP   .    //     if (request_buffer_is_complete(buffer)) { request_buffer_clear(buffer); SAFE_CALL(reactor_reregister(reactor, fd, EPOLLOUT, on_send, buffer), -1); } } 

在操作系统生成EPOLLOUT事件之后,将调用on_send()事件处理程序,这意味着由on_recv()注册的连接已准备好发送数据。 此函数将包含HTML和图像的HTTP响应发送到客户端,然后将事件处理程序再次更改为on_recv()


显示on_send()函数
 static void on_send(void *arg, int fd, uint32_t events) { const char *content = "<img " "src=\"https://habrastorage.org/webt/oh/wl/23/" "ohwl23va3b-dioerobq_mbx4xaw.jpeg\">"; char response[1024]; sprintf(response, "HTTP/1.1 200 OK" CRLF "Content-Length: %zd" CRLF "Content-Type: " "text/html" DOUBLE_CRLF "%s", strlen(content), content); SAFE_CALL(send(fd, response, strlen(response), 0), -1); SAFE_CALL(reactor_reregister(reactor, fd, EPOLLIN, on_recv, arg), -1); } 

最后,在文件http_server.cmain()函数中,我们使用reactor_new()创建一个I / O反应器,创建一个服务器套接字并注册它,并使用reactor_run()启动反应器一分钟,然后释放资源并退出从程序。


显示http_server.c
 #include "reactor.h" static Reactor *reactor; #include "common.h" int main(void) { SAFE_CALL((reactor = reactor_new()), NULL); SAFE_CALL( reactor_register(reactor, new_server(false), EPOLLIN, on_accept, NULL), -1); SAFE_CALL(reactor_run(reactor, SERVER_TIMEOUT_MILLIS), -1); SAFE_CALL(reactor_destroy(reactor), -1); } 

检查一切是否按预期进行。 我们进行编译(在项目的根目录中为chmod a+x compile.sh && ./compile.sh )并启动自写服务器,在浏览器中打开http://127.0.0.1:18470并观察预期的结果:



绩效评估


展示我的车的特点
 $ screenfetch MMMMMMMMMMMMMMMMMMMMMMMMMmds+. OS: Mint 19.1 tessa MMm----::-://////////////oymNMd+` Kernel: x86_64 Linux 4.15.0-20-generic MMd /++ -sNMd: Uptime: 2h 34m MMNso/` dMM `.::-. .-::.` .hMN: Packages: 2217 ddddMMh dMM :hNMNMNhNMNMNh: `NMm Shell: bash 4.4.20 NMm dMM .NMN/-+MMM+-/NMN` dMM Resolution: 1920x1080 NMm dMM -MMm `MMM dMM. dMM DE: Cinnamon 4.0.10 NMm dMM -MMm `MMM dMM. dMM WM: Muffin NMm dMM .mmd `mmm yMM. dMM WM Theme: Mint-Y-Dark (Mint-Y) NMm dMM` ..` ... ydm. dMM GTK Theme: Mint-Y [GTK2/3] hMM- +MMd/-------...-:sdds dMM Icon Theme: Mint-Y -NMm- :hNMNNNmdddddddddy/` dMM Font: Noto Sans 9 -dMNs-``-::::-------.`` dMM CPU: Intel Core i7-6700 @ 8x 4GHz [52.0°C] `/dMNmy+/:-------------:/yMMM GPU: NV136 ./ydNMMMMMMMMMMMMMMMMMMMMM RAM: 2544MiB / 7926MiB \.MMMMMMMMMMMMMMMMMMM 

我们评估单线程服务器的性能。 让我们打开两个终端:在其中一个中,我们运行./http_server ,在另一个中./http_server 。 一分钟后,第二个终端将显示以下统计信息:


 $ wrk -c100 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive" Running 1m test @ http://127.0.0.1:18470 8 threads and 100 connections Thread Stats Avg Stdev Max +/- Stdev Latency 493.52us 76.70us 17.31ms 89.57% Req/Sec 24.37k 1.81k 29.34k 68.13% 11657769 requests in 1.00m, 1.60GB read Requests/sec: 193974.70 Transfer/sec: 27.19MB 

我们的单线程服务器每分钟能够处理100万个连接中的1100万个请求。 结果不错,但是可以改善吗?


多线程服务器


如上所述,可以在单独的流中创建I / O反应器,从而利用所有CPU内核。 让我们在实践中应用这种方法:


显示http_server_multithreaded.c
 #include "reactor.h" static Reactor *reactor; #pragma omp threadprivate(reactor) #include "common.h" int main(void) { #pragma omp parallel { SAFE_CALL((reactor = reactor_new()), NULL); SAFE_CALL(reactor_register(reactor, new_server(true), EPOLLIN, on_accept, NULL), -1); SAFE_CALL(reactor_run(reactor, SERVER_TIMEOUT_MILLIS), -1); SAFE_CALL(reactor_destroy(reactor), -1); } } 

现在,每个线程都拥有自己的反应堆:


 static Reactor *reactor; #pragma omp threadprivate(reactor) 

请注意, new_server()的参数为true 。 这意味着我们将服务器套接字设置为SO_REUSEPORT选项,以在多线程环境中使用它。 您可以在这里阅读更多内容。


第二次


现在,我们将评估多线程服务器的性能:


 $ wrk -c100 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive" Running 1m test @ http://127.0.0.1:18470 8 threads and 100 connections Thread Stats Avg Stdev Max +/- Stdev Latency 1.14ms 2.53ms 40.73ms 89.98% Req/Sec 79.98k 18.07k 154.64k 78.65% 38208400 requests in 1.00m, 5.23GB read Requests/sec: 635876.41 Transfer/sec: 89.14MB 

1分钟内处理的请求数量增加了约3.28倍! 但是到整数为止,仅200万是不够的,让我们尝试对其进行修复。


首先,查看perf生成的统计信息:


 $ sudo perf stat -B -e task-clock,context-switches,cpu-migrations,page-faults,cycles,instructions,branches,branch-misses,cache-misses ./http_server_multithreaded Performance counter stats for './http_server_multithreaded': 242446,314933 task-clock (msec) # 4,000 CPUs utilized 1 813 074 context-switches # 0,007 M/sec 4 689 cpu-migrations # 0,019 K/sec 254 page-faults # 0,001 K/sec 895 324 830 170 cycles # 3,693 GHz 621 378 066 808 instructions # 0,69 insn per cycle 119 926 709 370 branches # 494,653 M/sec 3 227 095 669 branch-misses # 2,69% of all branches 808 664 cache-misses 60,604330670 seconds time elapsed 

使用CPU亲和力 ,使用-march=nativePGO进行编译,增加高速缓存中的命中次数,增加MAX_EVENTS以及使用EPOLLET不会显着提高性能。 但是,如果增加同时连接的数量会怎样?


352个同时连接的统计信息:


 $ wrk -c352 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive" Running 1m test @ http://127.0.0.1:18470 8 threads and 352 connections Thread Stats Avg Stdev Max +/- Stdev Latency 2.12ms 3.79ms 68.23ms 87.49% Req/Sec 83.78k 12.69k 169.81k 83.59% 40006142 requests in 1.00m, 5.48GB read Requests/sec: 665789.26 Transfer/sec: 93.34MB 

获得了期望的结果,它带有一个有趣的图表,显示了1分钟内处理的请求数与连接数的关系:



我们看到,经过数百次连接后,两台服务器处理的请求数量都急剧下降(在多线程版本中,这一点更加明显)。 这与Linux TCP / IP堆栈实现有关吗? 请在注释中随意编写关于这种图形行为以及多线程和单线程选项优化的假设。




如评论中所述,此性能测试未显示实际负载下I / O反应器的行为,因为几乎总是服务器与数据库交互,显示日志,使用TLS加密等,因此负载变得异构(动态)。 有关第三方组件的测试将在有关I / O proactor的文章中进行。


I / O Reactor的缺点


您需要了解,I / O反应器并非没有缺点,即:


  • 在多线程环境中使用I / O反应器要困难一些,因为 您必须手动管理流程。
  • 实践表明,在大多数情况下,负载是异构的,这可能导致以下事实:一个线程将被放下,而另一个线程将被加载。
  • 如果一个事件处理程序阻止了流,则系统选择器本身也将被阻止,这可能导致难以捕获的错误。

这些问题由I / O代理解决,通常使用调度程序将负载平均分配给线程池,并且还具有更方便的API。 稍后将在我的另一篇文章中讨论。


结论


至此,我们从理论直接进入排气分析仪的旅程结束了。


不要赘述,因为还有许多其他同样有趣的方法来编写具有不同级别的便利性和速度的网络软件。 我认为有趣的是,下面给出了链接。


待会见!


有趣的项目



还有什么要读的?



Source: https://habr.com/ru/post/zh-CN475896/


All Articles