vlambda博客
学习文章列表

聊聊C++异步编程-1

背景

在产品端开发软件多了,免不了遇到多线程处理的情况,这也符合多核、异构的现代化硬件发展的需求。多线程处理中常见的有两种应用情形:

  • 并行化算法处理。利用OpenMP/TBB等CPU并行库,或者CUDA/OpenCL等GPU并行库,将问题拆分成多个相互独立的子问题,通过并行库提供的原语来进行组织,调度多个子线程并行处理,再利用库提供的规约等操作,收集处理各个线程反馈的结果,形成最终的结果。在这类应用中,线程之间的数据交换是低频行为,尤其在算法设计阶段会刻意的避免线程之间的信息传递已达到高效处理的效果。本文不讨论此部分内容。

  • 并行化业务逻辑处理。业务逻辑处理过程中经常需要频繁的发送、等待、接收其他业务线程的数据,信息交换是常见且高频的行为。如何高效地开发异步程序,避免出错是本文要讨论的内容。

多种异步编程方式

异步编程,我们期望达到的目标是:

  • 将不同任务(Task)分配到不同线程执行(并支持设置线程的优先级);

  • 以手动/自动方式,解决不同Task之间资源依赖关系;

按照个人浅见,把C++端的多种异步编程行为做了简单总结,按照操作对象从低到高的层级分成两类,本文先介绍低级操作。

Low-Level 操作

Low-Level操作指的是直接操作线程,由开发人员手动控制线程的创建与销毁,控制线程之间资源的竞争分配。

线程创建

从C++11开始,标准库中引入了std::thread,用户通过它可以方便地把函数移动到另一个线程中执行。

int test_thread(int argc, char *argv[])
{
std::mutex g_display_mutex;
auto fun = [](std::string_view a, std::mutex & lock) {
std::unique_lock<std::mutex> lock_guard(lock);
std::cout << "thread id = " << std::this_thread::get_id() << "\t " << a << "\n";
};
std::thread t(fun,"hello", std::ref(g_display_mutex));
fun("world", g_display_mutex);
t.join();
return 0;
}

稍微介绍一下,定义一个lambda函数fun,输出线程id和一个传入的字符串。分别在新起的线程和当前线程中调用它。使用std::mutex的原因是,两个线程都需要通过全局对象std::cout来输出到屏幕,如果没有mutex,会打乱id和字符串的输出顺序。

std::thread有几个比较有意思的方法,如yield、sleep_for、sleep_until用于细致控制指定thread的行为,但也要注意它们在跨平台下行为可能的 不一致性。

  • yield: 退出线程当前占用的CPU时间片,重新接受操作系统的调度。注意:操作系统会唤起等待队列中(相同优先级的)的其他线程执行,如果没有在等待时间片的其他线程,当前线程会继续被执行。

  • sleep_for: 阻塞当前线程,睡眠指定的时长。注意:这个函数是平台相关的。一般来说线程被阻塞的时长都会  指定的时长。这个函数的行为和yield有些像,在实际应用中,也经常会看到用如下代码来让出线程占用的CPU时间片:

std::this_thread::sleep_for(1ms); // sleep_for(0ms)

但是请注意,如果你只是想让出当前线程占用的时间片,请使用yield,除非你真的希望sleep指定时间。

  • sleep_until: 与sleep_for相似,区别在于sleep_for是睡眠规定的时长(duration),而sleep_until是睡眠到给定的时间点。

互斥锁

在线程之间访问共享数据需要通过互斥锁来同步,保证同一时刻只有一个线程可以访问(或者只有一个线程进行写操作)。C++11起,标准库提供std::mutex以满足开发者对互斥锁的需求,相关的变体还有许多,如recursive_mutex,timed_mutex,shared_mutex等等。注意,mutex不可复制,不可移动(move)。

  • std::recursive_mutex 是递归锁,和mutex的区别是,递归锁内部有计数器,允许同一个线程多次加解锁,只有当加解锁的次数相当,才会真正释放共享资源的占有权。注意:请勿滥用recursive_mutex。虽然看起来方便,但是会掩盖代码中的真正问题,让调试变得异常困难。如非必须,勿用。

  • std::timed_mutex 是带有时间控制的mutex,新增了try_lock_for和try_lock_until两个方法。和mutex的try_lock相比,增加了超时判断。除此以外没有特别需要注意的。

  • std::shared_mutex 是读写锁,提供两种访问权限的控制:共享性(shared)和排他性(exclusive)。通过lock/try_lock获取排他性访问权限,通过lock_shared/try_lock_shared获取共享性访问权限。这样的设置对于区分不同线程的读写操作特别有用。shared_mutex是c++17中引入的,使用时需要注意编译器版本。

手动使用lock/unlock来加减锁不是个好方法,因为人都会犯错,容易漏写unlock,尤其在复杂的逻辑判断中。因此标准库提供了一套RAII的机制。如lock_guard, scoped_lock,unique_lock,shared_lock等。lock_guard 和scoped_lock, 两者作用相似,都是mutex的wrapper,两者不同点是,scoped_lock支持同时lock多个mutex,而lock_guard不支持。另外xxx_lock支持手动调用lock和unlock方法,而lock_guard不支持, 听起来似乎lock _guard的行为更符合RAII的定义。实际上,scoped_lock是c++17标准中引入的,而lock_guard更像是即将被deprecated的方法。xxx_lock具有lock和unlock方法,可以在其生命周期结束前调用unlock。

unique_lock在构造时,还可以指定不同locking策略:

std::mutex g;
std::unique_lock<std::mutex> lg(g, std::defer_lock()); // std::adopt_lock, std::try_to_lock

常见的策略包括3种:

  • defer_lock: 不立即取得mutex的拥有权。这种策略一般出现在需要同时lock多个mutex的时候,可以通过std::lock同时lock多个unique_lock.

  • try_to_lock_t: 尝试获取mutex的拥有权。

  • adopt_lock_t:假设线程已经在其他处获取了mutex的拥有权。

在开发过程中,有时候需要保证某个函数只被调用一次,这个需求在线程安全的单例类编写中时常会出现。传统使用mutex的写法如下,通过两次检查instance_来确保资源只申请一次。这类方法被称为 Double-Checked Locking Pattern。事实上DCLP的方法也不能解决多线程环境的共享资源保护问题(具体原因参考 Scott的C++ and the Perils of Double-Checked Locking ,中文资料参考 blog.csdn.net/tantexian),其主要原因是 instance_.reset(new Singleton) 并非原子操作,编译器会将其转换成三条语句来实现,这就导致了DCLP方法可能的失败。

class Singleton {
public:
static Singleton& GetInstance() {
if (!instance_) {
std::lock_guard<std::mutex> lock(mutex_);
if (!instance_) {
instance_.reset(new Singleton);
}
}
return *instance_;
}

~Singleton() = default;

private:
Singleton() = default;

Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;

private:
static std::unique_ptr<Singleton> instance_;
static std::mutex mutex_;
};

针对这个问题,c++11 引入了 std::call_once,通过传入的std::once_flag来确保函数在多线程环境下只被执行一次,完美地解决上述问题。由此可见,语言层面的解决方案才足够简洁啊。

class Singleton {
public:
static Singleton& GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
instance_.reset(new Singleton);
});

return *instance_;
}

~Singleton() = default;

private:
Singleton() = default;

Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;

private:
static std::unique_ptr<Singleton> instance_;
};

条件变量

线程之间除了会竞争共享资源以外,还有另一种比较常见的操作,那就是:线程同步。线程同步是指,线程之间如何按照约定的先后次序执行。c++11中引入了条件变量condition_variable 来实现这个需求。

condition_variable是一个同步原语,它可以同时阻塞一个或多个线程,指导其他线程更改了共享变量,并通知了当前条件变量。注意其中的“共享变量”,也就是条件变量中“条件”二字所指,当共享变量状态发生改变,意味着“条件”发生变化。

condition_variable需要捆绑mutex来使用。线程使用condition_variable一般有两种方式:

  • 通知其他线程:需要执行三个步骤

    • 获取std::mutex,一般通过std::lock_guard 或 std::unique_lock

    • 修改共享变量

    • 执行条件变量的notify_one或者notify_all方法 (此时可以lock可以释放)

  • 等候其他线程的通知:

    • 获取std::mutex,只能通过std::unique_lock,(注意必须是同一个mutex,因为需要保护共享变量)

    • 执行wait函数

    • 当条件变量被通知时,当前线程会被唤起,自动获取mutex。注意一旦发生虚假唤醒,线程将继续wait。

看下官方的示例代码。

#include <iostream>#include <string>#include <thread>#include <mutex>#include <condition_variable> 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

void worker_thread()
{
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;});

// after the wait, we own the lock.
std::cout << "Worker thread is processing data\n";
data += " after processing";

// Send data back to main()
processed = true;
std::cout << "Worker thread signals data processing completed\n";

// Manual unlocking is done before notifying, to avoid waking up
// the waiting thread only to block again (see notify_one for details)
lk.unlock();
cv.notify_one();
}

int main()
{
std::thread worker(worker_thread);

data = "Example data";
// send data to the worker thread
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
cv.notify_one();

// wait for the worker
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return processed;});
}
std::cout << "Back in main(), data = " << data << '\n';

worker.join();
}

简单解释一下。main主线程中首先启动worker子线程。通过mutex保护共享变量ready。主线程修改ready后,通过cv.notify_one发出通知,然后通过cv.wait等待processed变量被置true。我们注意看wait的写法。wait有两个重载函数,第一个只接受unique_lock,第二个还接受Predicate。

void wait( std::unique_lock<std::mutex>& lock );
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

如果你以为Predicate版本的wait函数等价于如下的if判断,那你就错了。为了处理小概率的虚假唤醒,该版本的wait其实等价于while的实现版本:

if(!pred())
wait(lock);

while(!pred())
wait(lock);

看完了主线程,让我们再看worker_thread的实现。与主线程相比,不同点在于将lock_guard换成了unique_lock,因此在notify_once之前需要手动unlock。为什么要这么做?按照官方解释:

The notifying thread does not need to hold the lock on the same mutex as the one held by the waiting thread(s); in fact doing so is a pessimization, since the notified thread would immediately block again, waiting for the notifying thread to release the lock. However, some implementations (in particular many implementations of pthreads) recognize this situation and avoid this "hurry up and wait" scenario by transferring the waiting thread from the condition variable's queue directly to the queue of the mutex within the notify call, without waking it up.

如果notify的时候还不释放mutex,会导致被通知的线程马上被再次block。

以上就是关于low-level操作的内容。

参考

  • jakascorner.com/blog/20

  • cnblogs.com/zhanghu5203

  • stackoverflow.com/quest

  • en.cppreference.com/w/c

  • C++11多线程-异步运行(3)之最终篇(future+async)

  • C++11多线程-异步运行(1)之std::promise

  • C++11多线程-异步运行(2)之std::packaged_task

  • blog.csdn.net/u01172600

  • aristeia.com/Papers/DDJ

  • 虚假唤醒(spurious wakeup)