vlambda博客
学习文章列表

C++多线程同步的几种方式

Overview

C++的多线程同步方式有这么几种:

  • mutex

    • lock_guard

    • unique_lock

  • condition_variable

  • future

    • promise

    • packaged_task

    • async

C++11并没有提供semaphore的API,信号量太容易出错了(too error prone),通过组合互斥锁(mutex)和条件变量(condition variable)可以达到相同的效果,且更加安全。

mutex

官网介绍:mutex

它包含以下三个部分:

  • Mutex type

  • Locks:lock_guard, unique_lock

  • Functions:try_locklock

example:

mutex的实现也很简单,在进入临界区之前调用该变量(mtx)的lock函数,出临界区之前调用该变量的(mtx)的unlock函数。所以程序会连续输出50个'*'或者连续输出50个'$',而不会'*''$'交替输出。

但是考虑这样一个问题,std::cout << '\n',这里发生异常会发生什么?抛出异常后,意味着mtx.unlock()不会被执行,即锁没有被释放,整个程序进入不了临界区,该程序往往会挂死。

// mutex example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex

std::mutex mtx; // mutex for critical section

void print_block (int n, char c) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
mtx.unlock();
}

int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');

th1.join();
th2.join();

return 0;
}

Possible output (order of lines may vary, but characters are never mixed):

**************************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$

lock_guard

在构造时,互斥对象被调用线程锁定,而在销毁时,互斥对象被解锁。它是最简单的锁,作为具有自动持续时间的对象特别有用,该持续时间一直持续到其上下文结束。这样,可以保证在抛出异常的情况下互斥对象已正确解锁。

example

// lock_guard example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock_guard
#include <stdexcept> // std::logic_error

std::mutex mtx;

void print_even (int x) {
if (x%2==0) std::cout << x << " is even\n";
else throw (std::logic_error("not even"));
}

void print_thread_id (int id) {
try {
// using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
std::lock_guard<std::mutex> lck (mtx);
print_even(id);
}
catch (std::logic_error&) {
std::cout << "[exception caught]\n";
}
}

int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);

for (auto& th : threads) th.join();

return 0;
}

possible output

[exception caught]
2 is even
[exception caught]
4 is even
[exception caught]
6 is even
[exception caught]
8 is even
[exception caught]
10 is even

unique_lock

unique_lock基本用法和lock_guard一致,在构造函数和析构函数中进行锁操作,不同的地方在于它提供了非常多构造函数。

example

// unique_lock example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock

std::mutex mtx; // mutex for critical section

void print_block (int n, char c) {
// critical section (exclusive access to std::cout signaled by lifetime of lck):
std::unique_lock<std::mutex> lck (mtx);
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
}

int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');

th1.join();
th2.join();

return 0;
}

condition_variable

条件变量是一个对象,可以阻塞线程,直到被通知恢复。当调用其等待功能之一时,它使用unique_lock(通过互斥锁)来锁定线程。该线程将保持阻塞状态,直到被另一个在同一个condition_variable对象上调用通知功能的线程唤醒为止。

Wait functions

  • wait

    • Wait until notified (public member function )

  • wait_for

    • Wait for timeout or until notified (public member function )

  • wait_until

    • Wait until notified or time point (public member function )

Notify functions

  • notify_one

    • Notify one (public member function )

  • notify_all

    • Notify all (public member function )

example

// condition_variable::notify_one
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable produce,consume;

int cargo = 0; // shared value by producers and consumers

void consumer () {
std::unique_lock<std::mutex> lck(mtx);
while (cargo==0) consume.wait(lck);
std::cout << cargo << '\n';
cargo=0;
produce.notify_one();
}

void producer (int id) {
std::unique_lock<std::mutex> lck(mtx);
while (cargo!=0) produce.wait(lck);
cargo = id;
consume.notify_one();
}

int main ()
{
std::thread consumers[10],producers[10];
// spawn 10 consumers and 10 producers:
for (int i=0; i<10; ++i) {
consumers[i] = std::thread(consumer);
producers[i] = std::thread(producer,i+1);
}

// join them back:
for (int i=0; i<10; ++i) {
producers[i].join();
consumers[i].join();
}

return 0;
}

Possible output (order of consumed cargoes may vary):

1
2
3
4
5
6
7
8
9
10

future

future的目标是充分利用CPU的并发性,它只能通过asyncpromisepackage_task三种方式构造。future只能移动,不可复制,需要复制时可以使用shared_future,但通常不建议使用。调用future的get()时可能会发生阻塞,直到返回值ready。future有三种姿势的等待:

  • wait():一直等待直到得到返回值

  • wait_for():设定一个超时时间;

  • wait_until():等待到某个时间点。

future有一特化版本future ,返回值为空,即不返回任何值,因此仅能用于线程间通知,但却是最常用的future。

promise

promise对象可以通过调用成员get_future将此共享状态与future对象关联。调用之后,两个对象共享相同的共享状态:

  • promise:promise对象是异步提供者,在共享状态的时候设置一个值

  • future负责:future对象是异步返回对象,可以检索共享状态的值,并在必要时等待其准备就绪

example

// promise example
#include <iostream> // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future

void print_int (std::future<int>& fut) {
int x = fut.get();
std::cout << "value: " << x << '\n';
}

int main ()
{
std::promise<int> prom; // create promise

std::future<int> fut = prom.get_future(); // engagement with future

std::thread th1 (print_int, std::ref(fut)); // send future to new thread

prom.set_value (10); // fulfill promise
// (synchronizes with getting the future)
th1.join();
return 0;
}

Output:

value: 10

packaged_task

很多情况下并不希望另起一个线程,因为线程是非常重要的资源。因此希望可以合理的管理线程资源,这就需要使用线程池。如何将future与线程池同时使用呢?这就需要采用package_task。package_task本质是将一个函数包装成一个future。这个task类似于std::function,有输入输出,大家可以将其认为是一个异步函数,但该异步函数并不负责执行,而是将其结果预置于一个future变量中,然后交给一个线程来实际执行,此时主线程便可以得到其返回值。

example

// packaged_task example
#include <iostream> // std::cout
#include <future> // std::packaged_task, std::future
#include <chrono> // std::chrono::seconds
#include <thread> // std::thread, std::this_thread::sleep_for

// count down taking a second for each value:
int countdown (int from, int to) {
for (int i=from; i!=to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from-to;
}

int main ()
{
std::packaged_task<int(int,int)> tsk (countdown); // set up packaged_task
std::future<int> ret = tsk.get_future(); // get future

std::thread th (std::move(tsk),10,0); // spawn thread to count down from 10 to 0

// ...

int value = ret.get(); // wait for the task to finish and get result

std::cout << "The countdown lasted for " << value << " seconds.\n";

th.join();

return 0;
}

Possible output:

10
9
8
7
6
5
4
3
2
1
Lift off!
The countdown lasted for 10 seconds.

async

有时某项工作很早就可以开始做(前置条件都已完备),而等待这件工作结果的任务在非常靠后的位置,这时候就需要async。换言之,如果可以尽早开始做一件事,就让其在后台运行即可,或快或慢都可以,只需在需要结果的时候运行完成就好。

example

// async example
#include <iostream> // std::cout
#include <future> // std::async, std::future

// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
std::cout << "Calculating. Please, wait...\n";
for (int i=2; i<x; ++i) if (x%i==0) return false;
return true;
}

int main ()
{
// call is_prime(313222313) asynchronously:
std::future<bool> fut = std::async (is_prime,313222313);

std::cout << "Checking whether 313222313 is prime.\n";
// ...

bool ret = fut.get(); // waits for is_prime to return

if (ret) std::cout << "It is prime!\n";
else std::cout << "It is not prime.\n";

return 0;
}

Possible output (the first two lines may be in a different order, or scrambled):

Checking whether 313222313 is prime.
Calculating. Please, wait...
It is prime!

Reference

  • 10分钟,带你掌握C++多线程同步!

  • Multi-threading