vlambda博客
学习文章列表

聊聊C++异步编程-2

上篇 谈到了在C++11中对线程的底层操作方法,但在实际项目中,使用这样的方式开发效率低而且容易犯错。那有没有更方便,简洁,安全的方式呢,其实是有的,我把它称为High-Level操作(虽然这个名称分类未必准确)。

High-Level 操作

future & promise

c++11起提供了future,promise来满足跨线程取值的需求。想象一下,在此之前是如何操作的。能用的手段只有通过共享变量,通过condition_variable来提醒其他线程取值,更早之前,只能通过mutex和多个共享变量,组合传递出“变量的更改状态”、“变量值”等信息。

具体future和promise是如何工作的?网上有个经典的示意图(下图)。有线程1和2,线程1希望从线程2中获取特定值,其步骤如下:

  • 线程1:创建promise对象,并从该promise对象中获得对应的future对象。线程1将promise对象传递给线程2,完成其他工作后,通过future::get()方法等待从线程2中取值。此时线程1被阻塞。完成取值后继续剩下的工作。

  • 线程2:接受传入的promise对象,通过promise的set_XXX等方法设置特定值。然后继续线程2自身的工作。

从上述流程可以看出,promise与future是成对出现的。生产者通过promise赋值,消费者通过future取值。示意代码如下,比较简单明了。

void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> &accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // Notify future
}

int test_future_promise(int argc, char *argv[])
{
try {
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::ref(accumulate_promise));

//accumulate_future.wait(); // wait for result

std::cout << "result=" << accumulate_future.get() << '\n';

work_thread.join(); // wait for thread completion
}
catch (std::exception &e)
{
std::cerr << e.what();
}
return 0;
}

生产者通过promise的set_value方法赋值。在set_value方法内获取promise内部存储的mutex,并更新对象。注意: set_value不能设置多次,否则会丢出异常。

消费者通过future的get来阻塞线程获取对应的值。此处要注意future和promise都是不可拷贝,但可以转移,因为其内部存储了mutex。注意:get只能被调用一次,如果希望多次调用需要使用shared_future。为什么会这样?我们看下两个future的get实现:

//future::get
_Ty get() { // block until ready then return the stored result or
// throw the stored exception
future _Local{_STD move(*this)};
return _STD move(_Local._Get_value());
}

// shared_future:get
const _Ty& get() const { // block until ready then return the stored result or
// throw the stored exception
return this->_Get_value();
}

看出区别了吧,future中的get是的返回值是move出来的,当第二次调用get就无法获得有效值了。而shared_future中的返回const 引用,可以多次get。除此以外,future不支持拷贝构造,而shared_future支持。

packaged_task & async

packaged_task 就像是一个捆绑了std::function的std::future,感觉略有点鸡肋。我们通过下面的代码,来看看如何使用它。packaged_task中可以传入函数对象,函数指针,lambda函数等。注意,这些函数并不是在package_task构造时被执行,需要手动invoke,才能执行。可以在当前线程执行,也可以被move到其他线程。

packaged_task提供get_future能从中取得future,借此可以获取绑定函数的返回值。

void task_thread()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();

std::thread task_td(std::move(task), 2, 10);
task_td.join();

std::cout << "task_thread:\t" << result.get() << '\n';
}

async 是更上层的封装,使用起来也更简洁。我们先看下async的函数形式,这是c++17以后,c++20以前的形式。重载版本有两个,区别在于第一个参数是否是std::lanuch策略

template< class Function, class... Args >
std::future<std::invoke_result_t<std::decay_t<Function>,
std::decay_t<Args>...>>
async( std::launch policy, Function&& f, Args&&... args );

template< class Function, class... Args>
std::future<std::invoke_result_t<std::decay_t<Function>,
std::decay_t<Args>...>>
async( Function&& f, Args&&... args );

std::launch policy 有两种:async 或 deferred。两者的区别在于,async是在新线程中异步执行,通过返回的future的get函数来取值。而deferred 是在当前线程中同步执行,但请注意,并不是立即执行,而是延后到get函数被调用的时候才执行,这种取值的行为又被称为lazy evaluation。

没有std::launch的重载版本默认是在新线程中执行。

#include <iostream>#include <vector>#include <algorithm>#include <numeric>#include <future>#include <string>#include <mutex> 
std::mutex m;
struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};

template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);

RandomIt mid = beg + len/2;
auto handle = std::async(std::launch::async,
parallel_sum<RandomIt>, mid, end);
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}

int main()
{
std::vector<int> v(10000, 1);
std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';

X x;
// Calls (&x)->foo(42, "Hello") with default policy:
// may print "Hello 42" concurrently or defer execution
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// Calls x.bar("world!") with deferred policy
// prints "world!" when a2.get() or a2.wait() is called
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// Calls X()(43); with async policy
// prints "43" concurrently
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // prints "world!"
std::cout << a3.get() << '\n'; // prints "53"
}

到此介绍完了c++11中thread库提供的所有异步操作。

参考

  • thispointer.com/c11-mul

  • cnblogs.com/wangshaowei

  • stackoverflow.com/quest

  • open-std.org/jtc1/sc22/