信号量(Semaphore)

信号量是操作系统提供的一种协调共享资源访问的方法。它和软件同步的区别是:

  • 软件同步是平等线程间的一种同步协商机制
  • 信号量是由操作系统进行管理的,它的地位高于进程(而非平等协商)

信号量由Dijkstra在20世纪60年代提出,目前仍然在OS中被使用。

信号量简介

信号是一种抽象数据类型:

  • 由一个整型变量(sem)和两个原子操作组成
  • sem:要共享的资源的数目
  • P()操作(Prolaag,尝试减少,请求资源时进行)
    • sem-1
    • sem<0,进入等待,否则继续运行
  • V()操作(Verhoog,增加,释放资源时进行)
    • sem+1
    • sem<=0,则唤醒一个等待进程

信号量的一些特点:

  • 信号量可以被认为是被保护的整数变量
    • 初始化完成后,只能通过P()和V()操作修改
    • 由操作系统保证,PV操作是原子操作
    • 可以保证不受应用进程的干扰
  • P()操作可能阻塞,但V()操作不会阻塞
  • 通常假定信号量是“公平的”
    • 线程不会被无限期阻塞在P()操作(实际系统中有一个最长时限的参数,超时之后错误返回)
    • 假定信号量等待按先进先出排队(但是在实际系统中公平有所偏差)

下面给出信号量在原理上的一个实现(之所以是原理上的实现,是因为实际实现时要考虑很多问题):

class Semaphore {
    int sem;
    WaitQueue q;
}

Semaphore::P() {
    sem--;
    if (sem < 0) {
        Add this thread t to q;
        block(t);
    }
}

Semaphore::V() {
    sem++;
    if (sem >= 0) {
        Remove a thread t from q;
        wakeup(t);
    }
}

信号量使用

信号量一般可分为两类:

  • 二进制信号量:资源数目为0或1
  • 资源信号量:资源数目为任何非负值

下面讨论两个信号量的使用场景:

  • 互斥访问:临界区的互斥访问控制
  • 条件同步:线程间的事件等待
互斥访问

互斥访问的实现方法非常简单:

// 每类资源设置一个信号量,其初值为1
mutex = new Semaphore(1);
// 临界区的控制代码如下:
mutex->P();
Critical Section;
mutex->V();

在这种情况下,信号量的使用和锁相同。需要注意以下几点:

  • 必须成对使用P()操作和V()操作
  • P()操作保证互斥访问临界资源
  • V()操作在使用后释放临界资源
  • PV操作不能次序颠倒、重复或遗漏
条件同步

举例:线程A执行完X模块之后,线程B才能执行Y模块。

// 设置一个初值为0的信号量
condition = new Semaphore(0);

// Thread A
Module X {
    ...
}
condition.V();
...

// Thread B
...
condition.P();
Module Y {
    ...
}

管程(Monitor)

管程是一种用于多线程互斥访问共享资源的程序结构

  • 采用面向对象方法,简化了线程间的同步控制
  • 任一时刻最多只有一个线程执行管程代码
  • 正在管程中的线程可临时放弃管程的互斥访问,等待事件出现时恢复(这是最大的特点)

管程的组成:

  • 一个锁:控制管程代码的互斥访问
  • 入口队列:每次只能有一个线程进入
  • 0或多个条件变量:管理共享数据的并发访问

下面需要介绍一下条件变量了。

条件变量(Condition Variable)

条件变量是管程内的等待机制

  • 进入管程的线程因资源被占用而进入等待状态
  • 每个条件变量表示一种等待原因,对应一个等待队列
  • Wait()操作:
    • 将自己阻塞在等待队列中
    • 唤醒一个等待者或释放管程的互斥访问(即允许另外一个线程进入管程)
  • Signal()操作:
    • 将等待队列中的一个线程唤醒
    • 如果等待队列为空,这就相当于是一个空操作

条件变量在原理上的实现和信号量有些类似:

class Condition {
    int numWaiting = 0;
    WaitQueue q;
}

Condition::Wait(Lock lock) {
    numWaiting++;
    Add this thread t to q;
    release(lock);
    schedule(); // need mutex; 取一个就绪态(ready)的线程继续执行
    acquire(lock);
}

Condition::Signal() {
    if (numWaiting > 0) {
        Remove a thread t from q;
        wakeup(t); // need mutex; 把当前sleep的线程重新置成ready状态
        numWaiting--;
    }
}

但是有很多不同的地方,如:

  • 对管程的互斥锁的释放和获得
  • signal和P语义的不同:PV操作必须是成对的,但signal/wait操作完全不需要保证这一点
  • wait和V语义的不同:V操作后线程可能会继续执行,但wait操作后,线程必然进入等待队列并阻塞
  • 执行signal/wait时,都默认已经获得了互斥锁

管程的语义

事实上管程一共有三种语义:

  • Mesa语义
  • Hoare语义
  • Hansen语义

考虑如下情况:线程A在条件变量的等待队列中等待资源,此时线程B在该资源(或者说该条件变量)上执行signal操作。

Mesa语义
  • 线程B执行signal之后,不会立刻退出管程,而是执行到lock.release()之后才进入就绪态
  • 线程A会被移动到入口等待队列中
  • 在wait后被唤醒的进程不一定会被立刻调度,因此需要用while来检查条件
  • 大部分实际实现的管程采用的是这一语义

image-20200723221042066

Hoare语义
  • 线程B执行signal之后,迅速唤醒等待中的线程A,自己进入signal队列中(这个队列是此语义特有的)
  • 每次有线程退出时,先到signal队列中调度线程,如果signal队列空,才到入口等待队列调度线程
  • 实际实现中一般不采用,因为需要多一个队列,代价增大了

image-20200723221101549

Brinch Hanson语义
  • 线程B退出的同时才执行signal操作

image-20200723221122339

经典问题

生产者-消费者问题

有界缓冲区的生产者-消费者问题描述:

  • 一个或多个生产者在生成数据后放在一个缓冲区里
  • 单个消费者从缓冲区取出数据处理
  • 任何时候只能有一个生产者或消费者可访问缓冲区
信号量实现

问题分析:

  • 任何时刻只能有一个线程操作缓冲区,这是临界区(互斥访问)
  • 缓冲区空时,消费者必须等待生产者(条件同步)
  • 缓冲区满时,生产者必须等待消费者(条件同步)

用信号量描述每个约束:

  • 二进制信号量mutex:互斥
  • 资源信号量fullBuffers:缓冲区为满(信号量的值表示缓冲区中数据的个数)
  • 资源信号量emptyBuffers:缓冲区为空(信号量的值表示缓冲区中空位的个数)
  • 其中fullBuffers+emptyBuffers=缓冲区大小

(事实上,OSTEP中讨论了如果只使用一个资源信号量会导致死锁的问题,在此不再赘述了)

下面给出(原理上的)代码实现:

class BoundedBuffer {
    mutex = new Semaphore(1);
    fullBuffers = new Semaphore(0);
    emptyBuffers = new Semaphore(n);
}

BoundedBuffer::Deposit(c) {
    emptyBuffers->P();
    mutex->P();
    Add c to the buffer;
    mutex->V();
    fullBuffers->V();
}

BoundedBuffer::Remove(c) {
    fullBuffers->P();
    mutex->P();
    Remove c from buffer;
    mutex->V();
    emptyBuffers->V();
}

注意P操作之间不能颠倒顺序。V操作不会阻塞,就无所谓了。

管程实现
class BoundedBuffer {
    Lock lock;
    int count = 0;
    Condition notFull, notEmpty;
}

BoundedBuffer::Deposit(c) {
    lock->Acquire();
    while (count == n)
        notFull.Wait(&lock);
    Add c to the buffer;
    count++;
    notEmpty.signal();
    lock->Release();
}

BoundedBuffer::Remove(c) {
    lock->Acquire();
    while (count == 0)
        notEmpty.Wait(&lock);
    Remove c from buffer;
    count--;
    notFull.signal();
    lock->Release();
}
信号量和管程实现的对比

image-20200723221139370

如图,信号量中存有int变量sem以及WaitQueue变量q,根据信号量的实现代码(左下角ppt),我们可以得出semq的含义:

  • q代表当前正在等待资源的线程组成的等待队列,若当前资源足够所有进程使用,q为空;
  • sem代表【到目前为止,若所有请求该资源的线程都能够获取该资源,那么资源还剩下多少(这里我们假设资源个数可以为负)】;
  • sem也可以有另一种理解:当sem非负时,sem代表剩余资源的个数;当sem为负数时,sem的绝对值代表等待队列q的长度。

而当我们使用条件变量解决有限资源问题时,我们通常会在条件变量之外,管程之中加入整型变量count(右上角ppt),来帮助条件变量记录当前剩余多少资源(非负)。查看条件变量的实现代码(右下角ppt),我们可以得出条件变量中整型变量numWaiting以及WaitQueue变量q的含义:

  • q代表当前正在等待资源的线程组成的等待队列,若当前资源足够所有进程使用,q为空;
  • numWaiting代表等待队列q的长度(非负)。

结合使用信号量以及条件变量解决有限资源问题的实例(左上及右上ppt),以及以上我们对信号量和条件变量的分析,我们可以得出以下结论:

  • 在任一状态,信号量中的q和条件变量中的q完全相同;
  • sem非负时,含义与管程中的count相同,此时numWaiting为0;
  • sem为负数时,sem的绝对值等于numWaiting,此时count为0。

在生产者-消费者这个问题实例中:

  • 信号量emptyBuffers与条件变量notFull是匹配的,满足上面3个条件,此时count在代码中以n - count的形式出现;
  • 信号量fullBuffers与条件变量notEmpty是匹配的,满足上面3个条件,此时count在代码中以count的形式出现;

综上所述,两种解决方法是完全等价的,至于为什么用管程实现更加安全方便,个人认为老师在视频中并没有解释得很清楚,和老师讨论后得出结论如下:

  • 用信号量的时候(左上角ppt),所有信号量都要自己维护,并配对好PV;
  • 用管程的时候,我们可以理解为BoundedBuffer继承了一个管程类,因此操作系统会给BoundedBuffer中每一个方法自动加上锁(即右上角ppt中的lock->Acquire()lock->Release()函数并不用自己写,是系统加上的),因此更加安全可控,容易查错。

但是个人认为使用条件变量也要根据条件配对好Wait和Signal函数,因此不比使用信号量更容易安全,这个问题见仁见智吧,但如果考试时候问到怎么回答大家懂的~

更新:ucore lab7中实现信号量的sem值非负,这样看来ucore中信号量的sem值和条件变量中的count值应该是完全相等的。

哲学家就餐问题

问题描述:

  • 5个哲学家围绕在一张圆桌周围
  • 桌子上有5根筷子(或者说叉子……随便啦)
  • 哲学家的动作包括思考和进餐
  • 进餐时一个哲学家需要自己两边的两根筷子
  • 如何保证哲学家们的动作有序进行,既不发生饥饿也不发生死锁?
  • 方案1:
    • 每个筷子用一个信号量表示,sem=1
    • 哲学家先拿第一根筷子,再拿第二根筷子,然后吃,最后放回两根筷子
    • 多数情况下这一算法可行;但极端情况下会出现死锁,比如所有哲学家同时拿左边的筷子
  • 方案2:
    • 除了每根筷子的信号量之外,再加上一个互斥信号量,同时只能有一个哲学家就餐
    • 能够保证顺序吃饭,但是浪费了资源和时间
  • 方案3:
    • 和方案1一样,使用5个信号量表示筷子
    • 哲学家根据编号不同,拿取筷子的顺序不同
    • 此时没有死锁,且允许两个人同时就餐
信号量实现

这一实现和ucore lab中给出的使用信号量的实现不太一样,而是参考了OSTEP中的做法(更准确地说是Dijkstra本人的做法)。

// Initialization
Semaphore forks[5];
for (int i = 0; i < 5; i++)
    forks[i] = new Semaphore(1);

// Helper functions
int left(int p) { return p; }
int right(int p) { return (p + 1) % 5; }

// Thread x
void Philosopher(int x) {
    // Switch between thinking and eating
    while (true) {
        think();
        // get forks
        if (p == 4) {  // break dependency
            forks[right(x)].P();
            forks[left(x)].P();
        }
        else {
            forks[left(x)].P();
            forks[right(x)].P();
        }
        eat();
        // put forks
        forks[left(x)].V();
        forks[right(x)].V();
    }
}

读者-写者问题

问题描述:对于一个共享数据,有两类使用者

  • 读者:只读取数据,不修改(可以同时读)
  • 写者:读取和修改数据(不可以同时写)
  • 读写是互斥的

事实上,也需要考虑到,至少有两种可能的策略(而且还会有更多):

  • 读者优先策略:
    • 只要有读者正在读状态,后来的读者就能直接进入
    • 如果读者不断进入,则写者就处于饥饿
  • 写者优先策略
    • 只要有写者就绪,写者应尽快执行写操作(后来的读者需要阻塞)
    • 如果写者持续不断就绪,则读者就处于饥饿
信号量实现

用信号量描述每个约束:

  • 信号量WriteMutex
    • 控制读写操作的互斥
    • 初始化为1
  • 读者计数Rcount
    • 正在进行读操作的读者数目
    • 初始化为0
  • 信号量CountMutex
    • 保护对读者计数的互斥修改
    • 初始化为1

解决方案:

  • 读者的互斥锁只针对于第一个读者,之后不再判断
  • 因为Rcount也需要保护,所以外面也加上互斥锁
void Writer() {
    WriteMutex.P();
    write;
    WriteMutex.V();
}

void Reader() {
    CountMutex.P();
    if (Rcount == 0)
        WriteMutex.P();
    Rcount++;
    CountMutex.V();

    read;

    CountMutex.P();
    Rcount--;
    if (Rcount == 0)
        WriteMutex.V();
    CountMutex.V();
}

在这一实现中,只要有读者开始阅读,就必须等到全部读者都离开才能进行写操作。即使有写操作在等待,读者仍然先于写者,说明这是一种读者优先策略。

管程实现

管程中包括以下内容:

  • 一个互斥锁
  • 4个状态变量
  • 2个条件变量:可读/可写

从判断条件可以看出,这一实现采用的是写者优先策略。不过其实我还没想明白能否把对AWWW变量的修改移动到while循环外面。

class Database {
    Lock lock;  // 管程的互斥锁
    int AR = 0;  // Active Readers
    int AW = 0;  // Active Writers
    int WR = 0;  // Waiting Readers
    int WW = 0;  // Waiting Writers
    Condition okToRead, okToWrite;
}

// 两个基本操作
Database::Read() {
    StartRead();  // Wait until no writers
    read database;
    DoneRead();  // Exit & wake up waiting writers
}

Database::Write() {
    StartWrite();  // Waite until no readers/writers;
    write database;
    DoneWrite();  // Exit & wake up waiting readers/writers
}

Private Database::StartRead() {
    lock.Acquire();
    while (AW + WW > 0) {
        WR++;
        okToRead.wait(&lock);
        WR--;
    }
    AR++;
    lock.Release();
}

Private Database::DoneRead() {
    lock.Acquire();
    AR--;
    if (AR == 0 && WW > 0)
        okToWrite.signal();
    lock.Release();
}

Private Database::StartWrite() {
    lock.Acquire();
    while (AW + AR > 0) {
        WW++;
        okToWrite.wait(&lock);
        WW--;
    }
    AW++;
    lock.Release();
}

Private Database::DoneWrite() {
    lock.Acquire();
    AW--;
    if (WW > 0)
        okToWrite.signal();
    else if (AW > 0)
        okToRead.broadcast();
    lock.Release();
}