导航菜单

linux内核级同步机制--futex

原文:Multithreading/89.html

在关于多线程同步的采访中,您必须考虑问题。在本文中,我们知道glibc的底层pthread_cond_timedwait是由linux futex机制实现的。

理想的同步机制应该是在没有锁定冲突时通过在用户状态下使用原子指令来解决问题,并在等待等待时使用内核提供的系统调用来休眠和唤醒。换句话说,当用户状态的旋转失败时,进程是否会挂起,并且当持有锁的线程释放锁时会唤醒?如果您没有更深入地考虑这个问题,可以将其视为理所当然(伪代码):

Void lock(int lockval){

//trylock是用户级自旋锁

而(!的tryLock(lockval)){

Wait(); //释放cpu并将当前线程添加到等待队列,这是系统调用

}

}

布尔trylock(int lockval){

Int i=0;

//localval=1表示锁定成功

而(!compareAndSet(lockval,0,1)){

如果(++我→10){

返回false;

}

}

返回true;

}

Void unlock(int lockval){

compareAndSet(lockval,1,0);

通知();

}

上面代码的问题是trylock和wait调用之间有一个窗口:如果线程trylock失败,持有锁的线程在调用wait时释放锁,当前线程仍调用wait等待,但没有人会跟随。再次唤醒线程。

为了解决上述问题,Linux内核引入了futex机制。 Futex主要包括两种等待和唤醒方法:futex_wait和futex_wake,定义如下:

//uaddr指向一个地址,val表示该地址的预期值,并在* uaddr==val时等待(p)

Int futex_wait(int * uaddr,int val);

//唤醒uaddr指向的锁变量上的n个挂起进程

Int futex_wake(int * uaddr,int n);

在实际挂起进程之前,Futex会检查addr指向的地址的值是否等于val。如果它不相等,它将立即返回,并且trylock从用户状态继续。否则,当前线程将插入队列并挂起。

在同步的思想中 - 在关于futex的背景和基本原理的文章中,不熟悉futex的人可以先看看它。

本文将深入分析futex的实现,让读者直观地了解锁的底层实现,然后结合前两篇文章(稍微考虑同步 - 关于同步的思考 - 下)可以用于操作系统同步机制有一个全面的了解。

下文中的进程一词包括常规进程与线程如何确保暂停进程时其他进程未修改val的值?

代码在kernel/futex.c中

静态int futex_wait(u32 __user * uaddr,int fshared,

U32 val,ktime_t * abs_time,u32 bitset,int clockrt)

{

struct hrtimer_sleeper timeout,* to=NULL;

struct restart_block * restart;

结构futex_hash_bucket * hb;

结构futex_q q;

Int ret;

.

//设置hrtimer计时任务:经过一段时间(abs_time)后,如果进程没有被唤醒,则唤醒等待进程

如果(abs_time){

.

Hrtimer_init_sleeper(to,current);

.

}

重试:

//此函数确定uaddr指向的值是否等于val,以及一些初始化操作

Ret=futex_wait_setup(uaddr,val,fshared,& q,& hb);

//如果val改变,直接返回

如果(ret)

转出;

//将当前进程状态更改为TASK_INTERRUPTIBLE,将其插入到futex等待队列中,然后重新计划。

Futex_wait_queue_me(hb,& q,to);

/*如果我们被唤醒(并且没有排队),我成功了,无论如何。 */

Ret=0;

//如果unqueue_me成功,则表示超时被触发(因为futex_wake唤醒,它会将进程移出等待队列,所以它会在这里失败)

如果(!unqueue_me(& q))

转到out_put_key;

Ret=-ETIMEDOUT;

如果(到&&to->任务)

转到out_put_key;

/*

*我们期望signal_pending(当前),但我们可能是

*也是虚假唤醒的受害者。

*/

如果(!signal_pending(current)){

Put_futex_key(fshared,& q.key);

转到重试;

}

Ret=-ERESTARTSYS;

如果(!abs_time)

转到out_put_key;

.

Out_put_key:

Put_futex_key(fshared,& q.key);

输出:

如果(到){

//取消计划任务

Hrtimer_cancel(安培; TO->定时器);

Destroy_hrtimer_on_stack(安培; TO->定时器);

}

返回ret;

}

在阻止进程之前,当前进程将插入到等待队列中。应该注意的是,这里提到的等待队列实际上是类似Java HashMap的结构,它是全局唯一的。

结构futex_hash_bucket {

Spinlock_t锁;

//双链表

struct plist_head链;

};

静态结构futex_hash_bucket futex_queues [1<

专注于futex_wait_setup和两个函数futex_wait_queue_me

静态int futex_wait_setup(u32 __user * uaddr,u32 val,int fshared,

结构futex_q * q,struct futex_hash_bucket ** hb)

{

U32 uval;

Int ret;

重试:

Q-> key=FUTEX_KEY_INIT;

//初始化futex_q

Ret=get_futex_key(uaddr,fshared,& q-> key,VERIFY_READ);

如果(不太可能(ret!=0))

返回ret;

Retry_private:

//获得自旋锁

* hb=queue_lock(q);

//Atom将uaddr的值设置为uval

Ret=get_futex_value_locked(& uval,uaddr);

.

//如果当前时间段内uaddr指向的值不等于val,则表示其他进程已被修改

该作品不再有效,可以直接退回而不会阻止。

如果(uval!=val){

//释放锁定

Queue_unlock(q,* hb);

Ret=-EWOULDBLOCK;

}

.

返回ret;

}

函数futex_wait_setup主要做两件事,一个是获取自旋锁,另一个是确定* uaddr是否是期望值。

静态void futex_wait_queue_me(struct futex_hash_bucket * hb,struct futex_q * q,

Struct hrtimer_sleeper * timeout)

{

//将进程状态设置为TASK_INTERRUPTIBLE,cpu只会在调度时选择

//使用州TASK_RUNNING进行处理

Set_current_state(TASK_INTERRUPTIBLE);

//将当前进程(q包)插入等待队列,然后释放自旋锁

Queue_me(q,hb);

//启动计划任务

如果(超时){

Hrtimer_start_expires(& timeout-> timer,HRTIMER_MODE_ABS);

如果(!hrtimer_active(& timeout-> timer))

超时 - > task=NULL;

}

/*

*如果我们已从哈希列表中删除,那么另一个任务

*试图唤醒我们,我们可以跳过调用schedule()。

*/

如果(可能是(!plist_node_empty(& q-> list))){

//如果未设置到期时间||到期时间已设置且尚未过期

如果(!timeout || timeout-> task)

//系统重新调度进程,这次cpu会去执行其他进程,这个进程会在这里阻塞

附表();

}

//到这里说明它已被cpu选中

作品之间的原子性和等待

在futex_wait_setup方法中,添加了一个自旋锁;在futex_wait_queue_me中,状态设置为TASK_INTERRUPTIBLE,并调用queue_me将当前线程插入等待队列,然后释放自旋锁。也就是说,检查uaddr值的过程与进程挂起的过程放在同一个关键部分。当释放自旋锁时,此时更改addr地址的值是无关紧要的,因为当前进程已被添加到等待队列中,并且可以通过唤醒唤醒,而没有任何人醒来的问题在本文开头提到了。

Futex_wait摘要

总结futex_wait进程:

添加自旋锁检测* uaddr等于val,如果不相等,则会立即返回将进程状态设置为TASK_INTERRUPTIBLE,将当前进程插入等待队列以释放自旋锁。创建一个定时任务:当它在一段时间内没有被唤醒时,唤醒该过程并暂停当前进程

Futex_wake

静态int futex_wake(u32 __user * uaddr,int fshared,int nr_wake,u32 bitset)

{

结构futex_hash_bucket * hb;

结构futex_q * this,* next;

结构plist_head * head;

Union futex_key key=FUTEX_KEY_INIT;

Int ret;

.

//根据uaddr

的值填写&键的内容

Ret=get_futex_key(uaddr,fshared,& key,VERIFY_READ);

如果(不太可能(ret!=0))

转出;

//根据& key获取uaddr所在的futex_hash_bucket

Hb=hash_futex(& key);

//为hb添加自旋锁

用spin_lock(安培; HB->锁);

Head=& hb-> chain;

//遍历hb链表,注意链表中存储的节点是plist_node类型,这里是futex_q类型,这种类型转换是通过c

中的container_of机制实现的。

Plist_for_each_entry_safe(this,next,head,list){

如果(match_futex(& this-> key,& key)){

.

//唤醒相应的流程

Wake_futex(本);

如果(++ ret>=nr_wake)

打破;

}

}

//释放自旋锁

Spin_unlock(安培; HB->锁);

Put_futex_key(fshared,& key);

输出:

返回ret;

}

futex_wake过程如下:

找到对应uaddr的futex_hash_bucket,也就是说,代码中的hb为hb添加一个自旋锁来遍历fb的链表,找到对应uaddr的节点,调用wake_futex来唤起等待进程释放自旋锁

Wake_futex将进程状态设置为TASK_RUNNING并将其添加到系统调度列表中。同时,该进程将从futex等待队列中删除。特定代码将不会被分析。有兴趣的人可以自己研究。

Java中的ReentrantLock,Object.wait和Thread.sleep等都是与futex的线程同步。了解futex的实现可以帮助您更好地理解和使用这些上层同步机制。此外,由于空间和能量有限,没有与流程调度相关的具体分析,但它并不妨碍对文章内容的理解。

——