Redis源码详解 – 后台线程

Redis是单线程的吗?

看到许多网站上说Redis是单线程的,其实这样的说法并不严谨,研究了源代码后可以知道,Redis的主要逻辑是单线程的,但同时还有其他多个后台辅助线程。后台线程的程序逻辑比较简单,相关的代码在bio.hbio.c两个文件中。

源码分析

首先我们来看头文件bio.h

// 初始化后台I/O服务
void bioInit(void);
// 创建后台任务
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
// 某个类型的任务数量
unsigned long long bioPendingJobsOfType(int type);
// 等待某个类型的任务
unsigned long long bioWaitStepOfType(int type);
// 终止后台线程
void bioKillThreads(void);

// 后台线程处理3种任务 1. 关闭文件 2. 写AOF 3. 惰性释放内存
#define BIO_CLOSE_FILE 0
#define BIO_AOF_FSYNC 1
#define BIO_LAZY_FREE 2
#define BIO_NUM_OPS 3 // 后台任务数量3个

接下来我们看具体的实现`bio.c。

首先定义了一些变量

static pthread_t bio_threads[BIO_NUM_OPS]; // 3个子线程
static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; // 3个互斥量
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];  // 3个条件变量,用于创建后台任务
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];  // 3个条件变量,用于等待后台任务
static list *bio_jobs[BIO_NUM_OPS]; // 3个任务队列
static unsigned long long bio_pending[BIO_NUM_OPS]; // 记录pending状态的任务数量

接下来定义了后台任务结构体bio_job

struct bio_job {
    // 创建时间
    time_t time;
    // 最多支持3个参数,job0 job1只需要1个参数,job2需要3个参数
    void *arg1, *arg2, *arg3;
};

然后是主要的函数

// 初始化后台服务
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        // 创建子线程,子线程任务函数bioProcessBackgroundJobs
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}
// 创建后台任务
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    // 加锁
    pthread_mutex_lock(&bio_mutex[type]);
    // 添加到队列尾部
    listAddNodeTail(bio_jobs[type],job);
    // 计数器++
    bio_pending[type]++;
    // 触发条件变量
    pthread_cond_signal(&bio_newjob_cond[type]);
    // 解锁
    pthread_mutex_unlock(&bio_mutex[type]);
}

// 子线程任务函数
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    // 任务类型,0,1,2
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;
    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        // type超过3,错误
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }
    // 设置线程属性,可cancel并且异步cancel
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    // 加锁
    pthread_mutex_lock(&bio_mutex[type]);
    // 设置子线程信号掩码,屏蔽SIGALRM
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        // 禁用失败,打印warning
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    // 线程同步逻辑
    while(1) {
        listNode *ln;
        if (listLength(bio_jobs[type]) == 0) {
            // 等待条件变量
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        pthread_mutex_unlock(&bio_mutex[type]);
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            // 这里没有用fsync,写AOF文件fdatasync,只确保更新数据,不确保元信息
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注