文章目录
  1. 信号量
    1. 信号量的使用
    2. 用信号量解决生产者-消费者问题
  2. 管程
    1. 用管程解决生产者-消费者问题
    2. 条件变量释放后的处理
    3. 用管程解决读者-写者问题

信号量

信号量是操作系统提供的一种协调共享资源访问的方法。信号量是一种抽象的数据类型,由一个整型变量与两个原子操作(P操作——申请资源,V操作——释放资源)组成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Semaphore{
int sem;
WaitQuene q;//等待队列q
}

Semaphore::__init__(flag){
sem=flag;
q=NULL;
}

Semaphore::P(){
sem--;
if(sem<0){
q.add(current);//当前进程
block(current);
}
}

Semaphore::V(){
sem++;
if(sem<=0){
next=q.remove();
wakeup(next);
}
}

P、V操作由操作系统实现,执行过程中不会被打断,从而保证其原子性。借助P、V操作的原子性,可以更简单地实现同步互斥。下面介绍信号量的使用。

信号量的使用

实现互斥的伪代码如下:

1
mutex= Semaphore(1);

对于需要互斥访问的临界区代码:

1
2
3
mutex->P(t);
//Critical Section
mutex->V();

注意:必须按照先P操作后V操作的顺序使用,且P操作与V操作必须成对使用。

实现条件同步的伪代码如下:

1
condition=Semaphore(0);

例如:必须等线程B执行完X,线程A才能执行N;这样使用信号量:

对线程A:在N之前插入P操作,对线程B:在X后插入V操作,即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//线程A
----------------
| M |
----------------
condition->P();
----------------
| N |
----------------
//线程A

//线程B
----------------
| X |
----------------
condition->V();
----------------
| Y |
----------------
//线程B

用信号量解决生产者-消费者问题

问题分析:

  • 任何时刻只能有一个线程操作缓冲区(互斥访问);
  • 缓冲区空则消费者必须等待生产者(条件同步);
  • 缓冲区满则生产者必须等待消费者(条件同步);

每个约束条件对应一个信号量:(二进制)信号量mutex、(资源)信号量fullBuffers与(资源)信号量emptyBuffers。将有界缓冲区定义为如下类,Deposit方法向有界缓冲区写入一个字节,Remove方法从有界缓冲区中移出一个字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class BoundedBuffer{
Semaphore mutex;
Semaphore fullBuffers;
Semaphore emptyBuffers;
char data[n];//n为缓冲区大小
}

BounderBuffer::__init__(){
mutex=Semaphore(1);
fullBuffers=Semaphore(0);
emptyBuffers=Semaphore(n);
}

BounderBuffer::Deposit(c){
emptyBuffers->P();
mutex->P();
write(data, c);
mutex->V();
fullBuffers->V();
}

BounderBuffer::Remove(){
fullBuffers->P();
mutex->P();
move(data, c);
mutex->V();
emptyBuffers->V();
}

从上面的运用中可以看出,使用信号量来解决同步互斥问题时要求P、V操作必须成对出现,编写容易出错。一旦没有考虑周全,就会导致死锁。

管程

管程是一种用于多线程互斥访问共享资源的程序结构。它采用面向对象的编程方法,将对共享资源的P、V操作成对地写入对共享资源的操作。与临界区不同的是,正在管程中的线程可以放弃对管程的互斥访问,等待事件出现时恢复(与临界区的不同点:线程在临界区中执行时,只能在临界区执行完后退出临界区)。

利用管程来定义访问数据的方法,从而不用程序编写者考虑访问时的P、V操作。管程包括锁与条件变量(可选):

  • 锁用于控制管程代码的互斥访问,因此在管程外会有一个等待队列;
  • 每个条件变量对应一种等待原因,都有一个等待队列(在管程内部);另外,条件变量有两种操作——Wait操作与Signal操作;对条件变量的定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Condition{
int numWaiting;
WaitQuene q;
}

Condition::__init__(){
numWaiting=0;
q=NULL;
}

Condition::Wait(lock){
numWaiting++;
//自身阻塞在条件变量的等待队列中
q.add(current);
//释放管程的互斥访问
lock.Release();
//去唤醒其他线程
schedule();
//条件满足后重新获取锁
lock.Acquire();
}

Condition::Signal(){
if(numWaiting>0){
t=q.remove();//从q中选择一个进程将其唤醒
wakeup(t);
numWaiting--;
}
}

用管程解决生产者-消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class BoundedBuffer{
Lock lock;
int count;
Condition notFull, notEmpty;
char data[n];//n为缓冲区大小
}

BoundedBuffer::__init__(){
lock=Lock();
count=0;
notFull=Condition();
notEmpty=Condition();
}

BoundedBuffer::Deposit(c){
lock.Acquire();
//检查是否还有空闲区,如果缓存写满了就等待,直到notFull的条件满足。
while(count==n)
notFull.Wait(lock);
write(data, c);
count++;
notEmpty.Signal();
lock.Realease();
}

BoundedBuffer::Remove(c){
lock.Acquire();
//检查缓冲区是否有数据,如果缓冲区为空则等待,直到notEmpty的条件满足。
while(count==0)
notEmpty.Wait(lock);
move(data, c);
count--;
notFull.Signal();
lock->Release();
}

条件变量释放后的处理

根据条件变量释放后处理的不同,管程分为Hansen管程与Hoare管程;Hansen管程会等到当前线程执行完之后再去执行条件变量唤醒的线程,而Hoare管程会立即切换到条件变量唤醒的线程执行。在实际操作系统中,使用Hansen管程因为其效率更高、且易于实现。若操作系统使用了Hansen管程,等到被唤醒的线程被调度时可能所需条件又不满足,因此需要用while循环再次检查;但如果操作系统使用的是Hoare管程。那么CPU会立即执行被唤醒的线程,因此使用if控制流。

用管程解决读者-写者问题

读者-写者问题约束如下:

  • 同一时刻,允许多个读者同时读;
  • 读写互斥;
  • 写写互斥;

定义两个基本方法——Read与Write,4个条件变量;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class Database{
int AR;//active reader
int AW;//active writer
int WR;//waiting reader
int WW;//waiting writer
Lock lock;
Condition okToRead, okToWrite;
char * data;
}

Database::Read(){
while (AW+WW);//wait until no writers.
StartRead();
read(data);
//check out: wake up waiting writers.
DoneRead();
}

Database::StartRead(){
lock.Acquire();
while((AW+WW)>0){
WR++;
okToRead.wait(lock);
WR--;
}
AR++;
lock.Release();
}

Database::DoneRead(){
lock.Acquire();
AR--;
if(AR==0 && WW>0)
okToWrite.Signal();
lock.Release();
}

Database::Write(){
while (AR+WR);//wait until no reader.
StartWrite();
write(data);
//check out: wake up wating reader.
DoneWrite();
}

Database::StartWrite(){
lock.Acquire();
while((AW+AR)){
WW++;
okToWrite.wait(lock);
WW--;
}
AW++;
lock.Release();
}

Database::DoneWrite(){
lock.Acquire();
AW--;
if(WW>0)
okToWrite.Signal();
else if(WR>0)
okToRead.broadcast();//使所有在该条件上等待的进程都被释放并进入队列(读读不互斥)
lock.Release();
}