Lab7:同步互斥

并发进程的正确性

  • 独立进程
  1. 不和其他进程共享资源或状态
  2. 确定性 -> 输入状态决定结果
  3. 可重现 -> 能够重现起始条件
  4. 调度顺序不重要
  • 并发进程
  1. 在多个进程间有资源共享
  2. 不确定性
  3. 不可重现
  • 并发进程的正确性

  • 执行过程是不确定性和不可重现的

  • 程序错误可能是间歇性发生的

并发的好处

  • 共享资源
  • 加速
  • 模块化

同步问题

时间 A B
3:00 查看冰箱,没有面包
3:05 离开家去商店
3:10 到达商店 查看冰箱,没有面包了
3:15 购买面包 离开家去商店
3:20 到家,把面包放进冰箱 到达商店
3:25 购买面包
3:30 到家,把面包放进冰箱

解决

利用两个原子操作实现一个锁(lock)
Lock.Acquire()
在锁被释放前一直等待,然后获得锁
如果两个线程都在等待同一个锁,并且同时发现锁被释放了,那么只有一个能够获得锁

1
2
3
4
5
breadlock.Acquire();    //进入临界区
if (nobread) {
buy bread; //临界区
}
breadlock.Release(); //退出临界区

进程的交互关系:相互感知程度

相互感知的程度 交互关系 进程间的影响
相互不感知(完全不了解其它进程的存在) 独立 一个进程的操作对其他进程的结果无影响
间接感知(双方都与第三方交互,如共享资源) 通过共享进行协作 一个进程的结果依赖于共享资源的状态
直接感知(双方直接交互,如通信) 通过通信进行协作 一个进程的结果依赖于从其他进程获得的信息

临界区(Critical Section)

  • 进程中访问临界资源的一段需要互斥执行的代码
  • 检查可否进入临界区的一段代码
  • 如可进入,设置相应”正在访问临界区”标志
  • 如可进入,设置相应”正在访问临界区”标志

实现方法

  • 禁用中断

进入临界区,禁止所有中断,并保存标志,离开临界区,使能所有中断,并恢复标志,没有中断,没有上下文切换,因此没有并发

但是禁用中断后,进程无法被停止,整个系统都会为此停下来,可能导致其他进程处于饥饿状态

  • 基于软件的同步解决方法

Peterson算法实现

1
2
3
4
5
6
7
8
9
10
11
12
do {
flag[i] = true;
turn = j;
while ( flag[j] && turn == j);

CRITICAL SECTION

flag[i] = false;

REMAINDER SECTION

} while (true);
  • 更高级的抽象方法
  1. 锁(lock)

锁是一个抽象的数据结构,一个二进制变量(锁定/解锁),Lock::Acquire(),锁被释放前一直等待,然后得到锁,释放锁,唤醒任何等待的进程

  1. 原子操作指令

测试和置位(Test-and-Set )指令,从内存单元中读取值,测试该值是否为1(然后返回真或假),内存单元值设置为1

1
2
3
4
5
6
boolean TestAndSet (boolean *target)‏
{
boolean rv = *target;
*target = true;
return rv:
}

交换指令(exchange)

1
2
3
4
5
6
void Exchange (boolean *a, boolean *b)‏
{
boolean temp = *a;
*a = *b;
*b = temp:
}

使用TS指令实现自旋锁(spinlock)

1
2
3
4
Lock::Acquire() {
while (test-and-set(value))
; //spin
}

信号量

信号量是操作系统提供的一种协调共享资源访问的方法,软件同步是平等线程间的一种同步协商机制,OS是管理者,地位高于进程,用信号量表示系统资源的数量

信号量是一种抽象数据类型

由一个整形 (sem)变量和两个原子操作组成

P() (Prolaag (荷兰语尝试减少))

  • sem减1
  • 如sem<0, 进入等待, 否则继续

V() (Verhoog (荷兰语增加))

  • sem加1
  • 如sem≤0,唤醒一个等待进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
classSemaphore {
int sem;
WaitQueue q;
}

Semaphore::P() {
sem--;
if (sem < 0) {
Add this thread t to q;
block(p);
}
}

Semaphore::V() {
sem++;
if (sem<=0) {
Remove a thread t from q;
wakeup(t);
}
}

用信号量实现临界区的互斥访问

必须成对使用P()操作和V()操作

1
2
3
4
5
mutex = new Semaphore(1);

mutex->P();
Critical Section;
mutex->V();

用信号量实现条件同步

1
2
3
4
5
6
7
condition = new Semaphore(0);

A:
condition->P();

B:
condition->V();

生产者-消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Class BoundedBuffer {
mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
}

BoundedBuffer::Deposit(c) {
emptyBuffers->P();
mutex->P();
Add c to the buffer;
mutex->V();
fullBuffers->V();
}

BoundedBuffer::Remove(c) {
fullBuffers->P();
mutex->P();
Remove c from buffer;
mutex->V();
emptyBuffers->V();
}

管程(Moniter)

管程是一种用于多线程互斥访问共享资源的程序结构,采用面向对象方法,简化了线程间的同步控制,任一时刻最多只有一个线程执行管程代码,正在管程中的线程可临时放弃管程的互斥访问,等待事件出现时恢复

组成

  • 一个锁

    控制管程代码的互斥访问

  • 0或者多个条件变量

    管理共享数据的并发访问

条件变量

  • 条件变量是管程内的等待机制

    进入管程的线程因资源被占用而进入等待状态,每个条件变量表示一种等待原因,对应一个等待队列

  • Wait()操作

    将自己阻塞在等待队列中,唤醒一个等待者或释放管程的互斥访问

  • Signal()操作

    将等待队列中的一个线程唤醒,如果等待队列为空,则等同空操作

生产者和消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
classBoundedBuffer {

Lock lock;
int count = 0;
Condition notFull, notEmpty;
}

BoundedBuffer::Deposit(c) {
lock->Acquire();
while (count == n)
notFull.Wait(&lock);
Add c to the buffer;
count++;
notEmpty.Signal();
lock->Release();
}

BoundedBuffer::Remove(c) {
lock->Acquire();
while (count == 0)
notEmpty.Wait(&lock);
Remove c from buffer;
count--;
notFull.Signal();
lock->Release();
}

管程实现哲学家就餐问题

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct monitor{
semaphore_t mutex; // 二值信号量,只允许一个进程进入管程,初始化为1
semaphore_t next; //配合cv,用于进程同步操作的信号量
int next_count; // 睡眠的进程数量
condvar_t *cv; // 条件变量cv
} monitor_t;

typedef struct condvar{
semaphore_t sem; //用于发出wait_cv操作的等待某个条件C为真的进程睡眠
int count; // 在这个条件变量上的睡眠进程的个数
monitor_t * owner; // 此条件变量的宿主管程
} condvar_t;

管程中的条件变量cv通过执行wait_cv,会使得等待某个条件C为真的进程能够离开管程并睡眠,且让其他进程进入管程继续执行;而进入管程的某进程设置条件C为真并执行signal_cv时,能够让等待某个条件C为真的睡眠进程被唤醒,从而继续进入管程中执行。发出signal_cv的进程A会唤醒睡眠进程B,进程B执行会导致进程A睡眠,直到进程B离开管程,进程A才能继续执行,这个同步过程是通过信号量next完成的;而next_count表示了由于发出singal_cv而睡眠的进程个数。

1
2
3
4
5
6
7
8
9
10
11
void
cond_signal (condvar_t *cvp) {
cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
if(cvp->count>0) { //当前存在睡眠的进程
cvp->owner->next_count ++;//睡眠的进程总数加一
up(&(cvp->sem));//唤醒等待在cv.sem上睡眠的进程
down(&(cvp->owner->next));//把自己睡眠
cvp->owner->next_count --;//睡醒后等待此条件的睡眠进程个数减一
}
cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
}

将指定条件变量上等待队列中的一个线程进行唤醒,并且将控制权转交给这个进程

1
2
3
4
5
6
7
8
9
10
11
12
13
void
cond_wait (condvar_t *cvp) {
//LAB7 EXERCISE1: YOUR CODE
cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
cvp->count++;//需要睡眠的进程个数加一
if(cvp->owner->next_count > 0)
up(&(cvp->owner->next));//唤醒进程链表中的下一个进程
else
up(&(cvp->owner->mutex));//否则唤醒睡在monitor.mutex上的进程
down(&(cvp->sem));//将自己睡眠
cvp->count --;//睡醒后等待此条件的睡眠进程个数减一
cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
}

函数的功能为将当前进程等待在指定信号量上,其操作过程为将等待队列的计数加1,然后释放管程的锁或者唤醒一个next上的进程来释放锁,然后把自己等在条件变量的等待队列上,直到有signal信号将其唤醒,正常退出函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void phi_take_forks_condvar(int i) {
down(&(mtp->mutex)); //通过P操作进入临界区
state_condvar[i]=HUNGRY; //记录下哲学家i是否饥饿,即处于等待状态拿叉子
phi_test_condvar(i);
while (state_condvar[i] != EATING) {
cprintf("phi_take_forks_condvar: %d didn't get fork and will wait\n",i);
cond_wait(&mtp->cv[i]);//如果得不到叉子就睡眠
}
//如果存在睡眠的进程则那么将之唤醒
if(mtp->next_count>0)
up(&(mtp->next));
else
up(&(mtp->mutex));
}

void phi_put_forks_condvar(int i) {
down(&(mtp->mutex));//通过P操作进入临界区

state_condvar[i]=THINKING;//记录进餐结束的状态
phi_test_condvar(LEFT);//看一下左边哲学家现在是否能进餐
phi_test_condvar(RIGHT);//看一下右边哲学家现在是否能进餐
//如果有哲学家睡眠就予以唤醒
if(mtp->next_count>0)
up(&(mtp->next));
else
up(&(mtp->mutex));
}

最后的实现