vlambda博客
学习文章列表

线程源码分析之条件变量(基于linuxthreads2.0.1)

条件变量是线程间同步的一种机制,本文分析条件变量的实现和使用。我们先看一下条件变量的定义。

  
    
    
  
typedef struct {
int c_spinlock ; /* Spin lock to protect the queue. */

struct _pthread_queue c_waiting; /* Threads waiting on this condition. */

} pthread_cond_t;

我们看到条件变量的定义很简单,条件变量通常配合互斥变量一起使用,大致流程如下

  
    
    
  
加锁
if (条件不满足) {
阻塞在条件变量
}
操作加锁的资源
解锁

其实机制也很简单,条件变量就是在条件不满足的时候,把线程插入等待队列,等待条件满足的时候再唤醒队列里的线程。我们看一下具体实现。

  
    
    
  

// 阻塞等待条件。进入该函数前,已经获得了互斥锁mutex

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex){

volatile pthread_t self = thread_self ( ) ;
// 加锁操作队列
acquire ( & cond -> c_spinlock ) ;
// 插入条件的等待队列
enqueue ( & cond -> c_waiting , self ) ;
// 操作完释放锁
release ( & cond -> c_spinlock ) ;
// 释放互斥变量,否则别人无法操作资源,导致条件一直无法满足
pthread_mutex_unlock ( mutex ) ;
// 挂起等待条件满足后被唤醒
suspend_with_cancellation ( self ) ;
// 被唤醒后重新获取互斥锁
pthread_mutex_lock ( mutex ) ;
/* This is a cancellation point */
// 取消点,等待期间被取消了
if ( self -> p_canceled && self -> p_cancelstate == PTHREAD_CANCEL_ENABLE ) {
/* Remove ourselves from the waiting queue if we're still on it */
acquire ( & cond -> c_spinlock ) ;
// 线程准备退出,从条件阻塞队列中移除
remove_from_queue ( & cond -> c_waiting , self ) ;
release ( & cond -> c_spinlock ) ;
pthread_exit ( PTHREAD_CANCELED ) ;
}

return 0;

}

pthread_cond_wait函数是当条件不能满足时,线程调用的函数。调用完后线程会被挂起,等待被唤醒(如果不希望一直被阻塞可以调用pthread_cond_timedwait,pthread_cond_timedwait支持定时阻塞)。看一下挂起线程的逻辑。

  
    
    
  
static inline void suspend_with_cancellation ( pthread_t self ) {
sigset_t mask
;
sigjmp_buf jmpbuf
;
// 获取当前的信号屏蔽码
sigprocmask ( SIG_SETMASK , NULL , & mask ) ; /* Get current signal mask */
// 清除PTHREAD_SIG_RESTART的信号掩码,即允许处理该信号
sigdelset ( & mask , PTHREAD_SIG_RESTART ) ; /* Unblock the restart signal */
/* No need to save the signal mask, we'll restore it ourselves */
/*
直接调用返回0,从siglongjump回来返回非0,这里支持线程挂起时,
收到restart信号被唤醒,或者在取消信号的处理函数中,通过siglongjmp返回这里
*/

if ( sigsetjmp ( jmpbuf , 0 ) == 0 ) {
self
-> p_cancel_jmp = & jmpbuf ;
// 已经被取消并且是可取消的则直接返回,否则挂起等待唤醒
if ( ! ( self -> p_canceled && self -> p_cancelstate == PTHREAD_CANCEL_ENABLE ) ) {
do {
// 挂起等待restart信号
sigsuspend ( & mask ) ; /* Wait for a signal */
} while ( self -> p_signal != PTHREAD_SIG_RESTART ) ;
}
self
-> p_cancel_jmp = NULL ;
} else {
// 从cancel信号的处理函数中的siglongjmp返回,重新设置信号掩码,屏蔽restart信号
sigaddset ( & mask , PTHREAD_SIG_RESTART ) ; /* Reblock the restart signal */
sigprocmask ( SIG_SETMASK , & mask , NULL ) ;

}

}

我们看到最终通过调用sigsuspend挂起线程。等待信号的唤醒,从while循环的条件我们可以看到,当收到PTHREAD_SIG_RESTART信号的时候线程才会真正被“唤醒”。接着我们看看当条件满足后,其他线程是如何唤醒被阻塞的线程的。

  
    
    
  

// 条件满足,唤醒线程

int pthread_cond_signal(pthread_cond_t *cond){

pthread_t th ;

acquire ( & cond -> c_spinlock ) ;
// 取出一个被被阻塞的线程
th
= dequeue ( & cond -> c_waiting ) ;
release ( & cond -> c_spinlock ) ;
// 发送信号唤醒他
if ( th != NULL ) restart ( th ) ;

return 0;

}


// 给pid进程发送唤醒信号static inline void restart(pthread_t th){

kill(th->p_pid, PTHREAD_SIG_RESTART);

}

我们看到pthread_cond_signal的函数非常简单,从阻塞队列中获取一个线程,然后给他发一个唤醒信号。另外线程库也支持唤醒所有线程。

  
    
    
  

// 条件满足,唤醒所有线程

int pthread_cond_broadcast(pthread_cond_t *cond){

pthread_queue tosignal ;
pthread_t th
;

acquire ( & cond -> c_spinlock ) ;
/* Copy the current state of the waiting queue and empty it */
tosignal
= cond -> c_waiting ;
// 重置阻塞队列
queue_init ( & cond -> c_waiting ) ;
release ( & cond -> c_spinlock ) ;
/* Now signal each process in the queue */
// 发送信号唤醒所有线程
while ( ( th = dequeue ( & tosignal ) ) != NULL ) restart ( th ) ;

return 0;

}

pthread_cond_broadcast就是给每一个等待的线程发送唤醒信号。这就是线程条件变量的原理和实现。最后我们看一下使用例子。

  
    
    
  
struct prodcons {
int buffer [ BUFFER_SIZE ] ; /* 环形数据缓冲区 */
pthread_mutex_t lock
; /* 访问数据区的互斥锁 */
int readpos , writepos ; /* 读写指针 */
pthread_cond_t notempty
; /* 消费者使用的条件变量,非空即有数据消费 */

pthread_cond_t notfull; /* 生产者使用的条件变量,非满即可以生产数据 */

};


struct prodcons buffer;


void init(struct prodcons * b){

pthread_mutex_init ( & b -> lock , NULL ) ;
pthread_cond_init ( & b -> notempty , NULL ) ;
pthread_cond_init ( & b -> notfull , NULL ) ;
b
-> readpos = 0 ;

b->writepos = 0;

}


int main(){

pthread_t th_a , th_b ;
void * retval ;
// 初始化线程间共享的数据结构
init ( & buffer ) ;
// 创建两个线程
pthread_create ( & th_a , NULL , producer , 0 ) ;
pthread_create ( & th_b , NULL , consumer , 0 ) ;
pthread_join ( th_a , & retval ) ;
pthread_join ( th_b , & retval ) ;

return 0;

}

我们分别看看生产者和消费者的逻辑

  
    
    
  
void * producer ( void * data ) {
int n ;
for ( n = 0 ; n < 10000 ; n ++ ) {
printf ( "%d --->\n" , n ) ;
put ( & buffer , n ) ;
}
put ( & buffer , OVER ) ;

return NULL;

}


void * consumer(void * data){

int d ;
while ( 1 ) {
d
= get ( & buffer ) ;
if ( d == OVER ) break ;
printf ( "---> %d\n" , d ) ;
}

return NULL;}


我们看到生产者和消费者的逻辑很简单,就是一个往buffer里写数据,一个从buffer里读数据。问题在于如果没有空间可写或者没有数据可读怎么办?我们看get和put函数的逻辑。

  
    
    
  
void put ( struct prodcons * b , int data ) {
// 操作共享数据需要加锁
pthread_mutex_lock ( & b -> lock ) ;
/* 写指针+1等于读指针,说明没有空闲可写了,等待空闲空间 */
while ( ( b -> writepos + 1 ) % BUFFER_SIZE == b -> readpos ) {
pthread_cond_wait ( & b -> notfull , & b -> lock ) ;
}
// pthread_cond_wait中被唤醒后会重新获得互斥锁,所以这里直接操作就行
b
-> buffer [ b -> writepos ] = data ;
b
-> writepos ++ ;
// 到尾巴了,修正位置
if ( b -> writepos >= BUFFER_SIZE ) b -> writepos = 0 ;
/* 有数据可消费了,通知等待的消费者 */
pthread_cond_signal ( & b -> notempty ) ;

pthread_mutex_unlock(&b->lock);

}

接着看看get的实现。

  
    
    
  
int get ( struct prodcons * b ) {
int data ;
pthread_mutex_lock ( & b -> lock ) ;
/* 读写指针相等说明没有数据读了,等待数据 */
while ( b -> writepos == b -> readpos ) {
pthread_cond_wait ( & b -> notempty , & b -> lock ) ;
}
data
= b -> buffer [ b -> readpos ] ;
b
-> readpos ++ ;
if ( b -> readpos >= BUFFER_SIZE ) b -> readpos = 0 ;
/* 消费了数据,说明有空闲空间了,唤醒生产者 */
pthread_cond_signal ( & b -> notfull ) ;
pthread_mutex_unlock ( & b -> lock ) ;

return data;

}

以上就是线程间同步机制:条件变量的实现和原理。