• Author: 鹅厂大叔
  • Link: https://wechat-daydream.netlify.app/redis-network/
  • Contact: contact@naver.com
  • 转载须保留原文链接, 禁止一切商业形式用途, 谢谢合作!
  • 文章更新时间: 2020.05

Redis v4.0, 开始引入多线程处理异步任务; Redis v6.0, 正式在网络模型中实现I/O多线程. 一切预示着, Redis终于迎来了多线程"时代", 但Redis真的是"多线程"吗? 在正式回答这个问题前, 先来回答几个简单问题.

问题一: Redis有多快?

基于Redis官方的Benchmark, 通常在一台普通硬件配置的Linux机器上跑单个Redis实例, 处理简单命令, QPS可以达到8w+, 若使用管道批处理功能, QPS能达到100w. 仅从性能层面进行评判, Redis可以被称之为高性能缓存方案.

问题二: Redis为什么这么快?

要点参考如下:

  • 内存缓存, 与其它刷写数据至磁盘的数据库相比, Redis纯内存操作有着天然的性能优势.
  • I/O多路复用, 基于epoll/select/kqueue等I/O多路复用技术, 吞吐量高.
  • 单线程模型, 单线程效率不如多线程, 但从另一角度而言, 避免了多线程频繁上下文切换以及同步机制(如锁等)带来的开销.
  • C语言实现

问题三: Redis最初为什么选择单线程?

Redis官方描述, 参考如下:

It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redisis either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

简单翻译下, 主要意思在于, 对于Redis而言, CPU通常不会是瓶颈, 因为大多数请求并非CPU密集型, 而是I/O密集型. 如果不考虑RDB/AOF等持久化方案, 完全的纯内存操作, 执行速度非常快, 因而这部分操作通常不会是性能瓶颈, Redis真正的性能瓶颈在于网络I/O, 也就是客户端和服务端之间的网络传输延迟, 因此Redis选择了单线程的I/O多路复用来实现它的核心网络模型.

实际上具体原因, 在于Redis单线程:

避免线程上下文切换开销

多线程调度过程中必然涉及线程上下文切换, 而上下文的切换又涉及程序计数器、堆栈指针和程序状态字等一系列的寄存器置换、程序堆栈重置甚至是CPU高速缓存、TLB快表的汰换, 如果是进程内的多线程切换还好一些, 因为单一进程内多线程共享进程地址空间, 因此线程上下文比之进程上下文要小得多, 如果是跨进程调度, 则需要切换掉整个进程地址空间. 如果是单线程则可以规避频繁的线程上下文切换开销.

避免同步机制带来的性能损耗

若Redis选择多线程模型, 势必涉及到底层数据同步的问题, 则必然会引入某些同步机制, 比如锁, 而Redis不仅仅提供了简单的KV数据结构, 还有List、Set、Hash等其他丰富的数据结构, 而不同的数据结构对同步访问的加锁粒度又不尽相同, 可能会导致在操作数据过程中带来很多诸如加锁、解锁等开销, 增加程序复杂度的同时还会降低性能.

实现简单、可维护性强

引入多线程必然会导致代码的复杂度上升和可维护性下降. 首先, 多线程的引入会使得程序不再保持代码逻辑上的串行性, 代码执行的顺序将变成不可预测的, 稍不注意就会导致程序出现各种并发编程的问题; 其次, 多线程模式也使得程序调试更加复杂和麻烦.

如果Redis使用多线程, 那么所有底层数据结构均须线程安全, 这无疑又使得Redis的实现变得更加复杂. 总之, Redis选择单线程可以说是多方博弈之后的一种权衡: 在保证足够的性能之下, 使用单线程保持代码的简单和可维护性.

问题来了, Redis真的是单线程吗?

讨论这个问题前, 先明确单线程这个概念的边界: 它的覆盖范围是核心网络模型, 抑或是整个Redis?如果是前者, 那么答案是肯定的, 在Redis的v6.0版本正式引入多线程之前, 其网络模型一直是单线程模式的; 如果是后者, 那么答案则是否定的, Redis早在v4.0就已经引入了多线程.

因而, 在讨论Redis的多线程之时, 有必要对Redis版本划出两个重要的节点:

  • Redis v4.0(引入多线程处理异步任务)
  • Redis v6.0(正式在网络模型中实现I/O多线程)

单线程Event Loop

首先来剖析一下Redis的核心网络模型, 从Redis的v1.0到v6.0版本之前,Redis的核心网络模型一直是一个典型的Reactor模型: 利用epoll/select/kqueue等多路复用技术, 在单线程的Event Loop中不断去处理事件(客户端请求, 最后回写响应数据到客户端):

需要明确的几个概念

  • client: 客户端对象,Redis是典型的C/S架构(Client <—> Server), 客户端通过socket与服务端建立网络通道然后发送请求命令, 服务端执行请求的命令并回复. Redis使用结构体 client 存储客户端的所有相关信息, 包括但不限于封装的套接字连接*conn, 当前选择的数据库指针*db, 读入缓冲区querybuf, 写出缓冲区buf, 写出数据链表reply等.

  • aeApiPoll: I/O多路复用API, 基于epoll_wait/select/kevent等系统调用的封装, 监听等待读写事件触发, 然后处理, 它是Event Loop中的核心函数, 是事件驱动得以运行的基础.

  • acceptTcpHandler: 连接应答处理器, 底层使用系统调用accept接受来自客户端的新连接, 并为新连接注册绑定命令读取处理器, 以备后续处理新的客户端TCP连接; 除了这个处理器, 还有对应的acceptUnixHandler负责处理Unix Domain Socket以及acceptTLSHandler负责处理TLS加密连接.

  • readQueryFromClient: 命令读取处理器, 解析并执行客户端的请求命令.

  • beforeSleep: Event Loop中进入aeApiPoll等待事件到来之前会执行的函数, 其中包含一些日常的任务, 比如把client->buf或者client->reply(后面会解释为什么这里需要两个缓冲区)中的响应写回到客户端, 持久化AOF缓冲区的数据到磁盘等, 相对应的还有一个afterSleep函数, 在aeApiPoll之后执行.

  • sendReplyToClient: 命令回复处理器, 当一次Event Loop之后写出缓冲区中还有数据残留, 则这个处理器会被注册绑定到相应的连接上, 等连接触发写就绪事件时, 它会将写出缓冲区剩余的数据回写到客户端

Redis内部实现了一个高性能的事件库--AE(基于epoll/select/kqueue/evport四种事件驱动技术, 实现Linux/MacOS/FreeBSD/Solaris多平台的高性能事件循环模型). Redis的核心网络模型正是构建于AE之上, 包括I/O多路复用、各类处理器的注册绑定, 均基于此.

下面, 简单描述下客户端向Redis发起请求命令的工作原理

  1. Redis服务器启动, 开启主线程事件循环(Event Loop), 注册acceptTcpHandler连接应答处理器到用户配置的监听端口对应的文件描述符, 等待新连接到来;

  2. 客户端和服务端建立网络连接;

  3. acceptTcpHandler被调用, 主线程使用AE的API将readQueryFromClient命令读取处理器绑定到新连接对应的文件描述符上, 并初始化一个client绑定这个客户端连接;

  4. 客户端发送请求命令, 触发读就绪事件, 主线程调用readQueryFromClient通过socket读取客户端发送过来的命令存入client->querybuf读入缓冲区;

  5. 接着调用processInputBuffer, 在其中使用processInlineBuffer或者processMultibulkBuffer根据Redis协议解析命令, 最后调用processCommand执行命令;

  6. 根据请求命令的类型(SET, GET, DEL, EXEC 等), 分配相应的命令执行器去执行, 最后调用addReply函数族的一系列函数将响应数据写入到对应client的写出缓冲区: client->buf或者client->reply, client->buf是首选的写出缓冲区, 固定大小16KB, 一般来说可以缓冲足够多的响应数据, 但是如果客户端在时间窗口内需要响应的数据非常大, 那么则会自动切换到client->reply链表上去, 使用链表理论上能够保存无限大的数据(受限于机器的物理内存), 最后把client添加进一个 LIFO队列clients_pending_write;

  7. 在Event Loop中, 主线程执行beforeSleep --> handleClientsWithPendingWrites, 遍历clients_pending_write队列, 调用writeToClient把client的写出缓冲区里的数据回写到客户端, 如果写出缓冲区还有数据遗留, 则注册sendReplyToClient命令回复处理器到该连接的写就绪事件, 等待客户端可写时在事件循环中再继续回写残余的响应数据.

对于那些想利用多核优势提升性能的用户来说, Redis官方给出的解决方案也非常简单粗暴: 在同一个机器上多跑几个Redis实例. 事实上, 为了保证高可用, 线上业务一般不太可能会是单机模式, 更加常见的是利用Redis分布式集群和数据分片负载均衡来提升性能和保证高可用.

多线程异步任务

Redis单线程网络模型一直到Redis v6.0才改造成多线程模式, 但这并不意味着整个Redis一直都只是单线程. Redis在v4.0版本的时候就已经引入了的多线程来做一些异步操作, 此举主要针对的是那些非常耗时的命令, 通过将这些命令的执行进行异步化, 避免阻塞单线程的Event Loop.

比如Redis的DEL命令是用来删除掉一个或多个key储存的值, 它是一个阻塞的命令, 大多数情况下要删除的key里存的值不会特别多, 最多也就几十上百个对象, 所以可以很快执行完, 但是如果你要删的是一个超大的键值对, 里面有几百万个对象, 那么这条命令可能会阻塞至少好几秒, 又因为Event Loop是单线程的, 所以会阻塞后面的其他事件, 导致吞吐量下降.

Redis的作者antirez为了解决这个问题进行了很多思考, 一开始他想的办法是一种渐进式的方案: 利用定时器和数据游标, 每次只删除一小部分的数据, 比如1000个对象, 最终清除掉所有的数据, 但是这种方案有个致命的缺陷, 如果同时还有其他客户端往某个正在被渐进式删除的key里继续写入数据, 而且删除的速度跟不上写入的数据, 那么将会无止境地消耗内存, 虽然后来通过一个巧妙的办法解决了, 但是这种实现使Redis变得更加复杂, 而多线程看起来似乎是一个水到渠成的解决方案: 简单、易理解. 于是, 最终antirez选择引入多线程来实现这一类非阻塞的命令. 更多antirez在这方面的思考可以阅读一下他的博客.

因而, 在Redis v4.0之后增加了一些的非阻塞命令如UNLINKFLUSHALL ASYNCFLUSHDB ASYNC.

UNLINK命令其实就是DEL的异步版本, 它不会同步删除数据, 而只是把key从keyspace中暂时移除掉, 然后将任务添加到一个异步队列, 最后由后台线程去删除, 不过这里需要考虑一种情况是如果用UNLINK去删除一个很小的key, 用异步的方式去做反而开销更大, 所以它会先计算一个开销的阀值, 只有当这个值大于64才会使用异步的方式去删除key, 对于基本的数据类型如List、Set、Hash, 阀值就是其中存储的对象数量.

Redis多线程网络模型介绍

为什么Redis又要引入多线程呢?很简单, Redis网络I/O瓶颈日趋明显. 随着互联网的飞速发展, 互联网业务系统所要处理的线上流量越来越大, Redis的单线程模式会导致系统消耗很多CPU时间在网络I/O上从而降低吞吐量, 要提升Redis的性能有两个方向:

  • 优化网络I/O模块
  • 提高机器内存读写的速度

后者依赖于硬件的发展, 暂时无解. 所以只能从前者下手, 网络I/O的优化又可以分为两个方向:

  • 零拷贝技术或者DPDK技术
  • 充分利用多核优势

零拷贝技术有其局限性, 无法完全适配Redis这一类复杂网络I/O场景, 更多网络I/O对CPU时间的消耗和Linux零拷贝技术, 而DPDK技术通过旁路网卡I/O绕过内核协议栈的方式又太过于复杂以及需要内核甚至是硬件的支持.

因此, 利用多核优势成为了优化网络I/O性价比最高的方案. 6.0版本之后,Redis正式在核心网络模型中引入了多线程, 也就是所谓的I/O Threading, 至此Redis真正拥有了多线程模型. Redis在6.0版本之前的单线程Event Loop模型, 实际上就是一个非常经典的Reactor模型:

目前Linux平台上主流的高性能网络库/框架中, 大都采用Reactor模式, 比如netty、libevent、libuv、POE(Perl)、Twisted(Python)等. Reactor模式本质上指的是使用I/O多路复用(I/O multiplexing) + 非阻塞I/O(non-blocking I/O)的模式.

Redis的核心网络模型在6.0版本之前, 一直是单Reactor模式: 所有事件的处理都在单个线程内完成, 虽然在4.0版本中引入了多线程, 但是那个更像是针对特定场景(如删除超大Key值等)而打的补丁, 并不能被视作核心网络模型的多线程.

通常来说, 单Reactor模式, 引入多线程之后会进化为Multi-Reactors模式, 基本工作模式如下:

区别于单Reactor模式, 这种模式不再是单线程的Event Loop, 而是有多个线程(Sub Reactors)各自维护一个独立的Event Loop, 由Main Reactor负责接收新连接并分发给Sub Reactors去独立处理, 最后Sub Reactors回写响应给客户端. Multiple Reactors模式通常也可以等同于Master-Workers 模式, 比如Nginx和Memcached等就是采用这种多线程模型, 虽然不同的项目实现细节略有区别, 但总体来说模式是一致的.

设计思路

先看一下Redis多线程网络模型的总体设计:

  1. Redis服务器启动, 开启主线程事件循环(Event Loop), 注册acceptTcpHandler连接应答处理器到用户配置的监听端口对应的文件描述符, 等待新连接到来;

  2. 客户端和服务端建立网络连接;

  3. acceptTcpHandler被调用, 主线程使用AE的API将readQueryFromClient命令读取处理器绑定到新连接对应的文件描述符上, 并初始化一个client绑定这个客户端连接;

  4. 客户端发送请求命令, 触发读就绪事件, 服务端主线程不会通过socket去读取客户端的请求命令, 而是先将client放入一个LIFO队列clients_pending_read;

  5. 在Event Loop中, 主线程执行beforeSleep-->handleClientsWithPendingReadsUsingThreads, 利用Round-Robin轮询负载均衡策略, 把 clients_pending_read队列中的连接均匀地分配给I/O线程各自的本地FIFO任务队列io_threads_list[id]和主线程自己, I/O线程通过socket读取客户端的请求命令, 存入client->querybuf并解析第一个命令, 但不执行命令, 主线程忙轮询, 等待所有I/O线程完成读取任务;

  6. 主线程和所有I/O线程都完成了读取任务, 主线程结束忙轮询, 遍历clients_pending_read队列, 执行所有客户端连接的请求命令, 先调用processCommandAndResetClient执行第一条已经解析好的命令, 然后调用processInputBuffer解析并执行客户端连接的所有命令, 在其中使用processInlineBuffer或者processMultibulkBuffer根据Redis协议解析命令, 最后调用processCommand执行命令;

  7. 根据请求命令的类型(SET, GET, DEL, EXEC等), 分配相应的命令执行器去执行, 最后调用addReply函数族的一系列函数将响应数据写入到对应client的写出缓冲区: client->buf或者client->reply, client->buf是首选的写出缓冲区, 固定大小16KB, 一般来说可以缓冲足够多的响应数据, 但是如果客户端在时间窗口内需要响应的数据非常大, 那么则会自动切换到client->reply链表上去, 使用链表理论上能够保存无限大的数据(受限于机器的物理内存), 最后把client添加进一个LIFO队列clients_pending_write;

  8. 在Event Loop中, 主线程执行beforeSleep-->handleClientsWithPendingWritesUsingThreads, 利用Round-Robin轮询负载均衡策略, 把 clients_pending_write队列中的连接均匀地分配给I/O线程各自的本地FIFO任务队列io_threads_list[id]和主线程自己,I/O线程通过调用writeToClient把client的写出缓冲区里的数据回写到客户端, 主线程忙轮询, 等待所有I/O线程完成写出任务;

  9. 主线程和所有I/O线程都完成了写出任务, 主线程结束忙轮询, 遍历clients_pending_write队列, 如果client的写出缓冲区还有数据遗留, 则注册sendReplyToClient到该连接的写就绪事件, 等待客户端可写时在事件循环中再继续回写残余的响应数据.

这里大部分逻辑和之前的单线程模型是一致的, 变动的地方仅仅是把读取客户端请求命令和回写响应数据的逻辑异步化了, 交给I/O线程去完成, 这里需要特别注意的一点是:I/O线程仅仅是读取和解析客户端命令而不会真正去执行命令, 客户端命令的执行最终还是要在主线程上完成.

源码剖析

以下源码解读基于Redis v6.0.10版本源码

多线程初始化

void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */
    // 如果用户只配置了一个I/O线程, 则不会创建新线程(效率低), 直接在主线程里处理 I/O. 
    if (server.io_threads_num == 1) return;
 
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too manyI/Othreads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
 
    // 根据用户配置的I/O线程数, 启动线程. 
    for (int i = 0; i < server.io_threads_num; i++) {
        // 初始化I/O线程的本地任务队列. 
        io_threads_list[i] = listCreate();
        if (i == 0) continue; // 线程 0 是主线程. 
 
        // 初始化I/O线程并启动. 
        pthread_t tid;
        // 每个I/O线程会分配一个本地锁, 用来休眠和唤醒线程. 
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        // 每个I/O线程分配一个原子计数器, 用来记录当前遗留的任务数量. 
        io_threads_pending[i] = 0;
        // 主线程在启动I/O线程的时候会默认先锁住它, 直到有I/O任务才唤醒它. 
        pthread_mutex_lock(&io_threads_mutex[i]);
        // 启动线程, 进入I/O线程的主逻辑函数 IOThreadMain. 
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

initThreadedIO会在Redis服务器启动时的初始化工作的末尾被调用, 初始化I/O多线程并启动.

Redis多线程模式默认是关闭的, 需用户在redis.conf配置文件中开启, 如

io-threads 4
io-threads-do-reads yes

读取请求

当客户端发送请求命令之后, 会触发Redis主线程的事件循环, 命令处理器readQueryFromClient被回调, 在以前的单线程模型下, 这个方法会直接读取解析客户端命令并执行, 但是多线程模式下, 则会把client加入到clients_pending_read任务队列中去, 后面主线程再分配到I/O线程去读取客户端请求命令:

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
 
    // 检查是否开启了多线程, 如果是则把 client 加入异步队列之后返回. 
    if (postponeClientRead(c)) return;
    
    // 略, 下面的代码逻辑和单线程版本几乎是一样的
    ... 
}
 
int postponeClientRead(client *c) {
    // 当多线程I/O模式开启、主线程没有在处理阻塞任务时, 将 client 加入异步队列. 
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        // 给 client 打上 CLIENT_PENDING_READ 标识, 表示该 client 需要被多线程处理, 
        // 后续在I/O线程中会在读取和解析完客户端命令之后判断该标识并放弃执行命令, 让主线程去执行. 
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

接着主线程会在Event Loop的beforeSleep()方法中, 调用handleClientsWithPendingReadsUsingThreads:

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;
 
    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
 
    // 遍历待读取的 client 队列 clients_pending_read, 
    // 通过 RR 轮询均匀地分配给I/O线程和主线程自己(编号 0). 
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
 
    // 设置当前I/O操作为读取操作, 给每个I/O线程的计数器设置分配的任务数量, 
    // 让I/O线程可以开始工作: 只读取和解析命令, 不执行. 
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
 
    // 主线程自己也会去执行读取客户端请求命令的任务, 以达到最大限度利用 CPU. 
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
 
    // 忙轮询, 累加所有I/O线程的原子任务计数器, 直到所有计数器的遗留任务数量都是 0, 
    // 表示所有任务都已经执行完成, 结束轮询. 
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");
 
    // 遍历待读取的 client 队列, 清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 标记, 
    // 然后解析并执行所有 client 的命令. 
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
 
        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            // client 的第一条命令已经被解析好了, 直接尝试执行. 
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        processInputBuffer(c); // 继续解析并执行 client 命令. 
 
        // 命令执行完成之后, 如果 client 中有响应数据需要回写到客户端, 则将 client 加入到待写出队列 clients_pending_write
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }
 
    /* Update processed count on server */
    server.stat_io_reads_processed += processed;
    return processed;
}

此处的核心工作为:

  • 遍历待读取的Client队列clients_pending_read, 通过RR策略把所有任务分配给I/O线程和主线程去读取和解析客户端命令.
  • 忙轮询等待所有I/O线程完成任务.
  • 最后再遍历clients_pending_read, 执行所有client的命令.

写回响应

完成命令的读取、解析以及执行之后, 客户端命令的响应数据已经存入client->buf或者client->reply中了, 接下来就需要把响应数据回写到客户端了, 还是在beforeSleep中, 主线程调用handleClientsWithPendingWritesUsingThreads

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
 
    // 如果用户设置的I/O线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client
    // 数量不足I/O线程数的两倍, 则不用多线程的逻辑, 让所有I/O线程进入休眠, 
    // 直接在主线程把所有 client 的相应数据回写到客户端. 
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
 
    // 唤醒正在休眠的I/O线程(如果有的话). 
    if (!server.io_threads_active) startThreadedIO();
 
    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
 
    // 遍历待写出的 client 队列 clients_pending_write, 
    // 通过 RR 轮询均匀地分配给I/O线程和主线程自己(编号 0). 
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
 
        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }
 
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
 
    // 设置当前I/O操作为写出操作, 给每个I/O线程的计数器设置分配的任务数量, 
    // 让I/O线程可以开始工作, 把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端. 
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
 
    // 主线程自己也会去执行写数据到客户端的任务, 以达到最大限度利用 CPU. 
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
 
    // 忙轮询, 累加所有I/O线程的原子任务计数器, 直到所有计数器的遗留任务数量都是 0. 
    // 表示所有任务都已经执行完成, 结束轮询. 
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");
 
    // 最后再遍历一次 clients_pending_write 队列, 检查是否还有 client 的写出缓冲区中有残留数据, 
    // 如果有, 那就为 client 注册一个命令回复器 sendReplyToClient, 等待客户端写就绪再继续把数据回写. 
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
 
        // 检查 client 的写出缓冲区是否还有遗留数据. 
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
 
    /* Update processed count on server */
    server.stat_io_writes_processed += processed;
    return processed;
}

此处的核心工作是:

  • 检查当前任务负载, 如果当前的任务数量不足以用多线程模式处理的话, 则休眠I/O线程并且直接同步将响应数据回写到客户端.
  • 唤醒正在休眠的I/O线程(如果有的话).
  • 遍历待写出的client队列clients_pending_write, 通过RR策略把所有任务分配给I/O线程和主线程去将响应数据写回到客户端.
  • 忙轮询等待所有I/O线程完成任务.
  • 最后再遍历clients_pending_write, 为那些还残留有响应数据的client注册命令回复处理器sendReplyToClient, 等待客户端可写之后在Event Loop中继续回写残余的响应数据.

I/O线程主逻辑

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];
 
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    // 设置I/O线程的 CPU 亲和性, 尽可能将I/O线程(以及主线程, 不在这里设置)绑定到用户配置的
    // CPU 列表上. 
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();
 
    while(1) {
        // 忙轮询, 100w 次循环, 等待主线程分配I/O任务. 
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }
 
        // 如果 100w 次忙轮询之后如果还是没有任务分配给它, 则通过尝试加锁进入休眠, 
        // 等待主线程分配任务之后调用 startThreadedIO 解锁, 唤醒I/O线程去执行. 
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
 
        serverAssert(io_threads_pending[id] != 0);
 
        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
 
 
        // 注意: 主线程分配任务给I/O线程之时, 
        // 会把任务加入每个线程的本地任务队列 io_threads_list[id], 
        // 但是当I/O线程开始执行任务之后, 主线程就不会再去访问这些任务队列, 避免数据竞争. 
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            // 如果当前是写出操作, 则把 client 的写出缓冲区中的数据回写到客户端. 
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
              // 如果当前是读取操作, 则socket 读取客户端的请求命令并解析第一条命令. 
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        // 所有任务执行完之后把自己的计数器置 0, 主线程通过累加所有I/O线程的计数器
        // 判断是否所有I/O线程都已经完成工作. 
        io_threads_pending[id] = 0;
 
        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

I/O线程启动之后, 会先进入忙轮询, 判断原子计数器中的任务数量, 如果是非0则表示主线程已经给它分配了任务, 开始执行任务, 否则就一直忙轮询一百万次等待, 忙轮询结束之后再查看计数器, 如果还是0, 则尝试加本地锁, 因为主线程在启动I/O线程之时就已经提前锁住了所有I/O线程的本地锁, 因此I/O线程会进行休眠, 等待主线程唤醒.

主线程会在每次事件循环中尝试调用startThreadedIO唤醒I/O线程去执行任务, 如果接收到客户端请求命令, 则I/O线程会被唤醒开始工作, 根据主线程设置的io_threads_op标识去执行命令读取和解析或者回写响应数据的任务,I/O线程在收到主线程通知之后, 会遍历自己的本地任务队列io_threads_list[id], 取出一个个client执行任务:

  • 如果当前是写出操作, 则调用writeToClient, 通过socket把client->buf或者client->reply里的响应数据回写到客户端.
  • 如果当前是读取操作, 则调用readQueryFromClient, 通过 socket 读取客户端命令, 存入client->querybuf, 然后调用processInputBuffer去解析命令, 这里最终只会解析到第一条命令, 然后就结束, 不会去执行命令.
  • 在全部任务执行完之后把自己的原子计数器置0, 以告知主线程自己已经完成了工作.
void processInputBuffer(client *c) {
    // 略
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
 
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;
 
        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 判断 client 是否具有 CLIENT_PENDING_READ 标识, 如果是处于多线程I/O的模式下, 
            // 那么此前已经在 readQueryFromClient -> postponeClientRead 中为 client 打上该标识, 
            // 则立刻跳出循环结束, 此时第一条命令已经解析完成, 但是不执行命令. 
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
 
            // 执行客户端命令
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }
    // ... 略
}

这里需要额外关注I/O线程初次启动时会设置当前线程的CPU亲和性, 也就是绑定当前线程到用户配置的CPU上, 在启动Redis服务器主线程的时候同样会设置CPU亲和性, Redis的核心网络模型引入多线程之后, 加上之前的多线程异步任务、多进程(BGSAVE、AOF、BIO、Sentinel 脚本任务等), Redis现如今的系统并发度已经很大了, 而Redis本身又是一个对吞吐量和延迟极度敏感的系统, 所以用户需要Redis对CPU资源有更细粒度的控制, 这里主要考虑的是两方面: CPU高速缓存和NUMA架构.

首先是CPU高速缓存(这里讨论的是L1 Cache和L2 Cache都集成CPU中的硬件架构), 这里想象一种场景: Redis主进程正在CPU-1上运行, 给客户端提供数据服务, 此时Redis启动了子进程进行数据持久化(BGSAVE 或者 AOF), 系统调度之后子进程抢占了主进程的CPU-1, 主进程被调度到CPU-2上去运行, 导致之前CPU-1的高速缓存里的相关指令和数据被汰换掉, CPU-2需要重新加载指令和数据到自己的本地高速缓存里, 浪费CPU资源, 降低性能.

因此, Redis通过设置CPU亲和性, 可以将主进程/线程和子进程/线程绑定到不同的核隔离开来, 使之互不干扰, 能有效地提升系统性能. 其次是基于NUMA架构的考虑, 在NUMA体系下, 内存控制器芯片被集成到处理器内部, 形成CPU本地内存, 访问本地内存只需通过内存通道而无需经过系统总线, 访问时延大大降低, 而多个处理器之间通过QPI数据链路互联, 跨NUMA节点的内存访问开销远大于本地内存的访问:

因此,Redis通过设置CPU亲和性, 让主进程/线程尽可能在固定的NUMA节点上的CPU上运行, 更多地使用本地内存而不需要跨节点访问数据, 同样也能大大地提升性能.

最后还有一点, 阅读过源码的读者可能会有疑问, Redis多线程模式下, 似乎并没有对数据进行锁保护, 事实上Redis的多线程模型是全程无锁(Lock-free)的, 这是通过原子操作+交错访问来实现的, 主线程和I/O线程之间共享的变量有三个: io_threads_pending计数器、io_threads_op I/O标识符和io_threads_list线程本地任务队列.

io_threads_pending是原子变量, 不需要加锁保护, io_threads_opio_threads_list这两个变量则是通过控制主线程和I/O线程交错访问来规避共享数据竞争问题: I/O线程启动之后会通过忙轮询和锁休眠等待主线程的信号, 在这之前它不会去访问自己的本地任务队列io_threads_list[id], 而主线程会在分配完所有任务到各个I/O线程的本地队列之后才去唤醒 I/O线程开始工作, 并且主线程之后在I/O线程运行期间只会访问自己的本地任务队列io_threads_list[0]而不会再去访问I/O线程的本地队列, 这也就保证了主线程永远会在I/O线程之前访问io_threads_list并且之后不再访问, 保证了交错访问. io_threads_op同理, 主线程会在唤醒I/O线程之前先设置好io_threads_op的值, 并且在I/O线程运行期间不会再去访问这个变量.

性能有提升吗?

Redis 将核心网络模型改造成多线程模式, 追求的自然是最终性能上的提升, 废话不多说, 直接上数据:

测试数据表明, Redis在使用多线程模式之后性能大幅提升, 达到了一倍. 更详细的性能压测数据可以参阅这篇文章. 以下是美图技术团队实测的新旧Redis版本性能对比图, 仅供参考:

不足之处

Redis的多线程网络模型实际上并不是一个标准的Multi-Reactors/Master-Workers模型, 和其他主流的开源网络服务器的模式有所区别, 最大的不同就是在标准的Multi-Reactors/Master-Workers模式下, Sub Reactors/Workers会完成网络读 -> 数据解析 -> 命令执行 -> 网络写整套流程, Main Reactor/Master只负责分派任务, 而在Redis的多线程方案中, I/O线程任务仅仅是通过socket读取客户端请求命令并解析, 却没有真正去执行命令, 所有客户端命令最后还需要回到主线程去执行, 因此对多核的利用率并不算高, 而且每次主线程都必须在分配完任务之后忙轮询等待所有I/O线程完成任务之后才能继续执行其他逻辑.

Redis之所以如此设计它的多线程网络模型, 个人认为主要原因是为了保持兼容性, 之前版本的Redis单线程, 所有的客户端命令都是在单线程的Event Loop里执行, 因而Redis里所有的数据结构都是非线程安全的, 现在引入多线程, 如果按照标准的Multi-Reactors/Master-Workers模式来实现, 则所有内置的数据结构都必须重构成线程安全的, 工作量大而且麻烦.

个人认为, Redis目前的多线程方案更像是一个折中的选择: 既保持了原系统的兼容性, 又能利用多核提升I/O性能. 其次, 目前Redis的多线程模型中, 主线程和I/O线程的通信过于简单粗暴: 忙轮询和锁, 因为通过自旋忙轮询进行等待, 导致Redis在启动的时候以及运行期间偶尔会有短暂的CPU空转引起的高占用率, 且目前该通信机制的实现看起来非常不直观和不简洁, 希望后面Redis能对目前的方案加以改进.

读到这里, 对于本文开篇的那个问题, 聪明的你兴许早已有了答案, 对不?

小结

Redis自2009年发布第一版之后, 其单线程网络模型的选择在社区中从未停止过讨论, 多年来一直有呼声希望Redis能引入多线程从而利用多核优势, 但作者antirez是一个追求大道至简的开发者, 对Redis加入任何新功能都异常谨慎, 所以在Redis初版发布的十年后才最终将Redis的核心网络模型改造成多线程模式, 这期间甚至诞生了一些Redis多线程的替代项目. 虽然antirez一直在推迟多线程的方案, 但却从未停止思考多线程的可行性, Redis多线程网络模型的改造不是一朝一夕的事情,这其中牵扯到项目的方方面面,所以目前Redis的方案也并不完美, 没有采用主流的多线程模式设计.

最后, 再来回顾一下Redis多线程网络模型的设计方案:

  • 使用I/O线程实现网络I/O多线程化, I/O线程只负责网络I/O和命令解析, 不执行客户端命令.
  • 利用原子操作+交错访问实现无锁的多线程模型.
  • 通过设置CPU亲和性, 隔离主进程和其他子进程, 充分利用多线程网络模型实现性能最大化.