九、进阶 | Linux 内核中的同步原语(5)

Linux 内核中的同步原语。第 5 部分。

引言

这是描述 Linux 内核中同步原语的章节的第五部分,在前面的部分中我们已经讨论了不同类型的自旋锁信号量互斥锁同步原语。在这一部分,我们将继续学习同步原语,并开始考虑特殊类型的同步原语 - 读写锁

我们熟悉的第一种同步原语是信号量。正如本书的所有前面部分一样,在讨论 Linux 内核中读写信号量的实现之前,我们将从理论层面开始,并尝试理解读写信号量普通信号量之间的区别。

那么,让我们开始吧。

读写信号量

实际上,对数据可以执行两种类型的操作。我们可以读取数据和修改数据。两个基本操作是读取写入。通常(但并非总是),读取操作比写入操作更频繁。在这种情况下,逻辑上应该以这样的方式锁定数据,即一些进程可以同时读取锁定的数据,条件是没有人在修改数据。读写锁允许我们实现这种锁定。

当一个进程想要写入数据时,所有其他写入者读取者进程将被阻塞,直到获取锁定的进程释放它。当一个进程读取数据时,其他也想要读取相同数据的进程不会被锁定,并且能够这样做。你可以猜到,读写信号量的实现基于普通信号量的实现。我们已经在本章的第三部分中熟悉了信号量同步原语。从理论上讲,一切看起来都很简单。让我们看看 Linux 内核中读写信号量是如何表示的。

信号量由以下结构表示:

struct semaphore {
    raw_spinlock_t        lock;
    unsigned int         count;
    struct list_head     wait_list;
};

如果你查看include/linux/rwsem.h头文件,你将找到rw_semaphore结构的定义,它在 Linux 内核中表示读写信号量。让我们看看这个结构的定义:

#ifdef CONFIG_RWSEM_GENERIC_SPINLOCK
#include <linux/rwsem-spinlock.h>
#else
struct rw_semaphore {
    long                count;
    struct list_head    wait_list;
    raw_spinlock_t    wait_lock;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
    struct optimistic_spin_queue osq;
    struct task_struct *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map      dep_map;
#endif
};

在我们考虑rw_semaphore结构的字段之前,我们可能会注意到,rw_semaphore结构的声明依赖于CONFIG_RWSEM_GENERIC_SPINLOCK内核配置选项。默认情况下,这个选项对于x86_64架构是禁用的。我们可以通过查看相应的内核配置文件来确定这一点。在我们的情况下,这个配置文件是 - arch/x86/um/Kconfig

config RWSEM_XCHGADD_ALGORITHM
    def_bool 64BIT

config RWSEM_GENERIC_SPINLOCK
    def_bool !RWSEM_XCHGADD_ALGORITHM

所以,由于这本书只描述了与x86_64架构相关的内容,我们将跳过CONFIG_RWSEM_GENERIC_SPINLOCK内核配置被启用的情况,只考虑从include/linux/rwsem.h头文件中的rw_semaphore结构的定义。

如果我们看一下rw_semaphore结构的定义,我们会注意到前三个字段与semaphore结构中的字段相同。它包含表示可用资源数量的count字段,表示等待获取锁定的进程的双向链表wait_list字段,以及用于保护此列表的wait_lock自旋锁。注意rw_semaphore.count字段是long类型,与semaphore结构中的同名字段不同。

rw_semaphore结构的count字段可能有以下值:

  • 0x0000000000000000 - 读写信号量处于未锁定状态,没有人在等待锁定;
  • 0x000000000000000X - X个读取者正在活动或尝试获取锁定,没有写入者在等待;
  • 0xffffffff0000000X - 可能表示不同的情况。第一种是 - X个读取者正在活动或尝试获取锁定,有等待锁定的等待者。第二种是 - 一个写入者尝试获取锁定,没有锁定的等待者。最后 - 一个写入者正在活动,没有锁定的等待者;
  • 0xffffffff00000001 - 可能表示两种不同的情况。第一种是 - 一个读取者正在活动或尝试获取锁定,并且存在锁定的等待者。第二种情况是一个写入者正在活动或尝试获取锁定,没有锁定的等待者;
  • 0xffffffff00000000 - 表示有读取者或写入者在队列中,但没有人正在活动或正在获取锁定的过程中;
  • 0xfffffffe00000001 - 一个写入者正在活动或尝试获取锁定,并且等待者在队列中。

所以,除了count字段,所有这些字段都与semaphore结构的字段相似。最后三个字段取决于 Linux 内核的两个配置选项:CONFIG_RWSEM_SPIN_ON_OWNERCONFIG_DEBUG_LOCK_ALLOC。前两个字段可能对我们来说是熟悉的,它们在前一部分中声明的互斥锁结构中。第一个osq字段表示MCS锁的自旋器,用于乐观自旋,第二个字段表示当前锁的所有者进程。

rw_semaphore结构的最后一个字段是 - dep_map - 与调试相关的,正如我在前几部分中已经写过的,我们将在本章跳过与调试相关的内容。

就这样。现在我们稍微了解了什么是读写锁,特别是读写信号量。此外,我们还看到了 Linux 内核中读写信号量的表示。在这种情况下,我们可以继续并开始查看 Linux 内核为操作读写信号量提供的API

读写信号量 API

所以,我们已经从理论上了解了读写信号量,让我们看看它在 Linux 内核中的实现。所有与读写信号量相关的API都位于include/linux/rwsem.h头文件中。

像往常一样,在考虑 Linux 内核中读写信号量机制的API之前,我们需要知道如何初始化rw_semaphore结构。正如我们在本章的前面部分已经看到的,所有同步原语都可以以两种方式初始化:

  • 静态
  • 动态

读写信号量也不例外。首先,让我们看看第一种方法。我们可以使用DECLARE_RWSEM宏在编译时初始化rw_semaphore结构。这个宏在include/linux/rwsem.h头文件中定义,如下所示:

#define DECLARE_RWSEM(name) \
    struct rw_semaphore name= __RWSEM_INITIALIZER(name)

如我们所见,DECLARE_RWSEM宏只是展开为定义具有给定名称的rw_semaphore结构体。此外,新的rw_semaphore结构体被初始化为__RWSEM_INITIALIZER宏的值:

#define __RWSEM_INITIALIZER(name)              \
{                                                  \
        .count = RWSEM_UNLOCKED_VALUE,                         \
        .wait_list = LIST_HEAD_INIT((name).wait_list),         \
        .wait_lock = __RAW_SPIN_LOCK_UNLOCKED(name.wait_lock)  \
         __RWSEM_OPT_INIT(name)                                \
         __RWSEM_DEP_MAP_INIT(name)
}

并展开为初始化rw_semaphore结构体的字段。首先,我们将rw_semaphore结构体的count字段初始化为unlocked状态,使用RWSEM_UNLOCKED_VALUE宏定义在arch/x86/include/asm/rwsem.h特定于架构的头文件中:

#define RWSEM_UNLOCKED_VALUE            0x00000000L

之后,我们用空的链表初始化锁等待者的列表,并用unlocked状态初始化用于保护此列表的自旋锁。__RWSEM_OPT_INIT宏取决于CONFIG_RWSEM_SPIN_ON_OWNER内核配置选项的状态,如果此选项启用,则展开为初始化rw_semaphore结构体的osqowner字段。正如我们已经看到的,CONFIG_RWSEM_SPIN_ON_OWNER内核配置选项默认情况下对于x86_64架构是启用的,那么让我们看看__RWSEM_OPT_INIT宏的定义:

#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
    #define __RWSEM_OPT_INIT(lockname) , .osq = OSQ_LOCK_UNLOCKED, .owner = NULL
#else
    #define __RWSEM_OPT_INIT(lockname)
#endif

如我们所见,__RWSEM_OPT_INIT宏将MCS锁锁初始化为unlocked状态,并将锁的初始owner设置为NULL。从这一刻起,rw_semaphore结构体将在编译时初始化,并可用于数据保护。

初始化rw_semaphore结构体的第二种方式是动态的,或者使用include/linux/rwsem.h头文件中的init_rwsem宏。这个宏声明了一个与 Linux 内核的锁验证器相关的lock_class_key实例,并调用了给定的reader/writer semaphore__init_rwsem函数:

#define init_rwsem(sem)                         \
do {                                                            \
        static struct lock_class_key __key;                     \
                                                                \
        __init_rwsem((sem), #sem, &__key);                      \
} while (0)

如果你开始定义__init_rwsem函数,你会发现有几个源代码文件包含它。如你所能猜到的,有时我们需要初始化rw_semaphore结构体的额外字段,如osqowner。但有时不需要。这一切都取决于一些内核配置选项。如果我们查看kernel/locking/Makefile makefile,我们将看到以下几行:

obj-$(CONFIG_RWSEM_GENERIC_SPINLOCK) += rwsem-spinlock.o
obj-$(CONFIG_RWSEM_XCHGADD_ALGORITHM) += rwsem-xadd.o

如我们已经知道的,Linux 内核对于x86_64架构默认启用了CONFIG_RWSEM_XCHGADD_ALGORITHM内核配置选项:

config RWSEM_XCHGADD_ALGORITHM
    def_bool 64BIT

arch/x86/um/Kconfig内核配置文件中。在这种情况下,__init_rwsem函数的实现将位于kernel/locking/rwsem.c源代码文件中。让我们看看这个函数:

void __init_rwsem(struct rw_semaphore *sem, const char *name,
                    struct lock_class_key *key)
{
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        debug_check_no_locks_freed((void *)sem, sizeof(*sem));
        lockdep_init_map(&sem->dep_map, name, key, 0);
#endif
        sem->count = RWSEM_UNLOCKED_VALUE;
        raw_spin_lock_init(&sem->wait_lock);
        INIT_LIST_HEAD(&sem->wait_list);
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
        sem->owner = NULL;
        osq_lock_init(&sem->osq);
#endif
}

我们在这里看到的几乎与__RWSEM_INITIALIZER宏中的相同,不同的是所有这些将在运行时执行。

所以,从现在开始我们能够初始化一个reader/writer semaphore让我们看看lockunlock API。Linux 内核提供了以下主要API来操作reader/writer semaphores

  • void down_read(struct rw_semaphore *sem) - 读取时锁定;
  • int down_read_trylock(struct rw_semaphore *sem) - 尝试读取时锁定;
  • void down_write(struct rw_semaphore *sem) - 写入时锁定;
  • int down_write_trylock(struct rw_semaphore *sem) - 尝试写入时锁定;
  • void up_read(struct rw_semaphore *sem) - 释放读取锁定;
  • void up_write(struct rw_semaphore *sem) - 释放写入锁定;

让我们从锁定开始。首先让我们考虑down_write函数的实现,它执行尝试获取write锁定。这个函数在kernel/locking/rwsem.c源代码文件中,并从调用include/linux/kernel.h头文件中的宏开始:

void __sched down_write(struct rw_semaphore *sem)
{
        might_sleep();
        rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);

        LOCK_CONTENDED(sem, __down_write_trylock, __down_write);
        rwsem_set_owner(sem);
}

我们已经在前一部分中遇到了might_sleep宏。简而言之,might_sleep宏的实现取决于CONFIG_DEBUG_ATOMIC_SLEEP内核配置选项,如果此选项启用,该宏只是在原子上下文中执行时打印堆栈跟踪。由于这个宏主要用于调试目的,我们将跳过它,并继续前进。此外,我们将跳过down_read函数中的下一个宏 - rwsem_acquire,它与 Linux 内核的锁验证器有关,因为这是另一部分的主题。

down_write函数中剩下的唯一两件事是调用LOCK_CONTENDED宏,该宏定义在include/linux/lockdep.h头文件中,以及设置锁的所有者与rwsem_set_owner函数,该函数将所有者设置为当前运行的进程:

static inline void rwsem_set_owner(struct rw_semaphore *sem)
{
        sem->owner = current;
}

如你已经可能猜到的,LOCK_CONTENDED宏为我们做了所有工作。让我们看看LOCK_CONTENDED宏的实现:

#define LOCK_CONTENDED(_lock, try, lock) \
        lock(_lock

)

如我们所见,它只是调用LOCK_CONTENDED宏的第三个参数给定的lock函数,传入给定的rw_semaphore。在我们的例子中,LOCK_CONTENDED宏的第三个参数是__down_write函数,这是特定于架构的函数,位于arch/x86/include/asm/rwsem.h头文件中。让我们看看__down_write函数的实现:

static inline void __down_write(struct rw_semaphore *sem)
{
        long tmp;

        asm volatile("# beginning down_write\n\t"
                     LOCK_PREFIX "  xadd      %1,(%2)\n\t"
                     "  test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t"
                     "  jz        1f\n"
                     "  call call_rwsem_down_write_failed\n"
                     "1:\n"
                     "# ending down_write"
                     : "+m" (sem->count), "=d" (tmp)
                     : "a" (sem), "1" (RWSEM_ACTIVE_WRITE_BIAS)
                     : "memory", "cc");
}

对于我们在本章中看到的其他同步原语,通常lock/unlock函数只包含一个内联汇编语句。如我们所见,__down_write_nested函数的情况也是如此。让我们尝试理解这个函数的作用。我们的汇编语句的第一行只是一个注释,让我们跳过它。第二行包含LOCK_PREFIX,它将展开为LOCK指令,正如我们已经知道的。接下来的xadd指令执行addexchange操作。换句话说,xadd指令将RWSEM_ACTIVE_WRITE_BIAS的值:

#define RWSEM_ACTIVE_WRITE_BIAS         (RWSEM_WAITING_BIAS + RWSEM_ACTIVE_BIAS)

#define RWSEM_WAITING_BIAS              (-RWSEM_ACTIVE_MASK-1)
#define RWSEM_ACTIVE_BIAS               0x00000001L

0xffffffff00000001加到给定的reader/writer semaphorecount上,并返回它的先前值。之后我们检查rw_semaphore->count中的活动掩码。如果之前为零,则意味着之前没有写入者,所以我们获取了锁。否则我们调用call_rwsem_down_write_failed函数,该函数位于arch/x86/lib/rwsem.S汇编文件中。call_rwsem_down_write_failed函数只是预先保存通用寄存器,并调用kernel/locking/rwsem-xadd.c源代码文件中的rwsem_down_write_failed函数:

ENTRY(call_rwsem_down_write_failed)
    FRAME_BEGIN
    save_common_regs
    movq %rax,%rdi
    call rwsem_down_write_failed
    restore_common_regs
    FRAME_END
    ret
    ENDPROC(call_rwsem_down_write_failed)

rwsem_down_write_failed函数从原子更新count值开始:

__visible
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
{
    count = rwsem_atomic_update(-RWSEM_ACTIVE_WRITE_BIAS, sem);
    ...
    ...
    ...
}

通过-RWSEM_ACTIVE_WRITE_BIAS值。我们撤销对count的写入偏差,因为我们没有获取到锁。之后我们尝试通过调用rwsem_optimistic_spin函数进行乐观自旋

if (rwsem_optimistic_spin(sem))
      return sem;

我们将跳过rwsem_optimistic_spin函数的实现,因为它与我们在前一部分中看到的mutex_optimistic_spin函数类似。简而言之,我们在rwsem_optimistic_spin函数中检查是否存在其他准备运行的任务,这些任务的优先级高于当前任务。如果存在这样的任务,进程将被添加到MCSwaitqueue并开始在循环中自旋,直到可以获取锁。如果乐观自旋被禁用,进程将被添加到wait_list并标记为等待写入:

waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;

if (list_empty(&sem->wait_list))
    waiting = false;

list_add_tail(&waiter.list, &sem->wait_list);

等待者列表并开始等待直到成功获取锁。在我们把进程添加到之前为空的等待者列表之后,我们用RWSEM_WAITING_BIAS更新rw_semaphore->count的值:

count = rwsem_atomic_update(RWSEM_WAITING_BIAS, sem);

这样我们就标记rw_semaphore->counter,表明它已经被锁定,并且存在/等待一个想要获取锁的写入者。否则我们尝试唤醒wait queue中的读取进程,它们在这个写入进程之前排队,并且没有活动的读取者。在rwsem_down_write_failed的最后,没有获取到锁的写入进程将进入以下循环中睡眠:

while (true) {
    if (rwsem_try_write_lock(count, sem))
        break;
    raw_spin_unlock_irq(&sem->wait_lock);
    do {
        schedule();
        set_current_state(TASK_UNINTERRUPTIBLE);
    } while ((count = sem->count) & RWSEM_ACTIVE_MASK);
    raw_spin_lock_irq(&sem->wait_lock);
}

我将跳过这个循环的解释,因为我们已经在前一部分中遇到了类似的功能。

就这样。从这一刻起,我们的写入进程将获取或不获取锁,这取决于rw_semaphore->count字段的值。现在如果我们看看down_read函数的实现,它执行尝试获取锁定的尝试。我们将看到与down_write函数中看到的类似动作。这个函数调用不同的调试和锁验证器相关的函数/宏:

void __sched down_read(struct rw_semaphore *sem)
{
        might_sleep();
        rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

        LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
}

并且所有工作都在__down_read函数中完成。__down_read由内联汇编语句组成:

static inline void __down_read(struct rw_semaphore *sem)
{
         asm volatile("# beginning down_read\n\t"
                     LOCK_PREFIX _ASM_INC "(%1)\n\t"
                     "  jns        1f\n"
                     "  call call_rwsem_down_read_failed\n"
                     "1:\n\t"
                     "# ending down_read\n\t"
                     : "+m" (sem->count)
                     : "a" (sem)
                     : "memory", "cc");
}

它增加给定的rw_semaphore->count的值,并在该值为负时调用call_rwsem_down_read_failed。否则我们跳到标签1:并退出。之后读取锁将被成功获取。注意我们检查count值的符号,因为它可能是负数,因为如你记得的rw_semaphore->count的最重要字包含活动的写入者数量的负数。

让我们考虑一个进程想要获取读取操作的锁,但已经被锁定的情况。在这种情况下,将调用arch/x86/lib/rwsem.S汇编文件中的call_rwsem_down_read_failed函数。如果你查看这个函数的实现,你会发现它所做的与call_rwsem_down_write_failed函数相同。除了它调用的是rwsem_down_read_failed函数而不是rwsem_down_write_failed。现在让我们看看rwsem_down_read_failed函数的实现。它从将进程添加到wait queue并更新rw_semaphore->counter的值开始:

long adjustment = -RWSEM_ACTIVE_READ_BIAS

waiter.task = tsk;
waiter.type = RWSEM_WAITING_FOR_READ;

if (list_empty(&sem->wait_list))
    adjustment += RWSEM_WAITING_BIAS;
list_add_tail(&waiter.list, &sem->wait_list);

count = rwsem_atomic_update(adjustment, sem);

注意,如果我们在之前清空了wait queue,那么这次我们将清除rw_semaphore->counter并撤消read bias。接下来,我们检查没有活动锁,并且我们是wait queue中的第一个,我们需要加入当前活动的reader进程。否则我们进入睡眠状态,直到锁可以被获取。

就这样。现在我们知道了在不同情况下readerwriter进程在获取锁时的行为。现在让我们简单看一下unlock操作。up_readup_write函数允许我们解锁readerwriter锁。首先让我们看看定义在kernel/locking/rwsem.c源代码文件中的up_write函数的实现:

void up_write(struct rw_semaphore *sem)
{
        rwsem_release(&sem->dep_map, 1, _RET_IP_);

        rwsem_clear_owner(sem);
        __up_write(sem);
}

首先,它调用与 Linux 内核锁验证器相关的rwsem_release宏,所以我们现在将跳过它。接下来是rwsem_clear_owner函数,顾名思义,它只是清除给定rw_semaphoreowner字段:

static inline void rwsem_clear_owner(struct rw_semaphore *sem)
{
    sem->owner = NULL;
}

__up_write函数做了所有解锁的工作。__up_write是特定于架构的函数,所以对于我们的情况,它将位于arch/x86/include/asm/rwsem.h源代码文件中。如果我们看看这个函数的实现,我们将看到它几乎与__down_write函数相同,但是方向相反。它不是向rw_semaphore->count添加RWSEM_ACTIVE_WRITE_BIAS,而是减去相同的值,并检查先前值的符号

如果rw_semaphore->count的先前值不是负数,那么一个写入进程释放了锁,现在可能被其他人获取。否则,rw_semaphore->count将包含负值。这意味着至少有一个写入者在等待队列中。在这种情况下,将调用call_rwsem_wake函数。这个函数的行为类似于我们已经看到的类似函数。它在栈上存储通用寄存器以保留,并调用rwsem_wake函数。

首先,rwsem_wake函数检查是否有自旋器存在。在这种情况下,它将仅仅获取刚刚被锁主人释放的锁。否则,wait queue中必须有人在等待,我们需要唤醒位于wait queue顶部的写入进程,或者所有的读取进程。up_read函数释放读取锁的行为方式与up_write类似,但有一点不同。它不是从rw_semaphore->count减去RWSEM_ACTIVE_WRITE_BIAS,而是从中减去1,因为count的低位部分包含活动锁的数量。之后它检查count的符号,并在为负数时调用rwsem_wake,就像__up_write一样,或者锁将被成功释放。

就这样。我们已经考虑了操作reader/writer semaphore的API:up_read/up_writedown_read/down_write。我们看到,除了这些函数,Linux 内核还提供了额外的API,如等。但我将不会在这部分考虑这些函数的实现,因为它必须与我们在这部分看到的类似,除了一些小的细微差别。

结论

这是 Linux 内核中同步原语章节的第五部分的结尾。在这部分中,我们遇到了一种特殊的semaphore - readers/writer semaphore,它允许多个进程同时读取数据,或者一个进程写入数据。在下一部分中,我们将继续深入探讨 Linux 内核中的同步原语。

如果您有任何问题或建议,请随时在 Twitter 上联系我0xAX,给我发电子邮件,或者在这里创建一个问题。

请注意,英语不是我的第一语言,如果有任何不便,我深表歉意。如果您发现任何错误,请给我发送 PR 到linux-insides

链接

Linux嵌入式必考必会 文章被收录于专栏

&quot;《Linux嵌入式必考必会》专栏,专为嵌入式开发者量身打造,聚焦Linux环境下嵌入式系统面试高频考点。涵盖基础架构、内核原理、驱动开发、系统优化等核心技能,实战案例与理论解析并重。助你快速掌握面试关键,提升竞争力。

全部评论

相关推荐

伟大的烤冷面被普调:暨大✌🏻就是强
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务