在C++的多线程中,使用互斥锁解决数据竞争访问问题,算是线程同步的加锁原语,用于排他性的访问共享数据。 我们在使用mutex时,一般都会期望加锁不要阻塞,总是能立刻拿到锁,然后尽快访问数据,用完之后尽快解锁,这样才能不影响并发性和性能。 如果需要等待某个条件的成立,我们就该使用条件变量(condition variable)了,那什么是条件变量呢? 条件变量是线程的另外一种有效同步机制。 这些同步对象为线程提供了交互的场所(一个线程给另外的一个或者多个线程发送消息),我们指定在条件变量这个地方发生,一个线程用于修改这个变量使其满足其它线程继续往下执行的条件,其它线程则等待接收条件已经发生改变的信号。 当条件变量同互斥锁一起使用时,条件变量允许线程以一种无竞争的方式等待任意条件的发生。 一、为何引入条件变量? 多线程并发访问共享数据时遇到的数据竞争时,我们可以通过互斥锁保护共享数据,保证多线程对共享数据的访问同步有序。 但如果一个线程需要等待一个互斥锁的释放,该线程通常需要轮询该互斥锁是否已被释放,我们也很难找到适当的轮训周期,如果轮询周期太短则太浪费CPU资源,如果轮询周期太长则可能互斥锁已被释放而该线程还在睡眠导致发生延误。 下面给出一个简单的程序示例: 一个线程往队列中放入数据,一个线程从队列中提取数据,取数据前需要判断一下队列中确实有数据,由于这个队列是线程间共享的,所以,需要使用互斥锁进行保护,一个线程在往队列添加数据的时候,另一个线程不能取,反之亦然。 程序实现代码如下: //cond_var1.cpp用互斥锁实现一个生产者消费者模型
#include <iostream> #include <deque> #include <thread> #include <mutex>
std::deque<int> q; //双端队列标准容器全局变量 std::mutex mu; //互斥锁全局变量 //生产者,往队列放入数据 void function_1() { int count = 10; while (count > 0) { std::unique_lock<std::mutex> locker(mu); q.push_front(count); //数据入队锁保护 locker.unlock(); std::this_thread::sleep_for(std::chrono::seconds(1)); //延时1秒 count--; } } //消费者,从队列提取数据 void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); if (!q.empty()) { //判断队列是否为空 data = q.back(); q.pop_back(); //数据出队锁保护 locker.unlock(); std::cout << 't2 got a value from t1: ' << data << std::endl; } else { locker.unlock(); } } }
int main() { std::thread t1(function_1); std::thread t2(function_2); t1.join(); t2.join();
getchar(); return 0; } 程序执行结果如下: 从代码中不难看出: 在生产过程中,因每放入一个数据有1秒延时,所以这个生产的过程是很慢的; 在消费过程中,存在着一个while循环,只有在接收到表示结束的数据的时候,才会停止,每次循环内部,都是先加锁,判断队列不空,然后就取出一个数,最后解锁。 所以说,在1s内,做了很多无用功!这样的话,CPU占用率会很高,可能达到100%(单核)。如下图示: 既然是由于消费者在while循环内因等待数据做了过多的无用功导致CPU占有率过高,我们可以考虑在消费者发现队列为空时,让消费者小睡一会儿,即增加一个小延时(比如500ms),相当于增大了轮询间隔周期,应该能降低CPU的占用率。 按该方案修改后的消费者代码如下:
增大轮询周期后,CPU占有率下降很明显: 但前面也说了,困难之处在于如何确定这个延长时间(即轮询间隔周期),如果间隔太短会过多占用CPU资源,如果间隔太长会因无法及时响应造成延误。 这就引入了条件变量来解决该问题: 条件变量使用“通知—唤醒”模型,生产者生产出一个数据后通知消费者使用,消费者在未接到通知前处于休眠状态节约CPU资源; 当消费者收到通知后,赶紧从休眠状态被唤醒来处理数据,使用了事件驱动模型,在保证不误事儿的情况下尽可能减少无用功降低对资源的消耗。 二、如何使用条件变量? C++标准库在< condition_variable >中提供了条件变量,借由它,一个线程可以唤醒一个或多个其他等待中的线程。原则上,条件变量的运作如下:
将上面的cond_var1.cpp程序使用条件变量解决轮询间隔难题的示例代码如下: //cond_var2.cpp用条件变量解决轮询间隔难题
#include <iostream> #include <deque> #include <thread> #include <mutex> #include <condition_variable>
std::deque<int> q; //双端队列标准容器全局变量 std::mutex mu; //互斥锁全局变量 std::condition_variable cond; //全局条件变量 //生产者,往队列放入数据 void function_1() { int count = 10; while (count > 0) { std::unique_lock<std::mutex> locker(mu); q.push_front(count); //数据入队锁保护 locker.unlock(); cond.notify_one(); // 向一个等待线程发出“条件已满足”的通知 std::this_thread::sleep_for(std::chrono::seconds(1)); //延时1秒 count--; } } //消费者,从队列提取数据 void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); while(q.empty()) //判断队列是否为空 cond.wait(locker); // 解锁互斥量并陷入休眠以等待通知被唤醒,被唤醒后加锁以保护共享数据
data = q.back(); q.pop_back(); //数据出队锁保护 locker.unlock(); std::cout << 't2 got a value from t1: ' << data << std::endl; } }
int main() { std::thread t1(function_1); std::thread t2(function_2); t1.join(); t2.join();
getchar(); return 0; } 使用条件变量对CPU的占用率也很低,而且免去了轮询间隔该设多长的难题: 上面的代码有三个注意事项:
还可以将cond.wait(locker)换一种写法,wait()的第二个参数可以传入一个函数表示检查条件,这里使用lambda函数最为简单,如果这个函数返回的是true,wait()函数不会阻塞会直接返回,如果这个函数返回的是false,wait()函数就会阻塞着等待唤醒,如果被伪唤醒,会继续判断函数返回值。代码示例如下:
值得注意的是:
线程同步保证了多个线程对共享数据的有序访问,目前我们了解到的多线程间传递数据主要是通过共享数据(全局变量)实现的,全局共享变量的使用容易增加不同任务或线程间的耦合度,也增加了引入bug的风险,所以全局共享变量应尽可能少用。很多时候我们只需要传递某个线程或任务的执行结果,以便参与后续的运算,但我们又不想阻塞等待该线程或任务执行完毕,而是继续执行暂时不需要该线程或任务执行结果参与的运算,当需要该线程执行结果时直接获得,才能更充分发挥多线程并发的效率优势。 |
|