分享

【面试题】一文讲清,为啥redis单线程还有很高的性能?

 昵称p2F4g1IK 2021-09-25

面试的时候如果聊到缓存,肯定会聊到redis,因为它现在是缓存事实上的标准。

早些年一些互联网公司会用到memcached作为缓存,它是多线程的,用c语言开发,不过现在基本很少了,有兴趣的同学可以学习下它的源码。

那么聊redis,第一个问题就是它的工作原理,redis最最重要的工作原理,就是它的线程模型。

redis是单线程、nio、异步的线程模型,你要是这个都不知道,工作中用到redis,出了问题都不知道如何入手排查。

学习redis线程模型之前,必须了解socket网络编程相关知识,如果你不懂socket,肯定搞不懂redis原理。

redis使用文件事件处理器(file event handler)处理所有事件,这个文件事件处理器是单线程的,所以redis才叫单线程的。

文件事件处理器采用IO多路复用器,同时监听多个socket,将产生事件的socket放入内存队列中,事件分发器根据socket上的事件类型,选择对应的事件处理器进行处理。

图片
图1  redis 文件事件处理器过程

文件事件处理器的结构包含 4 个部分:

  • 客户端socket
  • IO 多路复用器
  • 文件事件分派器
  • 事件处理器(连接应答处理器、命令请求处理器、命令回复处理器)

多个客户端socket可能会并发产生不同的操作,每个操作对应不同的事件,但是 IO 多路复用器会监听多个socket,会将产生事件的 socket 放入队列中排队,事件分派器每次从队列中取出一个 socket,根据 socket 的事件类型交给对应的事件处理器进行处理。

连接应答处理器

在redis server启动的时候,会设置客户端建立连接时的连接应答处理器acceptTcpHandler,并和服务器监听套接字的AE_READABLE 事件关联起来,具体代码在server.c的initServer方法中:

图片
图2 redis初始化设置连接应答处理器

在networking.c 文件中acceptTcpHandler方法实现中,redis的连接应答处理器,用于对连接服务器进行监听,对套接字的客户端进行应答相应。

#define MAX_ACCEPTS_PER_CALL 1000
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
  //循环处理连接应答
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    'Accepting client connection: %s', server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,'Accepted %s:%d', cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

连接应答处理器

在接收到客户端的连接后,触发链接应答处理器的acceptTcpHandler方法,这个方法里会创建客户端对于的client对象,它代表着连接到 Redis 客户端。

并且accept客户端的连接,然后把这个连接与命令处理器readQueryFromClient关联起来,服务器会将连接成功后的socket的AE_READABLE事件和命令请求处理器关联起来,命令请求处理器就可以从socket中读取数据了。

图片
图3 建立连接后设置命令请求处理器

通过 networking.c中的readQueryFromClient方法,读取客户端发送的命令内容。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, 'Reading from client: %s',strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, 'Client closed connection');
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,'Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)', ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    processInputBufferAndReplicate(c);
}

void processInputBufferAndReplicate(client *c) {
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

processInputBuffer主要是将输入缓冲区中的数据解析成对应的命令,根据命令类型是 PROTO_REQ_MULTIBULK 还是 PROTO_REQ_INLINE,来分别调用 processInlineBuffer 和 processMultibulkBuffer 方法来解析命令。

然后调用 processCommand 方法来执行命令,redis中有一个类似map的东西,会记录每条命令对应的handler,根据解析出来的命令执行对应的handler即可。

命令回复处理器

命令处理完之后,redis server就要给客户端返回处理结果。networking.c中的sendReplyToClient方法就是redis 的命令回复处理器,此时会将socket的AE_WRITEABLE事件和命令回复处理器关联起来。这个处理器负责将服务器执行命令后得到的命令回复先写到缓存,再通过套接字返回给客户端。

图片
图4 设置命令回复处理器

最后画一幅完整的流程图,方便大家理解记忆。

图片
图5 redis线程模型

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多