ucore-lab7

本文最后更新于:2023年6月22日 上午

练习解答

  • 理解操作系统的同步互斥的设计实现;
  • 理解底层支撑技术:禁用中断、定时器、等待队列;
  • 在ucore中理解信号量(semaphore)机制的具体实现;
  • 理解管程机制,在ucore内核中增加基于管程(monitor)的条件变量(condition variable)的支持;
  • 了解经典进程同步问题,并能使用同步机制解决进程同步问题。

练习0

填写实验,自行填写,懒得找了,可以参考kiprey

练习一

理解内核级信号量的实现和基于内核级信号量的哲学家就餐问题(不需要编码)

完成练习0后,建议大家比较一下(可用meld等文件diff比较软件)个人完成的lab6和练习0完成后的刚修改的lab7之间的区别,分析了解lab7采用信号量的执行过程。执行make grade,大部分测试用例应该通过。

请在实验报告中给出内核级信号量的设计描述,并说明其大致执行流程。

请在实验报告中给出给用户态进程/线程提供信号量机制的设计方案,并比较说明给内核级提供信号量机制的异同。

实际上就是解释ucore的哲学家就餐怎么实现的,内核级别的信号量怎么实现的,之后给出自己关于用户级别的信号量的设计方案,比较两者异同。

关于哲学家就餐问题,不知道为什么,代码里面有注释,中文的。。。

结合注释是不难理解的。

在kern/sync/check_sync.c可以找到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//---------- philosophers problem using semaphore ----------------------
int state_sema[N]; /* 记录每个人状态的数组 */
/* 信号量是一个特殊的整型变量 */
semaphore_t mutex; /* 临界区互斥 */
semaphore_t s[N]; /* 每个哲学家一个信号量 */

struct proc_struct *philosopher_proc_sema[N];

void phi_test_sema(i) /* i:哲学家号码从0到N-1 */
{
if(state_sema[i]==HUNGRY&&state_sema[LEFT]!=EATING
&&state_sema[RIGHT]!=EATING)
{
state_sema[i]=EATING;
up(&s[i]);
}
}

void phi_take_forks_sema(int i) /* i:哲学家号码从0到N-1 */
{
down(&mutex); /* 进入临界区 */
state_sema[i]=HUNGRY; /* 记录下哲学家i饥饿的事实 */
phi_test_sema(i); /* 试图得到两只叉子 */
up(&mutex); /* 离开临界区 */
down(&s[i]); /* 如果得不到叉子就阻塞 */
}

void phi_put_forks_sema(int i) /* i:哲学家号码从0到N-1 */
{
down(&mutex); /* 进入临界区 */
state_sema[i]=THINKING; /* 哲学家进餐结束 */
phi_test_sema(LEFT); /* 看一下左邻居现在是否能进餐 */
phi_test_sema(RIGHT); /* 看一下右邻居现在是否能进餐 */
up(&mutex); /* 离开临界区 */
}

int philosopher_using_semaphore(void * arg) /* i:哲学家号码,从0到N-1 */
{
int i, iter=0;
i=(int)arg;
cprintf("I am No.%d philosopher_sema\n",i);
while(iter++<TIMES)
{ /* 无限循环 */
cprintf("Iter %d, No.%d philosopher_sema is thinking\n",iter,i); /* 哲学家正在思考 */
do_sleep(SLEEP_TIME);
phi_take_forks_sema(i);
/* 需要两只叉子,或者阻塞 */
cprintf("Iter %d, No.%d philosopher_sema is eating\n",iter,i); /* 进餐 */
do_sleep(SLEEP_TIME);
phi_put_forks_sema(i);
/* 把两把叉子同时放回桌子 */
}
cprintf("No.%d philosopher_sema quit\n",i);
return 0;
}
  • 请给出内核级信号量的设计描述,并说明其大致执行流程

内核级别信号量结构体定义位于kern/sync/sem.h中:

1
2
3
4
typedef struct {
int value;
wait_queue_t wait_queue;
} semaphore_t;

观察哲学家就餐代码,不难发现信号量有关函数:down,up,继续查看封装的函数在kern/sync/sem.c里面,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
void
up(semaphore_t *sem) {
__up(sem, WT_KSEM);
}

void
down(semaphore_t *sem) {
uint32_t flags = __down(sem, WT_KSEM);
assert(flags == 0);
}
////////////////////////////////////////////////////////////////
static __noinline void __up(semaphore_t *sem, uint32_t wait_state) {
bool intr_flag;
//根据intr_flag决定是否要enable irq interrupt
local_intr_save(intr_flag);
{
wait_t *wait;
//如果队列中没有wait的线程,value++
if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL) {
sem->value ++;
}
//否则唤醒线程且执行代码
else {
assert(wait->proc->wait_state == wait_state);
wakeup_wait(&(sem->wait_queue), wait, wait_state, 1);
}
}
local_intr_restore(intr_flag);//也是封装的函数
}

static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state) {
bool intr_flag;
local_intr_save(intr_flag);
if (sem->value > 0) {
//如果信号量>0,递减
sem->value --;
local_intr_restore(intr_flag);
return 0;
}
//如果等于0,则准备执行进程,添加至等待队列
wait_t __wait, *wait = &__wait;
wait_current_set(&(sem->wait_queue), wait, wait_state);
local_intr_restore(intr_flag);
//调度算法
schedule();
//执行完就删除
local_intr_save(intr_flag);
wait_current_del(&(sem->wait_queue), wait);
local_intr_restore(intr_flag);

if (wait->wakeup_flags != wait_state) {
return wait->wakeup_flags;
}
return 0;
}
//local_intr_restore,如果传入的flag为true,则enable irq interrupt,继续跟踪intr_enable会发现一段汇编
static inline void
__intr_restore(bool flag) {
if (flag) {
intr_enable();
}
}
  • 请给出给用户态进程/线程提供信号量机制的设计方案,并比较说明给内核级提供信号量机制的异同

首先,用户级别的信号量应该存储在用户空间中,但用户本身能不能操控信号量?对于一个进程的多个线程来讲,似乎可以交由进程进行信号量的管理,但对于多个进程公用的信号量来讲,我认为应该调用内核,由内核进行管理。信号量由使用信号量的代码的更高一级的代码进行管理,应该是比较好的,至少应该抽象出更高的一个层级去管理。但考虑到信号量涉及到的同步问题,完全有内核进行原子性的操作会更好一点。

那么,怎么云实现呢?可以在proc的结构体里面增加信号量的相关代码,用于获取信号量的值,发出增加或减少信号量的请求,再由操作系统实现。详细可以参考kiprey,他参考了linux的实现。

练习二

练习2: 完成内核级条件变量和基于内核级条件变量的哲学家就餐问题(需要编码)

首先掌握管程机制,然后基于信号量实现完成条件变量实现,然后用管程机制实现哲学家就餐问题的解决方案(基于条件变量)。

执行:make grade 。如果所显示的应用程序检测都输出ok,则基本正确。如果只是某程序过不去,比如matrix.c,则可执行

1
make run-matrix

命令来单独调试它。大致执行结果可看附录。

请在实验报告中给出内核级条件变量的设计描述,并说明其大致执行流程。

请在实验报告中给出给用户态进程/线程提供条件变量机制的设计方案,并比较说明给内核级提供条件变量机制的异同。

请在实验报告中回答:能否不用基于信号量机制来完成条件变量?如果不能,请给出理由,如果能,请给出设计说明和具体实现。

在kern/sync/monitor.h中,有相关结构体,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct monitor monitor_t;

typedef struct condvar{
semaphore_t sem; // the sem semaphore is used to down the waiting proc, and the signaling proc should up the waiting proc
int count; // the number of waiters on condvar
monitor_t * owner; // the owner(monitor) of this condvar
} condvar_t;

typedef struct monitor{
semaphore_t mutex; // the mutex lock for going into the routines in monitor, should be initialized to 1
semaphore_t next; // the next semaphore is used to down the signaling proc itself, and the other OR wakeuped waiting proc should wake up the sleeped signaling proc.
int next_count; // the number of of sleeped signaling proc
condvar_t *cv; // the condvars in monitor
} monitor_t;

此处不再赘述,

在kern/sync/monitor.c,有monitor_init的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void     
monitor_init (monitor_t * mtp, size_t num_cv) {
int i;
assert(num_cv>0);
mtp->next_count = 0;
mtp->cv = NULL;

//初始化锁为0,next为1
sem_init(&(mtp->mutex), 1); //unlocked
sem_init(&(mtp->next), 0);
//分配当前管程的条件变量
mtp->cv =(condvar_t *) kmalloc(sizeof(condvar_t)*num_cv);
assert(mtp->cv!=NULL);
//初始化管程
for(i=0; i<num_cv; i++){
mtp->cv[i].count=0;
sem_init(&(mtp->cv[i].sem),0);
mtp->cv[i].owner=mtp;
}
}

我们要实现的是接下来两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Unlock one of threads waiting on the condition variable. 
//看注释知道要做什么
void
cond_signal (condvar_t *cvp) {
//LAB7 EXERCISE1: YOUR CODE
cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
//打印提示信息
//如果管程内等待的线程大于0个
if(cvp->count>0) {
//正在睡眠的进程++
cvp->owner->next_count ++;
//尝试唤醒线程,更新信号量
up(&(cvp->sem));
//正在执行的线程被挂起
down(&(cvp->owner->next));
//睡眠线程就--
cvp->owner->next_count --;
}
//打印提示信息
cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
}

// Suspend calling thread on a condition variable waiting for condition Atomically unlocks
// mutex and suspends calling thread on conditional variable after waking up locks mutex. Notice: mp is mutex semaphore for monitor's procedures
void
cond_wait (condvar_t *cvp) {
//LAB7 EXERCISE1: YOUR CODE
cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
//打印信息
//需要等待的线程++
cvp->count++;
//如果有挂起的线程就线执行该线程
if(cvp->owner->next_count > 0)
up(&(cvp->owner->next));
else
//否则释放锁
up(&(cvp->owner->mutex));
// 尝试获取条件变量
down(&(cvp->sem));
//需要等待的线程--
cvp->count --;
cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
}

哲学家就餐问题基本和信号量的实现相同,此处不再赘述。

关于 能否不用基于信号量机制来完成条件变量?如果不能,请给出理由,如果能,请给出设计说明和具体实现。 ,个人认为,条件变量实质上可以看作信号量的简陋版本。

如果分数低,可以按照网上的办法偷懒改grade.sh,也可以参考我之前写的办法。

结果:

challenge1

扩展练习 Challenge : 在ucore中实现简化的死锁和重入探测机制

在ucore下实现一种探测机制,能够在多进程/线程运行同步互斥问题时,动态判断当前系统是否出现了死锁产生的必要条件,是否产生了多个进程进入临界区的情况。 如果发现,让系统进入monitor状态,打印出你的探测信息。

challengen2

扩展练习 Challenge : 参考Linux的RCU机制,在ucore中实现简化的RCU机制

在ucore 下实现下Linux的RCU同步互斥机制。可阅读相关Linux内核书籍或查询网上资料,可了解RCU的设计实现细节,然后简化实现在ucore中。 要求有实验报告说明你的设计思路,并提供测试用例。下面是一些参考资料:

  • http://www.ibm.com/developerworks/cn/linux/l-rcu/
  • http://www.diybl.com/course/6_system/linux/Linuxjs/20081117/151814.html

仍是源自github

所谓RCU,实际上适用于多个读者,少的写者的情况。他甚至可以不对读者加锁,而只对写者加锁。这样满足了读者可以随时进行读取操作,减少开销,而写者则是正常的加锁策略。由此,需要解决的问题是,我在写共享资源的时候,有一个读者过来读,我怎么保证他读的对?写的进程会copy资源,并且移动资源到新的位置,在写的过程中,读者进程读取原先的位置,此时会报错。rcu通过“销毁不删除”来实现。即写进程写时默认删除原值,读者在写进程执行时读取则可以正常读到原先的值(此时不销毁),写进程结束后销毁原值,更改共享资源地址即可。

具体思路可以参考上述的github.

最终效果如下,由于没有实现相应的哲学家就餐问题,make grade只有183,不过这不重要:

由于只是简化实现,因此并没有对写者加锁的代码。代码解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

typedef struct foo {
int a;
char b;
} foo_t;

foo_t* gbl_foo = NULL;

foo_t* old_foo_ref = NULL;
foo_t* new_foo_ref = NULL;
int grace_period_count = 0;

semaphore_t foo_sem;
//初始化

//信号量
static void rcu_read_lock(foo_t* ref) {
bool intr_flag;
local_intr_save(intr_flag);
{
if (ref == old_foo_ref) {
grace_period_count += 1;
}
}
}
//信号量
static void rcu_read_unlock(foo_t* ref) {
bool intr_flag;
local_intr_save(intr_flag);
{
if (ref == old_foo_ref) {
grace_period_count -= 1;
}
}
}
//重置
static int resync_rcu_trail() {
return (grace_period_count != 0);
}
//读,老进程就增加"宽限区"
static void foo_read(int id) {
cprintf("Foo_read %d starts.\n", id);
rcu_read_lock(gbl_foo);
// 读取旧值的指针
foo_t* fp = gbl_foo;
// If fp == NULL, it means gbl_foo has been deleted (don't care whether it is destroyed)
if (fp != NULL) {
// Sleep for some time.
//这里我不是很清楚为什么要sleep
do_sleep(2);
cprintf("[SAFE] foo_read: gbl_foo.a = %d, gbl_foo.b = %c\n", fp->a, fp->b);
} else {
panic("[DANGER] foo_read: attempt to read foo when foo is null.");
}
rcu_read_unlock(fp);
cprintf("Foo_read %d ends.\n", id);
}
//更新共享资源的位置,写
// Update the gbl_foo to new_fp and free the old_fp.
// However, the free process could happen when Line36 is running.
// Thus, we need to do the update but delay the destroy of old_foo.
// Until all foo_reads exits the critical area.
static void foo_update(int id) {
cprintf("Foo_update %d starts.\n", id);
// foo_sem is a mutex for gbl_foo
down(&(foo_sem));
foo_t* old_fp = gbl_foo;
gbl_foo = new_foo_ref;
up(&(foo_sem));
cprintf("Foo_update waiting for %d graceful period to finish.\n", grace_period_count);
// spin when process left in grace period
while (resync_rcu_trail()) schedule();
kfree(old_fp);
cprintf("Foo_update %d ends.\n", id);
}
//测试rcu
void check_rcu() {
sem_init(&(foo_sem), 1);
old_foo_ref = (foo_t*) kmalloc(sizeof(foo_t));
old_foo_ref->a = 5;
old_foo_ref->b = 'O';
new_foo_ref = (foo_t*) kmalloc(sizeof(foo_t));
new_foo_ref->a = 6;
new_foo_ref->b = 'N';

gbl_foo = old_foo_ref;

int r1k = kernel_thread(foo_read, (void *)1, 0);
int r2k = kernel_thread(foo_read, (void *)2, 0);
int w1k = kernel_thread(foo_update, (void *)1, 0);
int r3k = kernel_thread(foo_read, (void *)3, 0);
int r4k = kernel_thread(foo_read, (void *)4, 0);

do_wait(r1k, NULL);
do_wait(r2k, NULL);
do_wait(w1k, NULL);
do_wait(r3k, NULL);
do_wait(r4k, NULL);

cprintf("check_rcu() passed\n");
}


本文作者: Heeler-Deer
本文链接: https://heeler-deer.top/posts/27963/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!