线程源码分析之条件变量(基于linuxthreads2.0.1)
条件变量是线程间同步的一种机制,本文分析条件变量的实现和使用。我们先看一下条件变量的定义。
typedef
struct
{
int
c_spinlock
;
/* Spin lock to protect the queue. */
struct _pthread_queue c_waiting; /* Threads waiting on this condition. */
} pthread_cond_t;
我们看到条件变量的定义很简单,条件变量通常配合互斥变量一起使用,大致流程如下
加锁
if (条件不满足) {
阻塞在条件变量
}
操作加锁的资源
解锁
其实机制也很简单,条件变量就是在条件不满足的时候,把线程插入等待队列,等待条件满足的时候再唤醒队列里的线程。我们看一下具体实现。
// 阻塞等待条件。进入该函数前,已经获得了互斥锁mutex
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex){
volatile
pthread_t self
=
thread_self
(
)
;
// 加锁操作队列
acquire
(
&
cond
->
c_spinlock
)
;
// 插入条件的等待队列
enqueue
(
&
cond
->
c_waiting
,
self
)
;
// 操作完释放锁
release
(
&
cond
->
c_spinlock
)
;
// 释放互斥变量,否则别人无法操作资源,导致条件一直无法满足
pthread_mutex_unlock
(
mutex
)
;
// 挂起等待条件满足后被唤醒
suspend_with_cancellation
(
self
)
;
// 被唤醒后重新获取互斥锁
pthread_mutex_lock
(
mutex
)
;
/* This is a cancellation point */
// 取消点,等待期间被取消了
if
(
self
->
p_canceled
&&
self
->
p_cancelstate
==
PTHREAD_CANCEL_ENABLE
)
{
/* Remove ourselves from the waiting queue if we're still on it */
acquire
(
&
cond
->
c_spinlock
)
;
// 线程准备退出,从条件阻塞队列中移除
remove_from_queue
(
&
cond
->
c_waiting
,
self
)
;
release
(
&
cond
->
c_spinlock
)
;
pthread_exit
(
PTHREAD_CANCELED
)
;
}
return 0;
}
pthread_cond_wait函数是当条件不能满足时,线程调用的函数。调用完后线程会被挂起,等待被唤醒(如果不希望一直被阻塞可以调用pthread_cond_timedwait,pthread_cond_timedwait支持定时阻塞)。看一下挂起线程的逻辑。
static
inline
void
suspend_with_cancellation
(
pthread_t self
)
{
sigset_t mask
;
sigjmp_buf jmpbuf
;
// 获取当前的信号屏蔽码
sigprocmask
(
SIG_SETMASK
,
NULL
,
&
mask
)
;
/* Get current signal mask */
// 清除PTHREAD_SIG_RESTART的信号掩码,即允许处理该信号
sigdelset
(
&
mask
,
PTHREAD_SIG_RESTART
)
;
/* Unblock the restart signal */
/* No need to save the signal mask, we'll restore it ourselves */
/*
直接调用返回0,从siglongjump回来返回非0,这里支持线程挂起时,
收到restart信号被唤醒,或者在取消信号的处理函数中,通过siglongjmp返回这里
*/
if
(
sigsetjmp
(
jmpbuf
,
0
)
==
0
)
{
self
->
p_cancel_jmp
=
&
jmpbuf
;
// 已经被取消并且是可取消的则直接返回,否则挂起等待唤醒
if
(
!
(
self
->
p_canceled
&&
self
->
p_cancelstate
==
PTHREAD_CANCEL_ENABLE
)
)
{
do
{
// 挂起等待restart信号
sigsuspend
(
&
mask
)
;
/* Wait for a signal */
}
while
(
self
->
p_signal
!=
PTHREAD_SIG_RESTART
)
;
}
self
->
p_cancel_jmp
=
NULL
;
}
else
{
// 从cancel信号的处理函数中的siglongjmp返回,重新设置信号掩码,屏蔽restart信号
sigaddset
(
&
mask
,
PTHREAD_SIG_RESTART
)
;
/* Reblock the restart signal */
sigprocmask
(
SIG_SETMASK
,
&
mask
,
NULL
)
;
}
}
我们看到最终通过调用sigsuspend挂起线程。等待信号的唤醒,从while循环的条件我们可以看到,当收到PTHREAD_SIG_RESTART信号的时候线程才会真正被“唤醒”。接着我们看看当条件满足后,其他线程是如何唤醒被阻塞的线程的。
// 条件满足,唤醒线程
int pthread_cond_signal(pthread_cond_t *cond){
pthread_t th
;
acquire
(
&
cond
->
c_spinlock
)
;
// 取出一个被被阻塞的线程
th
=
dequeue
(
&
cond
->
c_waiting
)
;
release
(
&
cond
->
c_spinlock
)
;
// 发送信号唤醒他
if
(
th
!=
NULL
)
restart
(
th
)
;
return 0;
}
// 给pid进程发送唤醒信号static inline void restart(pthread_t th){
kill(th->p_pid, PTHREAD_SIG_RESTART);
}
我们看到pthread_cond_signal的函数非常简单,从阻塞队列中获取一个线程,然后给他发一个唤醒信号。另外线程库也支持唤醒所有线程。
// 条件满足,唤醒所有线程
int pthread_cond_broadcast(pthread_cond_t *cond){
pthread_queue tosignal
;
pthread_t th
;
acquire
(
&
cond
->
c_spinlock
)
;
/* Copy the current state of the waiting queue and empty it */
tosignal
=
cond
->
c_waiting
;
// 重置阻塞队列
queue_init
(
&
cond
->
c_waiting
)
;
release
(
&
cond
->
c_spinlock
)
;
/* Now signal each process in the queue */
// 发送信号唤醒所有线程
while
(
(
th
=
dequeue
(
&
tosignal
)
)
!=
NULL
)
restart
(
th
)
;
return 0;
}
pthread_cond_broadcast就是给每一个等待的线程发送唤醒信号。这就是线程条件变量的原理和实现。最后我们看一下使用例子。
struct
prodcons
{
int
buffer
[
BUFFER_SIZE
]
;
/* 环形数据缓冲区 */
pthread_mutex_t lock
;
/* 访问数据区的互斥锁 */
int
readpos
,
writepos
;
/* 读写指针 */
pthread_cond_t notempty
;
/* 消费者使用的条件变量,非空即有数据消费 */
pthread_cond_t notfull; /* 生产者使用的条件变量,非满即可以生产数据 */
};
struct prodcons buffer;
void init(struct prodcons * b){
pthread_mutex_init
(
&
b
->
lock
,
NULL
)
;
pthread_cond_init
(
&
b
->
notempty
,
NULL
)
;
pthread_cond_init
(
&
b
->
notfull
,
NULL
)
;
b
->
readpos
=
0
;
b->writepos = 0;
}
int main(){
pthread_t th_a
,
th_b
;
void
*
retval
;
// 初始化线程间共享的数据结构
init
(
&
buffer
)
;
// 创建两个线程
pthread_create
(
&
th_a
,
NULL
,
producer
,
0
)
;
pthread_create
(
&
th_b
,
NULL
,
consumer
,
0
)
;
pthread_join
(
th_a
,
&
retval
)
;
pthread_join
(
th_b
,
&
retval
)
;
return 0;
}
我们分别看看生产者和消费者的逻辑
void
*
producer
(
void
*
data
)
{
int
n
;
for
(
n
=
0
;
n
<
10000
;
n
++
)
{
printf
(
"%d --->\n"
,
n
)
;
put
(
&
buffer
,
n
)
;
}
put
(
&
buffer
,
OVER
)
;
return NULL;
}
void * consumer(void * data){
int
d
;
while
(
1
)
{
d
=
get
(
&
buffer
)
;
if
(
d
==
OVER
)
break
;
printf
(
"---> %d\n"
,
d
)
;
}
return NULL;}
我们看到生产者和消费者的逻辑很简单,就是一个往buffer里写数据,一个从buffer里读数据。问题在于如果没有空间可写或者没有数据可读怎么办?我们看get和put函数的逻辑。
void
put
(
struct
prodcons
*
b
,
int
data
)
{
// 操作共享数据需要加锁
pthread_mutex_lock
(
&
b
->
lock
)
;
/* 写指针+1等于读指针,说明没有空闲可写了,等待空闲空间 */
while
(
(
b
->
writepos
+
1
)
%
BUFFER_SIZE
==
b
->
readpos
)
{
pthread_cond_wait
(
&
b
->
notfull
,
&
b
->
lock
)
;
}
// pthread_cond_wait中被唤醒后会重新获得互斥锁,所以这里直接操作就行
b
->
buffer
[
b
->
writepos
]
=
data
;
b
->
writepos
++
;
// 到尾巴了,修正位置
if
(
b
->
writepos
>=
BUFFER_SIZE
)
b
->
writepos
=
0
;
/* 有数据可消费了,通知等待的消费者 */
pthread_cond_signal
(
&
b
->
notempty
)
;
pthread_mutex_unlock(&b->lock);
}
接着看看get的实现。
int
get
(
struct
prodcons
*
b
)
{
int
data
;
pthread_mutex_lock
(
&
b
->
lock
)
;
/* 读写指针相等说明没有数据读了,等待数据 */
while
(
b
->
writepos
==
b
->
readpos
)
{
pthread_cond_wait
(
&
b
->
notempty
,
&
b
->
lock
)
;
}
data
=
b
->
buffer
[
b
->
readpos
]
;
b
->
readpos
++
;
if
(
b
->
readpos
>=
BUFFER_SIZE
)
b
->
readpos
=
0
;
/* 消费了数据,说明有空闲空间了,唤醒生产者 */
pthread_cond_signal
(
&
b
->
notfull
)
;
pthread_mutex_unlock
(
&
b
->
lock
)
;
return data;
}
以上就是线程间同步机制:条件变量的实现和原理。
