C++多线程同步的几种方式
Overview
C++的多线程同步方式有这么几种:
mutex
lock_guard
unique_lock
condition_variable
future
promise
packaged_task
async
C++11并没有提供semaphore
的API,信号量太容易出错了(too error prone),通过组合互斥锁(mutex)和条件变量(condition variable)可以达到相同的效果,且更加安全。
mutex
官网介绍:mutex
它包含以下三个部分:
Mutex type
Locks:
lock_guard
,unique_lock
Functions:
try_lock
,lock
example:
mutex的实现也很简单,在进入临界区之前调用该变量(mtx
)的lock
函数,出临界区之前调用该变量的(mtx
)的unlock
函数。所以程序会连续输出50个'*'
或者连续输出50个'$'
,而不会'*'
'$'
交替输出。
但是考虑这样一个问题,std::cout << '\n'
,这里发生异常会发生什么?抛出异常后,意味着mtx.unlock()
不会被执行,即锁没有被释放,整个程序进入不了临界区,该程序往往会挂死。
// mutex example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
std::mutex mtx; // mutex for critical section
void print_block (int n, char c) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
mtx.unlock();
}
int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');
th1.join();
th2.join();
return 0;
}
Possible output (order of lines may vary, but characters are never mixed):
**************************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
lock_guard
在构造时,互斥对象被调用线程锁定,而在销毁时,互斥对象被解锁。它是最简单的锁,作为具有自动持续时间的对象特别有用,该持续时间一直持续到其上下文结束。这样,可以保证在抛出异常的情况下互斥对象已正确解锁。
example
// lock_guard example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock_guard
#include <stdexcept> // std::logic_error
std::mutex mtx;
void print_even (int x) {
if (x%2==0) std::cout << x << " is even\n";
else throw (std::logic_error("not even"));
}
void print_thread_id (int id) {
try {
// using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
std::lock_guard<std::mutex> lck (mtx);
print_even(id);
}
catch (std::logic_error&) {
std::cout << "[exception caught]\n";
}
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);
for (auto& th : threads) th.join();
return 0;
}
possible output
[exception caught]
2 is even
[exception caught]
4 is even
[exception caught]
6 is even
[exception caught]
8 is even
[exception caught]
10 is even
unique_lock
unique_lock基本用法和lock_guard一致,在构造函数和析构函数中进行锁操作,不同的地方在于它提供了非常多构造函数。
example
// unique_lock example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
std::mutex mtx; // mutex for critical section
void print_block (int n, char c) {
// critical section (exclusive access to std::cout signaled by lifetime of lck):
std::unique_lock<std::mutex> lck (mtx);
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
}
int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');
th1.join();
th2.join();
return 0;
}
condition_variable
条件变量是一个对象,可以阻塞线程,直到被通知恢复。当调用其等待功能之一时,它使用unique_lock
(通过互斥锁)来锁定线程。该线程将保持阻塞状态,直到被另一个在同一个condition_variable
对象上调用通知功能的线程唤醒为止。
Wait functions
wait
Wait until notified (public member function )
wait_for
Wait for timeout or until notified (public member function )
wait_until
Wait until notified or time point (public member function )
Notify functions
notify_one
Notify one (public member function )
notify_all
Notify all (public member function )
example
// condition_variable::notify_one
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable produce,consume;
int cargo = 0; // shared value by producers and consumers
void consumer () {
std::unique_lock<std::mutex> lck(mtx);
while (cargo==0) consume.wait(lck);
std::cout << cargo << '\n';
cargo=0;
produce.notify_one();
}
void producer (int id) {
std::unique_lock<std::mutex> lck(mtx);
while (cargo!=0) produce.wait(lck);
cargo = id;
consume.notify_one();
}
int main ()
{
std::thread consumers[10],producers[10];
// spawn 10 consumers and 10 producers:
for (int i=0; i<10; ++i) {
consumers[i] = std::thread(consumer);
producers[i] = std::thread(producer,i+1);
}
// join them back:
for (int i=0; i<10; ++i) {
producers[i].join();
consumers[i].join();
}
return 0;
}
Possible output (order of consumed cargoes may vary):
1
2
3
4
5
6
7
8
9
10
future
future
的目标是充分利用CPU的并发性,它只能通过async
,promise
和package_task
三种方式构造。future
只能移动,不可复制,需要复制时可以使用shared_future
,但通常不建议使用。调用future
的get()时可能会发生阻塞,直到返回值ready。future
有三种姿势的等待:
wait()
:一直等待直到得到返回值wait_for()
:设定一个超时时间;wait_until()
:等待到某个时间点。
future有一特化版本future
promise
promise对象可以通过调用成员get_future将此共享状态与future对象关联。调用之后,两个对象共享相同的共享状态:
promise
:promise对象是异步提供者,在共享状态的时候设置一个值future
负责:future对象是异步返回对象,可以检索共享状态的值,并在必要时等待其准备就绪
example
// promise example
#include <iostream> // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future
void print_int (std::future<int>& fut) {
int x = fut.get();
std::cout << "value: " << x << '\n';
}
int main ()
{
std::promise<int> prom; // create promise
std::future<int> fut = prom.get_future(); // engagement with future
std::thread th1 (print_int, std::ref(fut)); // send future to new thread
prom.set_value (10); // fulfill promise
// (synchronizes with getting the future)
th1.join();
return 0;
}
Output:
value: 10
packaged_task
很多情况下并不希望另起一个线程,因为线程是非常重要的资源。因此希望可以合理的管理线程资源,这就需要使用线程池。如何将future与线程池同时使用呢?这就需要采用package_task。package_task本质是将一个函数包装成一个future。这个task类似于std::function,有输入输出,大家可以将其认为是一个异步函数,但该异步函数并不负责执行,而是将其结果预置于一个future变量中,然后交给一个线程来实际执行,此时主线程便可以得到其返回值。
example
// packaged_task example
#include <iostream> // std::cout
#include <future> // std::packaged_task, std::future
#include <chrono> // std::chrono::seconds
#include <thread> // std::thread, std::this_thread::sleep_for
// count down taking a second for each value:
int countdown (int from, int to) {
for (int i=from; i!=to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from-to;
}
int main ()
{
std::packaged_task<int(int,int)> tsk (countdown); // set up packaged_task
std::future<int> ret = tsk.get_future(); // get future
std::thread th (std::move(tsk),10,0); // spawn thread to count down from 10 to 0
// ...
int value = ret.get(); // wait for the task to finish and get result
std::cout << "The countdown lasted for " << value << " seconds.\n";
th.join();
return 0;
}
Possible output:
10
9
8
7
6
5
4
3
2
1
Lift off!
The countdown lasted for 10 seconds.
async
有时某项工作很早就可以开始做(前置条件都已完备),而等待这件工作结果的任务在非常靠后的位置,这时候就需要async。换言之,如果可以尽早开始做一件事,就让其在后台运行即可,或快或慢都可以,只需在需要结果的时候运行完成就好。
example
// async example
#include <iostream> // std::cout
#include <future> // std::async, std::future
// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
std::cout << "Calculating. Please, wait...\n";
for (int i=2; i<x; ++i) if (x%i==0) return false;
return true;
}
int main ()
{
// call is_prime(313222313) asynchronously:
std::future<bool> fut = std::async (is_prime,313222313);
std::cout << "Checking whether 313222313 is prime.\n";
// ...
bool ret = fut.get(); // waits for is_prime to return
if (ret) std::cout << "It is prime!\n";
else std::cout << "It is not prime.\n";
return 0;
}
Possible output (the first two lines may be in a different order, or scrambled):
Checking whether 313222313 is prime.
Calculating. Please, wait...
It is prime!
Reference
10分钟,带你掌握C++多线程同步!
Multi-threading