Redis源码详解 - 后台线程
Redis是单线程的吗?⌗
看到许多网站上说Redis是单线程的,其实这样的说法并不严谨,研究了源代码后可以知道,Redis的主要逻辑是单线程的,但同时还有其他多个后台辅助线程。后台线程的程序逻辑比较简单,相关的代码在bio.h
和bio.c
两个文件中。
源码分析⌗
首先我们来看头文件bio.h
。
// 初始化后台I/O服务
void bioInit(void);
// 创建后台任务
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
// 某个类型的任务数量
unsigned long long bioPendingJobsOfType(int type);
// 等待某个类型的任务
unsigned long long bioWaitStepOfType(int type);
// 终止后台线程
void bioKillThreads(void);
// 后台线程处理3种任务 1. 关闭文件 2. 写AOF 3. 惰性释放内存
#define BIO_CLOSE_FILE 0
#define BIO_AOF_FSYNC 1
#define BIO_LAZY_FREE 2
#define BIO_NUM_OPS 3 // 后台任务数量3个
接下来我们看具体的实现`bio.c。
首先定义了一些变量
static pthread_t bio_threads[BIO_NUM_OPS]; // 3个子线程
static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; // 3个互斥量
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; // 3个条件变量,用于创建后台任务
static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; // 3个条件变量,用于等待后台任务
static list *bio_jobs[BIO_NUM_OPS]; // 3个任务队列
static unsigned long long bio_pending[BIO_NUM_OPS]; // 记录pending状态的任务数量
接下来定义了后台任务结构体bio_job
struct bio_job {
// 创建时间
time_t time;
// 最多支持3个参数,job0 job1只需要1个参数,job2需要3个参数
void *arg1, *arg2, *arg3;
};
然后是主要的函数
// 初始化后台服务
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
pthread_cond_init(&bio_step_cond[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of. */
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
// 创建子线程,子线程任务函数bioProcessBackgroundJobs
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
// 创建后台任务
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
// 加锁
pthread_mutex_lock(&bio_mutex[type]);
// 添加到队列尾部
listAddNodeTail(bio_jobs[type],job);
// 计数器++
bio_pending[type]++;
// 触发条件变量
pthread_cond_signal(&bio_newjob_cond[type]);
// 解锁
pthread_mutex_unlock(&bio_mutex[type]);
}
// 子线程任务函数
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
// 任务类型,0,1,2
unsigned long type = (unsigned long) arg;
sigset_t sigset;
/* Check that the type is within the right interval. */
if (type >= BIO_NUM_OPS) {
// type超过3,错误
serverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}
// 设置线程属性,可cancel并且异步cancel
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
// 加锁
pthread_mutex_lock(&bio_mutex[type]);
// 设置子线程信号掩码,屏蔽SIGALRM
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
// 禁用失败,打印warning
serverLog(LL_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
// 线程同步逻辑
while(1) {
listNode *ln;
if (listLength(bio_jobs[type]) == 0) {
// 等待条件变量
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
ln = listFirst(bio_jobs[type]);
job = ln->value;
pthread_mutex_unlock(&bio_mutex[type]);
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
// 这里没有用fsync,写AOF文件fdatasync,只确保更新数据,不确保元信息
redis_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
/* What we free changes depending on what arguments are set:
* arg1 -> free the object at pointer.
* arg2 & arg3 -> free two dictionaries (a Redis DB).
* only arg3 -> free the skiplist. */
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
/* Unblock threads blocked on bioWaitStepOfType() if any. */
pthread_cond_broadcast(&bio_step_cond[type]);
}
}