redis7.x源码分析:(6) bio后台服务
redis的BIO(Background I/O)顾名思义:后台I/O服务,是Redis中用于处理一些需要异步执行的后台任务的一个线程池。这些后台任务包括了文件同步(fsync)、文件关闭(close)、对象释放(free)等,以避免这些操作阻塞主线程,从而影响处理效率。
由此我们也可以看出,redis其实并非真正的单线程程序,只是它的主要数据库读写处理流程是在主线程完成的,但内部是会启动多种线程来辅助提升效率的。除了BIO的线程外,后续我还会讲解网络IO多线程的代码实现,也会涉及到整体主流程框架的实现原理。
BIO是基于生产者-消费者模型实现的,使用互斥锁和条件变量来同步。redis为每种BIO操作类型创建了一个后台线程,每个线程只处理一个特定类型的任务队列。当任务到来时,主线程将任务添加到相应操作类型的任务队列中,并唤醒等待在该队列上的线程。后台线程循环取出队列中的任务并执行;队列为空时线程并不会退出,而是重新阻塞在条件变量上,等待下一个任务到来。
下面看一下主要的代码实现
BIO主要结构定义:
// 线程对象数组 static pthread_t bio_threads[BIO_NUM_OPS]; // 线程锁数组 static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; // 任务通知条件等待对象数组 static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; // 任务执行完毕条件等待对象数组 static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; // 任务链表数组 static list *bio_jobs[BIO_NUM_OPS]; /* The following array is used to hold the number of pending jobs for every * OP type. This allows us to export the bioPendingJobsOfType() API that is * useful when the main thread wants to perform some operation that may involve * objects shared with the background thread. The main thread will just wait * that there are no longer jobs of this type to be executed before performing * the sensible operation. This data is also useful for reporting. */ // 任务等待数数组 static unsigned long long bio_pending[BIO_NUM_OPS]; /* This structure represents a background Job. It is only used locally to this * file as the API does not expose the internals at all. */ typedef union bio_job { /* Job specific arguments.*/ struct { // 文件句柄 int fd; /* Fd for file based background jobs */ // 是否在关闭前刷新到磁盘(文件系统缓冲区) unsigned need_fsync:1; /* A flag to indicate that a fsync is required before * the file is closed. */ } fd_args; struct { // 延迟释放函数 lazy_free_fn *free_fn; /* Function that will free the provided arguments */ // 释放参数 void *free_args[]; /* List of arguments to be passed to the free function */ } free_args; } bio_job;
BIO初始化:
void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects */ // 初始化所有线程中需要用到的数组 for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); pthread_cond_init(&bio_step_cond[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system */ // 设置线程栈大小 pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ // 创建所有后台任务线程 for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } }
BIO任务提交:
void bioSubmitJob(int type, bio_job *job) { // 根据type添加任务到相应链表中,然后通知处理线程处理 pthread_mutex_lock(&bio_mutex[type]); listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; pthread_cond_signal(&bio_newjob_cond[type]); pthread_mutex_unlock(&bio_mutex[type]); } void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { va_list valist; /* Allocate memory for the job structure and all required * arguments */ // 分配函数指针以及参数所需要的空间 bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); job->free_args.free_fn = free_fn; // 存储所有参数 va_start(valist, arg_count); for (int i = 0; i < arg_count; i++) { job->free_args.free_args[i] = va_arg(valist, void *); } va_end(valist); // 提交延迟释放任务 bioSubmitJob(BIO_LAZY_FREE, job); } void bioCreateCloseJob(int fd, int need_fsync) { bio_job *job = zmalloc(sizeof(*job)); job->fd_args.fd = fd; job->fd_args.need_fsync = need_fsync; // 提交关闭文件任务 bioSubmitJob(BIO_CLOSE_FILE, job); } void bioCreateFsyncJob(int fd) { bio_job *job = zmalloc(sizeof(*job)); job->fd_args.fd = fd; // 提交文件刷新到磁盘任务 bioSubmitJob(BIO_AOF_FSYNC, job); }
BIO任务线程函数:
/* Entry point of a BIO worker thread.
 *
 * 'arg' carries the job type this thread is responsible for, encoded as an
 * integer cast to a pointer. The function returns NULL only when that type
 * is out of range; otherwise it loops forever, taking jobs of its type from
 * the queue and processing them one at a time. */
void *bioProcessBackgroundJobs(void *arg) {
    bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    /* Give the thread a title matching the job type it serves. */
    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

    /* Apply the CPU list taken from server.bio_cpulist to this thread. */
    redisSetCpuAffinity(server.bio_cpulist);

    /* NOTE(review): presumably makes the thread cancellable at any time;
     * confirm against makeThreadKillable(). */
    makeThreadKillable();

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        /* Nothing queued: wait for the new-job condition, then re-check. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Take the job at the head of the queue. The node itself stays
         * linked in the list until the job has been processed. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            if (job->fd_args.need_fsync) {
                /* Flush to disk before closing, as requested by the job. */
                redis_fsync(job->fd_args.fd);
            }
            /* Close the file descriptor. */
            close(job->fd_args.fd);
        } else if (type == BIO_AOF_FSYNC) {
            /* The fd may be closed by main thread and reused for another
             * socket, pipe, or file. We just ignore these errno because
             * aof fsync did not really fail. */
            if (redis_fsync(job->fd_args.fd) == -1 &&
                errno != EBADF && errno != EINVAL)
            {
                int last_status;
                atomicGet(server.aof_bio_fsync_status,last_status);
                atomicSet(server.aof_bio_fsync_status,C_ERR);
                atomicSet(server.aof_bio_fsync_errno,errno);
                /* Log only when the previous status was C_OK. */
                if (last_status == C_OK) {
                    serverLog(LL_WARNING,
                        "Fail to fsync the AOF file: %s",strerror(errno));
                }
            } else {
                atomicSet(server.aof_bio_fsync_status,C_OK);
            }
        } else if (type == BIO_LAZY_FREE) {
            /* Call the job's free function with its stored arguments. */
            job->free_args.free_fn(job->free_args.free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}
/* Return the number of pending jobs of the specified type. */ unsigned long long bioPendingJobsOfType(int type) { unsigned long long val; pthread_mutex_lock(&bio_mutex[type]); // 返回任务数量 val = bio_pending[type]; pthread_mutex_unlock(&bio_mutex[type]); return val; } unsigned long long bioWaitStepOfType(int type) { unsigned long long val; pthread_mutex_lock(&bio_mutex[type]); val = bio_pending[type]; if (val != 0) { // 等待任务完成通知 pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); val = bio_pending[type]; } pthread_mutex_unlock(&bio_mutex[type]); return val; }
未经许可,请勿转载!作者:jwybobo2007
#redis#