分享

C 多线程的条件变量

 InfoRich 2023-01-17 发布于江苏

在C++的多线程中,使用互斥锁解决数据竞争访问问题,算是线程同步的加锁原语,用于排他性的访问共享数据。

我们在使用mutex时,一般都会期望加锁不要阻塞,总是能立刻拿到锁,然后尽快访问数据,用完之后尽快解锁,这样才能不影响并发性和性能。

如果需要等待某个条件的成立,我们就该使用条件变量(condition variable)了,那什么是条件变量呢?

条件变量是线程的另外一种有效同步机制。

这些同步对象为线程提供了交互的场所(一个线程给另外的一个或者多个线程发送消息),我们指定在条件变量这个地方发生,一个线程用于修改这个变量使其满足其它线程继续往下执行的条件,其它线程则等待接收条件已经发生改变的信号。

当条件变量同互斥锁一起使用时,条件变量允许线程以一种无竞争的方式等待任意条件的发生。

一、为何引入条件变量?

多线程并发访问共享数据时遇到的数据竞争时,我们可以通过互斥锁保护共享数据,保证多线程对共享数据的访问同步有序。

但如果一个线程需要等待一个互斥锁的释放,该线程通常需要轮询该互斥锁是否已被释放,我们也很难找到适当的轮训周期,如果轮询周期太短则太浪费CPU资源,如果轮询周期太长则可能互斥锁已被释放而该线程还在睡眠导致发生延误。

下面给出一个简单的程序示例:

一个线程往队列中放入数据,一个线程从队列中提取数据,取数据前需要判断一下队列中确实有数据,由于这个队列是线程间共享的,所以,需要使用互斥锁进行保护,一个线程在往队列添加数据的时候,另一个线程不能取,反之亦然。

程序实现代码如下:

//cond_var1.cpp用互斥锁实现一个生产者消费者模型
#include <iostream>#include <deque>#include <thread>#include <mutex>
std::deque<int> q; //双端队列标准容器全局变量std::mutex mu; //互斥锁全局变量//生产者,往队列放入数据void function_1() { int count = 10; while (count > 0) { std::unique_lock<std::mutex> locker(mu); q.push_front(count); //数据入队锁保护 locker.unlock(); std::this_thread::sleep_for(std::chrono::seconds(1)); //延时1秒 count--; }}//消费者,从队列提取数据void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); if (!q.empty()) { //判断队列是否为空 data = q.back(); q.pop_back(); //数据出队锁保护 locker.unlock(); std::cout << 't2 got a value from t1: ' << data << std::endl; } else { locker.unlock(); } }}
int main() { std::thread t1(function_1); std::thread t2(function_2); t1.join(); t2.join();
getchar(); return 0;}

程序执行结果如下:

Image

从代码中不难看出:

在生产过程中,因每放入一个数据有1秒延时,所以这个生产的过程是很慢的;

在消费过程中,存在着一个while循环,只有在接收到表示结束的数据的时候,才会停止,每次循环内部,都是先加锁,判断队列不空,然后就取出一个数,最后解锁。

所以说,在1s内,做了很多无用功!这样的话,CPU占用率会很高,可能达到100%(单核)。如下图示:

Image

既然是由于消费者在while循环内因等待数据做了过多的无用功导致CPU占有率过高,我们可以考虑在消费者发现队列为空时,让消费者小睡一会儿,即增加一个小延时(比如500ms),相当于增大了轮询间隔周期,应该能降低CPU的占用率

按该方案修改后的消费者代码如下:

//消费者,从队列提取数据void function_2() {    int data = 0;    while ( data != 1) {        std::unique_lock<std::mutex> locker(mu);        if (!q.empty()) {      //判断队列是否为空            data = q.back();            q.pop_back();      //数据出队锁保护            locker.unlock();            std::cout << 't2 got a value from t1: ' << data << std::endl;        } else {            locker.unlock();            std::this_thread::sleep_for(std::chrono::milliseconds(500));    //延时500毫秒        }    }}

增大轮询周期后,CPU占有率下降很明显:

Image

但前面也说了,困难之处在于如何确定这个延长时间(即轮询间隔周期),如果间隔太短会过多占用CPU资源,如果间隔太长会因无法及时响应造成延误。

这就引入了条件变量来解决该问题:

条件变量使用“通知—唤醒”模型,生产者生产出一个数据后通知消费者使用,消费者在未接到通知前处于休眠状态节约CPU资源;

当消费者收到通知后,赶紧从休眠状态被唤醒来处理数据,使用了事件驱动模型,在保证不误事儿的情况下尽可能减少无用功降低对资源的消耗。

二、如何使用条件变量?

C++标准库在< condition_variable >中提供了条件变量,借由它,一个线程可以唤醒一个或多个其他等待中的线程。原则上,条件变量的运作如下:

  • 你必须同时包含< mutex >和< condition_variable >,并声明一个mutex和一个condition_variable变量;

  • 那个通知“条件已满足”的线程(或多个线程之一)必须调用notify_one()或notify_all(),以便条件满足时唤醒处于等待中的一个条件变量;

  • 那个等待'条件被满足'的线程必须调用wait(),可以让线程在条件未被满足时陷入休眠状态,当接收到通知时被唤醒去处理相应的任务;

将上面的cond_var1.cpp程序使用条件变量解决轮询间隔难题的示例代码如下:

//cond_var2.cpp用条件变量解决轮询间隔难题
#include <iostream>#include <deque>#include <thread>#include <mutex>#include <condition_variable>
std::deque<int> q; //双端队列标准容器全局变量std::mutex mu; //互斥锁全局变量std::condition_variable cond; //全局条件变量//生产者,往队列放入数据void function_1() { int count = 10; while (count > 0) { std::unique_lock<std::mutex> locker(mu); q.push_front(count); //数据入队锁保护 locker.unlock(); cond.notify_one(); // 向一个等待线程发出“条件已满足”的通知 std::this_thread::sleep_for(std::chrono::seconds(1)); //延时1秒 count--; }}//消费者,从队列提取数据void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); while(q.empty()) //判断队列是否为空 cond.wait(locker); // 解锁互斥量并陷入休眠以等待通知被唤醒,被唤醒后加锁以保护共享数据
data = q.back(); q.pop_back(); //数据出队锁保护 locker.unlock(); std::cout << 't2 got a value from t1: ' << data << std::endl; }}
int main() { std::thread t1(function_1); std::thread t2(function_2); t1.join(); t2.join();
getchar(); return 0;}

使用条件变量对CPU的占用率也很低,而且免去了轮询间隔该设多长的难题:

Image

上面的代码有三个注意事项:

  1. 在function_2中,在判断队列是否为空的时候,使用的是while(q.empty()),而不是if(q.empty()),这是因为wait()从阻塞到返回,不一定就是由于notify_one()函数造成的,还有可能由于系统的不确定原因唤醒(可能和条件变量的实现机制有关),这个的时机和频率都是不确定的,被称作伪唤醒。如果在错误的时候被唤醒了,执行后面的语句就会错误,所以需要再次判断队列是否为空,如果还是为空,就继续wait()阻塞;

  2. 在管理互斥锁的时候,使用的是std::unique_lock而不是std::lock_guard,而且事实上也不能使用std::lock_guard。这需要先解释下wait()函数所做的事情,可以看到,在wait()函数之前,使用互斥锁保护了,如果wait的时候什么都没做,岂不是一直持有互斥锁?那生产者也会一直卡住,不能够将数据放入队列中了。所以,wait()函数会先调用互斥锁的unlock()函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作。lock_guard没有lock和unlock接口,而unique_lock提供了,这就是必须使用unique_lock的原因;

  3. 使用细粒度锁,尽量减小锁的范围,在notify_one()的时候,不需要处于互斥锁的保护范围内,所以在唤醒条件变量之前可以将锁unlock()。

还可以将cond.wait(locker)换一种写法,wait()的第二个参数可以传入一个函数表示检查条件,这里使用lambda函数最为简单,如果这个函数返回的是true,wait()函数不会阻塞会直接返回,如果这个函数返回的是false,wait()函数就会阻塞着等待唤醒,如果被伪唤醒,会继续判断函数返回值。代码示例如下:

//消费者,从队列提取数据void function_2() {    int data = 0;    while ( data != 1) {        std::unique_lock<std::mutex> locker(mu);                cond.wait(locker, [](){ return !q.empty();});   //如果条件变量被唤醒,检查队列非空条件是否为真,为真则直接返回,为假则继续等待                data = q.back();        q.pop_back();      //数据出队锁保护        locker.unlock();        std::cout << 't2 got a value from t1: ' << data << std::endl;    }}

值得注意的是:

  1. 所有通知(notification)都会被自动同步化,所以并发调用notify_one()和notify_all()不会带来麻烦;

  2. 所有等待某个条件变量(condition variable)的线程都必须使用相同的mutex,当wait()家族的某个成员被调用时该mutex必须被unique_lock锁定,否则会发生不明确的行为;

  3. wait()函数会执行“解锁互斥量–>陷入休眠等待–>被通知唤醒–>再次锁定互斥量–>检查条件判断式是否为真”几个步骤,这意味着传给wait函数的判断式总是在锁定情况下被调用的,可以安全的处理受互斥量保护的对象;但在'解锁互斥量–>陷入休眠等待'过程之间产生的通知(notification)会被遗失。

线程同步保证了多个线程对共享数据的有序访问,目前我们了解到的多线程间传递数据主要是通过共享数据(全局变量)实现的,全局共享变量的使用容易增加不同任务或线程间的耦合度,也增加了引入bug的风险,所以全局共享变量应尽可能少用。很多时候我们只需要传递某个线程或任务的执行结果,以便参与后续的运算,但我们又不想阻塞等待该线程或任务执行完毕,而是继续执行暂时不需要该线程或任务执行结果参与的运算,当需要该线程执行结果时直接获得,才能更充分发挥多线程并发的效率优势。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多