信号量
信号量是操作系统提供的一种协调共享资源访问的方法。信号量是一种抽象的数据类型,由一个整型变量与两个原子操作(P操作——申请资源,V操作——释放资源)组成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| class Semaphore{ int sem; WaitQuene q; }
Semaphore::__init__(flag){ sem=flag; q=NULL; }
Semaphore::P(){ sem--; if(sem<0){ q.add(current); block(current); } }
Semaphore::V(){ sem++; if(sem<=0){ next=q.remove(); wakeup(next); } }
|
P、V操作由操作系统实现,执行过程中不会被打断,从而保证其原子性。借助P、V操作的原子性,可以更简单地实现同步互斥。下面介绍信号量的使用。
信号量的使用
实现互斥的伪代码如下:
对于需要互斥访问的临界区代码:
1 2 3
| mutex->P(t);
mutex->V();
|
注意:必须按照先P操作后V操作的顺序使用,且P操作与V操作必须成对使用。
实现条件同步的伪代码如下:
例如:必须等线程B执行完X,线程A才能执行N;这样使用信号量:
对线程A:在N之前插入P操作,对线程B:在X后插入V操作,即:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ---------------- | M | ---------------- condition->P(); ---------------- | N | ----------------
---------------- | X | ---------------- condition->V(); ---------------- | Y | ----------------
|
用信号量解决生产者-消费者问题
问题分析:
- 任何时刻只能有一个线程操作缓冲区(互斥访问);
- 缓冲区空则消费者必须等待生产者(条件同步);
- 缓冲区满则生产者必须等待消费者(条件同步);
每个约束条件对应一个信号量:(二进制)信号量mutex、(资源)信号量fullBuffers与(资源)信号量emptyBuffers。将有界缓冲区定义为如下类,Deposit方法向有界缓冲区写入一个字节,Remove方法从有界缓冲区中移出一个字节。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| class BoundedBuffer{ Semaphore mutex; Semaphore fullBuffers; Semaphore emptyBuffers; char data[n]; }
BounderBuffer::__init__(){ mutex=Semaphore(1); fullBuffers=Semaphore(0); emptyBuffers=Semaphore(n); }
BounderBuffer::Deposit(c){ emptyBuffers->P(); mutex->P(); write(data, c); mutex->V(); fullBuffers->V(); }
BounderBuffer::Remove(){ fullBuffers->P(); mutex->P(); move(data, c); mutex->V(); emptyBuffers->V(); }
|
从上面的运用中可以看出,使用信号量来解决同步互斥问题时要求P、V操作必须成对出现,编写容易出错。一旦没有考虑周全,就会导致死锁。
管程
管程是一种用于多线程互斥访问共享资源的程序结构。它采用面向对象的编程方法,将对共享资源的P、V操作成对地写入对共享资源的操作。与临界区不同的是,正在管程中的线程可以放弃对管程的互斥访问,等待事件出现时恢复(与临界区的不同点:线程在临界区中执行时,只能在临界区执行完后退出临界区)。
利用管程来定义访问数据的方法,从而不用程序编写者考虑访问时的P、V操作。管程包括锁与条件变量(可选):
- 锁用于控制管程代码的互斥访问,因此在管程外会有一个等待队列;
- 每个条件变量对应一种等待原因,都有一个等待队列(在管程内部);另外,条件变量有两种操作——Wait操作与Signal操作;对条件变量的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class Condition{ int numWaiting; WaitQuene q; }
Condition::__init__(){ numWaiting=0; q=NULL; }
Condition::Wait(lock){ numWaiting++; q.add(current); lock.Release(); schedule(); lock.Acquire(); }
Condition::Signal(){ if(numWaiting>0){ t=q.remove(); wakeup(t); numWaiting--; } }
|
用管程解决生产者-消费者问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| class BoundedBuffer{ Lock lock; int count; Condition notFull, notEmpty; char data[n]; }
BoundedBuffer::__init__(){ lock=Lock(); count=0; notFull=Condition(); notEmpty=Condition(); }
BoundedBuffer::Deposit(c){ lock.Acquire(); while(count==n) notFull.Wait(lock); write(data, c); count++; notEmpty.Signal(); lock.Realease(); }
BoundedBuffer::Remove(c){ lock.Acquire(); while(count==0) notEmpty.Wait(lock); move(data, c); count--; notFull.Signal(); lock->Release(); }
|
条件变量释放后的处理
根据条件变量释放后处理的不同,管程分为Hansen管程与Hoare管程;Hansen管程会等到当前线程执行完之后再去执行条件变量唤醒的线程,而Hoare管程会立即切换到条件变量唤醒的线程执行。在实际操作系统中,使用Hansen管程因为其效率更高、且易于实现。若操作系统使用了Hansen管程,等到被唤醒的线程被调度时可能所需条件又不满足,因此需要用while循环再次检查;但如果操作系统使用的是Hoare管程。那么CPU会立即执行被唤醒的线程,因此使用if控制流。
用管程解决读者-写者问题
读者-写者问题约束如下:
- 同一时刻,允许多个读者同时读;
- 读写互斥;
- 写写互斥;
定义两个基本方法——Read与Write,4个条件变量;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| class Database{ int AR; int AW; int WR; int WW; Lock lock; Condition okToRead, okToWrite; char * data; }
Database::Read(){ while (AW+WW); StartRead(); read(data); DoneRead(); }
Database::StartRead(){ lock.Acquire(); while((AW+WW)>0){ WR++; okToRead.wait(lock); WR--; } AR++; lock.Release(); }
Database::DoneRead(){ lock.Acquire(); AR--; if(AR==0 && WW>0) okToWrite.Signal(); lock.Release(); }
Database::Write(){ while (AR+WR); StartWrite(); write(data); DoneWrite(); }
Database::StartWrite(){ lock.Acquire(); while((AW+AR)){ WW++; okToWrite.wait(lock); WW--; } AW++; lock.Release(); }
Database::DoneWrite(){ lock.Acquire(); AW--; if(WW>0) okToWrite.Signal(); else if(WR>0) okToRead.broadcast(); lock.Release(); }
|