|
| 1 | +# Redis 和 I/O 多路复用 |
| 2 | + |
| 3 | +最近在看 UNIX 网络编程并研究了一下 Redis 的实现,感觉 Redis 的源代码十分适合阅读和分析,其中 I/O 多路复用(mutiplexing)部分的实现非常干净和优雅,在这里想对这部分的内容进行简单的整理。 |
| 4 | + |
| 5 | +## 几种 I/O 模型 |
| 6 | + |
| 7 | +为什么 Redis 中要使用 I/O 多路复用这种技术呢? |
| 8 | + |
| 9 | +首先,Redis 是跑在单线程中的,所有的操作都是按照顺序线性执行的,但是由于读写操作等待用户输入或输出都是阻塞的,所以 I/O 操作在一般情况下往往不能直接返回,这会导致某一文件的 I/O 阻塞导致整个进程无法对其它客户提供服务,而 **I/O 多路复用**就是为了解决这个问题而出现的。 |
| 10 | + |
| 11 | +### Blocking I/O |
| 12 | + |
| 13 | +先来看一下传统的阻塞 I/O 模型到底是如何工作的:当使用 `read` 或者 `write` 对某一个**文件描述符(File Descriptor 以下简称 FD)**进行读写时,如果当前 FD 不可读或不可写,整个 Redis 服务就不会对其它的操作作出响应,导致整个服务不可用。 |
| 14 | + |
| 15 | +这也就是传统意义上的,也就是我们在编程中使用最多的阻塞模型: |
| 16 | + |
| 17 | + |
| 18 | + |
| 19 | +阻塞模型虽然开发中非常常见也非常易于理解,但是由于它会影响其他 FD 对应的服务,所以在需要处理多个客户端任务的时候,往往都不会使用阻塞模型。 |
| 20 | + |
| 21 | +### I/O 多路复用 |
| 22 | + |
| 23 | +> 虽然还有很多其它的 I/O 模型,但是在这里都不会具体介绍。 |
| 24 | +
|
| 25 | +阻塞式的 I/O 模型并不能满足这里的需求,我们需要一种效率更高的 I/O 模型来支撑 Redis 的多个客户(redis-cli),这里涉及的就是 I/O 多路复用模型了: |
| 26 | + |
| 27 | + |
| 28 | + |
| 29 | +在 I/O 多路复用模型中,最重要的函数调用就是 `select`,该方法的能够同时监控多个文件描述符的可读可写情况,当其中的某些文件描述符可读或者可写时,`select` 方法就会返回可读以及可写的文件描述符个数。 |
| 30 | + |
| 31 | +> 关于 `select` 的具体使用方法,在网络上资料很多,这里就不过多展开介绍了; |
| 32 | +> |
| 33 | +> 与此同时也有其它的 I/O 多路复用函数 `epoll/kqueue/evport`,它们相比 `select` 性能更优秀,同时也能支撑更多的服务。 |
| 34 | +
|
| 35 | +## Reactor 设计模式 |
| 36 | + |
| 37 | +Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符) |
| 38 | + |
| 39 | + |
| 40 | + |
| 41 | +文件事件处理器使用 I/O 多路复用模块同时监听多个 FD,当 `accept`、`read`、`write` 和 `close` 文件事件产生时,文件事件处理器就会回调 FD 绑定的事件处理器。 |
| 42 | + |
| 43 | +虽然整个文件事件处理器是在单线程上运行的,但是通过 I/O 多路复用模块的引入,实现了同时对多个 FD 读写的监控,提高了网络通信模型的性能,同时也可以保证整个 Redis 服务实现的简单。 |
| 44 | + |
| 45 | +## I/O 多路复用模块 |
| 46 | + |
| 47 | +I/O 多路复用模块封装了底层的 `select`、`epoll`、`avport` 以及 `kqueue` 这些 I/O 多路复用函数,为上层提供了相同的接口。 |
| 48 | + |
| 49 | + |
| 50 | + |
| 51 | +在这里我们简单介绍 Redis 是如何包装 `select` 和 `epoll` 的,简要了解该模块的功能,整个 I/O 多路复用模块抹平了不同平台上 I/O 多路复用函数的差异性,提供了相同的接口: |
| 52 | + |
| 53 | ++ `static int aeApiCreate(aeEventLoop *eventLoop)` |
| 54 | ++ `static int aeApiResize(aeEventLoop *eventLoop, int setsize)` |
| 55 | ++ `static void aeApiFree(aeEventLoop *eventLoop)` |
| 56 | ++ `static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)` |
| 57 | ++ `static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) ` |
| 58 | ++ `static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)` |
| 59 | + |
| 60 | +同时,因为各个函数所需要的参数不同,我们在每一个子模块内部通过一个 `aeApiState` 来存储需要的上下文信息: |
| 61 | + |
| 62 | +```c |
| 63 | +// select |
| 64 | +typedef struct aeApiState { |
| 65 | + fd_set rfds, wfds; |
| 66 | + fd_set _rfds, _wfds; |
| 67 | +} aeApiState; |
| 68 | + |
| 69 | +// epoll |
| 70 | +typedef struct aeApiState { |
| 71 | + int epfd; |
| 72 | + struct epoll_event *events; |
| 73 | +} aeApiState; |
| 74 | +``` |
| 75 | + |
| 76 | +这些上下文信息会存储在 `eventLoop` 的 `void *state` 中,不会暴露到上层,只在当前子模块中使用。 |
| 77 | + |
| 78 | +### 封装 select 函数 |
| 79 | + |
| 80 | +> `select` 可以监控 FD 的可读、可写以及出现错误的情况。 |
| 81 | +
|
| 82 | +在介绍 I/O 多路复用模块如何对 `select` 函数封装之前,先来看一下 `select` 函数使用的大致流程: |
| 83 | + |
| 84 | +```c |
| 85 | +int fd = /* file descriptor */ |
| 86 | + |
| 87 | +fd_set rfds; |
| 88 | +FD_ZERO(&rfds); |
| 89 | +FD_SET(fd, &rfds) |
| 90 | + |
| 91 | +for ( ; ; ) { |
| 92 | + select(fd+1, &rfds, NULL, NULL, NULL); |
| 93 | + if (FD_ISSET(fd, &rfds)) { |
| 94 | + /* file descriptor `fd` becomes readable */ |
| 95 | + } |
| 96 | +} |
| 97 | +``` |
| 98 | +
|
| 99 | +1. 初始化一个可读的 `fd_set` 集合,保存需要监控可读性的 FD; |
| 100 | +2. 使用 `FD_SET` 将 `fd` 加入 `rfds`; |
| 101 | +3. 调用 `select` 方法监控 `rfds` 中的 FD 是否可读; |
| 102 | +4. 当 `select` 返回时,检查 FD 的状态并完成对应的操作。 |
| 103 | +
|
| 104 | +而在 Redis 的 `ae_select` 文件中代码的组织顺序也是差不多的,首先在 `aeApiCreate` 函数中初始化 `rfds` 和 `wfds`: |
| 105 | +
|
| 106 | +```c |
| 107 | +static int aeApiCreate(aeEventLoop *eventLoop) { |
| 108 | + aeApiState *state = zmalloc(sizeof(aeApiState)); |
| 109 | + if (!state) return -1; |
| 110 | + FD_ZERO(&state->rfds); |
| 111 | + FD_ZERO(&state->wfds); |
| 112 | + eventLoop->apidata = state; |
| 113 | + return 0; |
| 114 | +} |
| 115 | +``` |
| 116 | + |
| 117 | +而 `aeApiAddEvent` 和 `aeApiDelEvent` 会通过 `FD_SET` 和 `FD_CLR` 修改 `fd_set` 中对应 FD 的标志位: |
| 118 | + |
| 119 | +```c |
| 120 | +static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { |
| 121 | + aeApiState *state = eventLoop->apidata; |
| 122 | + if (mask & AE_READABLE) FD_SET(fd,&state->rfds); |
| 123 | + if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds); |
| 124 | + return 0; |
| 125 | +} |
| 126 | +``` |
| 127 | +
|
| 128 | +整个 `ae_select` 子模块中最重要的函数就是 `aeApiPoll`,它是实际调用 `select` 函数的部分,其作用就是在 I/O 多路复用函数返回时,将对应的 FD 加入 `aeEventLoop` 的 `fired` 数组中,并返回事件的个数: |
| 129 | +
|
| 130 | +```c |
| 131 | +static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { |
| 132 | + aeApiState *state = eventLoop->apidata; |
| 133 | + int retval, j, numevents = 0; |
| 134 | +
|
| 135 | + memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); |
| 136 | + memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); |
| 137 | +
|
| 138 | + retval = select(eventLoop->maxfd+1, |
| 139 | + &state->_rfds,&state->_wfds,NULL,tvp); |
| 140 | + if (retval > 0) { |
| 141 | + for (j = 0; j <= eventLoop->maxfd; j++) { |
| 142 | + int mask = 0; |
| 143 | + aeFileEvent *fe = &eventLoop->events[j]; |
| 144 | +
|
| 145 | + if (fe->mask == AE_NONE) continue; |
| 146 | + if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) |
| 147 | + mask |= AE_READABLE; |
| 148 | + if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) |
| 149 | + mask |= AE_WRITABLE; |
| 150 | + eventLoop->fired[numevents].fd = j; |
| 151 | + eventLoop->fired[numevents].mask = mask; |
| 152 | + numevents++; |
| 153 | + } |
| 154 | + } |
| 155 | + return numevents; |
| 156 | +} |
| 157 | +``` |
| 158 | + |
| 159 | +### 封装 epoll 函数 |
| 160 | + |
| 161 | +Redis 对 `epoll` 的封装其实也是类似的,使用 `epoll_create` 创建 `epoll` 中使用的 `epfd`: |
| 162 | + |
| 163 | +```c |
| 164 | +static int aeApiCreate(aeEventLoop *eventLoop) { |
| 165 | + aeApiState *state = zmalloc(sizeof(aeApiState)); |
| 166 | + |
| 167 | + if (!state) return -1; |
| 168 | + state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); |
| 169 | + if (!state->events) { |
| 170 | + zfree(state); |
| 171 | + return -1; |
| 172 | + } |
| 173 | + state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ |
| 174 | + if (state->epfd == -1) { |
| 175 | + zfree(state->events); |
| 176 | + zfree(state); |
| 177 | + return -1; |
| 178 | + } |
| 179 | + eventLoop->apidata = state; |
| 180 | + return 0; |
| 181 | +} |
| 182 | +``` |
| 183 | +
|
| 184 | +在 `aeApiAddEvent` 中使用 `epoll_ctl` 向 `epfd` 中添加需要监控的 FD 以及监听的事件: |
| 185 | +
|
| 186 | +```c |
| 187 | +static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { |
| 188 | + aeApiState *state = eventLoop->apidata; |
| 189 | + struct epoll_event ee = {0}; /* avoid valgrind warning */ |
| 190 | + /* If the fd was already monitored for some event, we need a MOD |
| 191 | + * operation. Otherwise we need an ADD operation. */ |
| 192 | + int op = eventLoop->events[fd].mask == AE_NONE ? |
| 193 | + EPOLL_CTL_ADD : EPOLL_CTL_MOD; |
| 194 | +
|
| 195 | + ee.events = 0; |
| 196 | + mask |= eventLoop->events[fd].mask; /* Merge old events */ |
| 197 | + if (mask & AE_READABLE) ee.events |= EPOLLIN; |
| 198 | + if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; |
| 199 | + ee.data.fd = fd; |
| 200 | + if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; |
| 201 | + return 0; |
| 202 | +} |
| 203 | +``` |
| 204 | + |
| 205 | +由于 `epoll` 相比 `select` 机制略有不同,在 `epoll_wait` 函数返回时并不需要遍历所有的 FD 查看读写情况;在 `epoll_wait` 函数返回时会提供一个 `epoll_event` 数组: |
| 206 | + |
| 207 | +```c |
| 208 | +typedef union epoll_data { |
| 209 | + void *ptr; |
| 210 | + int fd; /* 文件描述符 */ |
| 211 | + uint32_t u32; |
| 212 | + uint64_t u64; |
| 213 | +} epoll_data_t; |
| 214 | + |
| 215 | +struct epoll_event { |
| 216 | + uint32_t events; /* Epoll 事件 */ |
| 217 | + epoll_data_t data; |
| 218 | +}; |
| 219 | +``` |
| 220 | + |
| 221 | +> 其中保存了发生的 `epoll` 事件(`EPOLLIN`、`EPOLLOUT`、`EPOLLERR` 和 `EPOLLHUP`)以及发生该事件的 FD。 |
| 222 | +
|
| 223 | +`aeApiPoll` 函数只需要将 `epoll_event` 数组中存储的信息加入 `eventLoop` 的 `fired` 数组中,将信息传递给上层模块: |
| 224 | + |
| 225 | +```c |
| 226 | +static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { |
| 227 | + aeApiState *state = eventLoop->apidata; |
| 228 | + int retval, numevents = 0; |
| 229 | + |
| 230 | + retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, |
| 231 | + tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); |
| 232 | + if (retval > 0) { |
| 233 | + int j; |
| 234 | + |
| 235 | + numevents = retval; |
| 236 | + for (j = 0; j < numevents; j++) { |
| 237 | + int mask = 0; |
| 238 | + struct epoll_event *e = state->events+j; |
| 239 | + |
| 240 | + if (e->events & EPOLLIN) mask |= AE_READABLE; |
| 241 | + if (e->events & EPOLLOUT) mask |= AE_WRITABLE; |
| 242 | + if (e->events & EPOLLERR) mask |= AE_WRITABLE; |
| 243 | + if (e->events & EPOLLHUP) mask |= AE_WRITABLE; |
| 244 | + eventLoop->fired[j].fd = e->data.fd; |
| 245 | + eventLoop->fired[j].mask = mask; |
| 246 | + } |
| 247 | + } |
| 248 | + return numevents; |
| 249 | +} |
| 250 | +``` |
| 251 | +
|
| 252 | +### 子模块的选择 |
| 253 | +
|
| 254 | +因为 Redis 需要在多个平台上运行,同时为了最大化执行的效率与性能,所以会根据编译平台的不同选择不同的 I/O 多路复用函数作为子模块,提供给上层统一的接口;在 Redis 中,我们通过宏定义的使用,合理的选择不同的子模块: |
| 255 | +
|
| 256 | +```c |
| 257 | +#ifdef HAVE_EVPORT |
| 258 | +#include "ae_evport.c" |
| 259 | +#else |
| 260 | + #ifdef HAVE_EPOLL |
| 261 | + #include "ae_epoll.c" |
| 262 | + #else |
| 263 | + #ifdef HAVE_KQUEUE |
| 264 | + #include "ae_kqueue.c" |
| 265 | + #else |
| 266 | + #include "ae_select.c" |
| 267 | + #endif |
| 268 | + #endif |
| 269 | +#endif |
| 270 | +``` |
| 271 | + |
| 272 | +因为 `select` 函数是作为 POSIX 标准中的系统调用,在不同版本的操作系统上都会实现,所以将其作为保底方案: |
| 273 | + |
| 274 | + |
| 275 | + |
| 276 | +Redis 会优先选择时间复杂度为 $O(1)$ 的 I/O 多路复用函数作为底层实现,包括 Solaries 10 中的 `evport`、Linux 中的 `epoll` 和 macOS/FreeBSD 中的 `kqueue`,上述的这些函数都使用了内核内部的结构,并且能够服务几十万的文件描述符。 |
| 277 | + |
| 278 | +但是如果当前编译环境没有上述函数,就会选择 `select` 作为备选方案,由于其在使用时会扫描全部监听的描述符,所以其时间复杂度较差 $O(n)$,并且只能同时服务 1024 个文件描述符,所以一般并不会以 `select` 作为第一方案使用。 |
| 279 | + |
| 280 | +## 总结 |
| 281 | + |
| 282 | +Redis 对于 I/O 多路复用模块的设计非常简洁,通过宏保证了 I/O 多路复用模块在不同平台上都有着优异的性能,将不同的 I/O 多路复用函数封装成相同的 API 提供给上层使用。 |
| 283 | + |
| 284 | +整个模块使 Redis 能以单进程运行的同时服务成千上万个文件描述符,避免了由于多进程应用的引入导致代码实现复杂度的提升,减少了出错的可能性。 |
| 285 | + |
| 286 | +## Reference |
| 287 | + |
| 288 | ++ [Select-Man-Pages](http://man7.org/linux/man-pages/man2/select.2.html) |
| 289 | ++ [Reactor-Pattern](https://en.wikipedia.org/wiki/Reactor_pattern) |
| 290 | ++ [epoll vs kqueue](https://people.eecs.berkeley.edu/~sangjin/2012/12/21/epoll-vs-kqueue.html) |
| 291 | + |
| 292 | +## 其它 |
| 293 | + |
| 294 | +> Follow: [Draveness · GitHub](https://github.com/Draveness) |
| 295 | +> |
| 296 | +> Source: http://draveness.me/redis-io-multiplexing |
| 297 | +
|
| 298 | + |
0 commit comments